Is there a way to detect whether a Skype call is in progress? (using dbus? pulseaudio?)

3

I built a small gadget that hangs on my office door. I can light it up from a script on my computer to show that I am busy and people should not come in. I would like to run that script, signalling that I am busy, whenever I am in a Skype call.

Is there a way to detect that I am in a Skype call and run a script when that happens?

I was thinking of using dbus and monitoring pulseaudio usage, but I have no idea where to start. Can someone point me in the right direction? Ideally it would be some kind of interrupt or event listener in Python.

Edit:

There is a related question, but it is not quite what I need and does not work with my version of Ubuntu (16.04).

mplugd also looks promising.

    
by con-f-use 18.04.2017 / 11:30

2 answers

1

Well, this may not be perfect, but it is a good starting point for a shell prototype that uses pactl for monitoring.

Here is part of the output while a call is running:

pactl list source-outputs

Source Output #2
    Driver: protocol-native.c
    Owner Module: 8
    Client: 11
    Source: 2
    Sample Specification: s16le 2ch 44100Hz
    Channel Map: front-left,front-right
    Format: pcm, format.sample_format = "\"s16le\""  format.rate = "44100"  format.channels = "2"  format.channel_map = "\"front-left,front-right\""
    Corked: no
    Mute: no
    Volume: front-left: 65536 / 100% / 0.00 dB,   front-right: 65536 / 100% / 0.00 dB
            balance 0.00
    Buffer Latency: 0 usec
    Source Latency: 66 usec
    Resample method: n/a
    Properties:
        application.icon_name = "chromium-browser"
        media.name = "RecordStream"
        application.name = "Chrome input"
        native-protocol.peer = "UNIX socket client"
        native-protocol.version = "30"
        application.process.id = "17053"
        application.process.user = "sneetsher"
        application.process.host = "sneetsher-blueskies"
        application.process.binary = "skypeforlinux"
        window.x11.display = ":0.0"
        application.language = "en_US.UTF-8"
        application.process.machine_id = "00074bc0a72a47d49284ce5b9bcda899"
        application.process.session_id = "c4"
        module-stream-restore.id = "source-output-by-application-name:Chrome input"

pactl list sink-inputs

Sink Input #166
    Driver: protocol-native.c
    Owner Module: 8
    Client: 16
    Sink: 1
    Sample Specification: float32le 2ch 44100Hz
    Channel Map: front-left,front-right
    Format: pcm, format.sample_format = "\"float32le\""  format.rate = "44100"  format.channels = "2"  format.channel_map = "\"front-left,front-right\""
    Corked: no
    Mute: no
    Volume: front-left: 65536 / 100% / 0.00 dB,   front-right: 65536 / 100% / 0.00 dB
            balance 0.00
    Buffer Latency: 51519 usec
    Sink Latency: 11124 usec
    Resample method: copy
    Properties:
        application.icon_name = "chromium-browser"
        media.name = "Playback"
        application.name = "Skype for Linux Beta"
        native-protocol.peer = "UNIX socket client"
        native-protocol.version = "30"
        application.process.id = "17053"
        application.process.user = "sneetsher"
        application.process.host = "sneetsher-blueskies"
        application.process.binary = "skypeforlinux"
        window.x11.display = ":0.0"
        application.language = "en_US.UTF-8"
        application.process.machine_id = "00074bc0a72a47d49284ce5b9bcda899"
        application.process.session_id = "c4"
        module-stream-restore.id = "sink-input-by-application-name:Skype for Linux Beta"

Sink Input #167
    Driver: protocol-native.c
    Owner Module: 8
    Client: 17
    Sink: 1
    Sample Specification: float32le 2ch 44100Hz
    Channel Map: front-left,front-right
    Format: pcm, format.sample_format = "\"float32le\""  format.rate = "44100"  format.channels = "2"  format.channel_map = "\"front-left,front-right\""
    Corked: no
    Mute: no
    Volume: front-left: 65536 / 100% / 0.00 dB,   front-right: 65536 / 100% / 0.00 dB
            balance 0.00
    Buffer Latency: 28480 usec
    Sink Latency: 11061 usec
    Resample method: copy
    Properties:
        application.icon_name = "chromium-browser"
        media.name = "Playback"
        application.name = "Skype for Linux Beta"
        native-protocol.peer = "UNIX socket client"
        native-protocol.version = "30"
        application.process.id = "17053"
        application.process.user = "sneetsher"
        application.process.host = "sneetsher-blueskies"
        application.process.binary = "skypeforlinux"
        window.x11.display = ":0.0"
        application.language = "en_US.UTF-8"
        application.process.machine_id = "00074bc0a72a47d49284ce5b9bcda899"
        application.process.session_id = "c4"
        module-stream-restore.id = "sink-input-by-application-name:Skype for Linux Beta"
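
The listing above can also be checked from Python rather than by eye. The following is a minimal sketch, not a tested monitor: it assumes that an uncorked source output whose `application.process.binary` is `skypeforlinux` (the value shown above) means a call is running, which may not hold for every Skype version. The function names `parse_streams`, `skype_is_recording` and `current_source_outputs` are made up for this example.

```python
import os
import subprocess


def parse_streams(pactl_text):
    """Split `pactl list source-outputs` (or sink-inputs) text into one dict per stream."""
    streams = []
    current = None
    for line in pactl_text.splitlines():
        stripped = line.strip()
        if stripped.startswith(("Source Output #", "Sink Input #")):
            current = {}
            streams.append(current)
        elif current is None:
            continue
        elif stripped.startswith("Corked:"):
            current["corked"] = stripped.split(":", 1)[1].strip()
        elif " = " in stripped and not stripped.startswith("Format:"):
            # Property lines look like: application.name = "Chrome input"
            key, _, value = stripped.partition(" = ")
            current[key] = value.strip('"')
    return streams


def skype_is_recording(pactl_text, binary="skypeforlinux"):
    """True if any uncorked stream belongs to the given binary (assumed call indicator)."""
    return any(
        s.get("application.process.binary") == binary and s.get("corked") == "no"
        for s in parse_streams(pactl_text)
    )


def current_source_outputs():
    """Run pactl with the C locale so that field names are not translated."""
    env = dict(os.environ, LC_ALL="C")
    return subprocess.check_output(
        ["pactl", "list", "source-outputs"], env=env, universal_newlines=True
    )
```

Polling `skype_is_recording(current_source_outputs())` every few seconds would be enough for a door light. Checking the recording side (source outputs) rather than playback is a design choice here, since playback streams may also show up for notification sounds.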
    
by user.dz 18.04.2017 / 12:23
1

OK, I figured it out. It is still rough and only tested on Ubuntu 16.04 with Python 2.7, but anyway:

#!/usr/bin/python
# This monitors pulseaudio streams via a dbus event-listener.
#
# On Ubuntu to enable pulseaudio dbus support:
#
#   echo -e '.ifexists module-dbus-protocol.so\nload-module module-dbus-protocol\n.endif' >> ~/.config/pulse/default.pa
#   # globally: /etc/pulse/default.pa
#
# And restart pulseaudio:
#
#   pkill pulseaudio; pulseaudio
#

from __future__ import division, print_function, unicode_literals
import dbus, os, gobject, logging
from dbus.mainloop.glib import DBusGMainLoop
from logging import info, debug, error, warning as warn
#logging.getLogger().setLevel(logging.DEBUG)

skypestream = []
cstr = 'org.PulseAudio.Core1'

# convert byte array to string
def dbus2str(db):
    if type(db)==dbus.Struct:
        return str(tuple(dbus2str(i) for i in db))
    if type(db)==dbus.Array:
        return "".join([dbus2str(i) for i in db])
    if type(db)==dbus.Dictionary:
        return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
    if type(db)==dbus.String:
        return db+''
    if type(db)==dbus.UInt32:
        return str(db+0)
    if type(db)==dbus.Byte:
        return chr(db)
    if type(db)==dbus.Boolean:
        return db==True
    if type(db)==dict:
        return dict((dbus2str(k), dbus2str(v)) for k, v in db.items())
    return "(%s:%s)" % (type(db), db)


def sig_handler(path=None, sender=None, msg=None):
    debug( '\n\npath: %s\n%s\n\nsender: %s\n%s\n\nmsg: %s\n%s\n\n',
        path, dir(path), sender, dir(sender), msg, dir(msg) )
    mem = msg.get_member()

    # A removed stream can no longer be queried, so match it by object path.
    # This also works when the Skype stream was the last playback stream.
    if mem == 'PlaybackStreamRemoved':
        if path in skypestream:
            skypestream.remove(path)
            if not skypestream:
                print('no-skype')
        return
    if mem != 'NewPlaybackStream':
        return

    # Inspect only the stream that was just created, not every open stream.
    stream = dbus.Interface(
        pulse_bus.get_object(object_path=path),
        dbus_interface='org.freedesktop.DBus.Properties' )
    try:
        plist = stream.Get(cstr+'.Stream', 'PropertyList')
    except dbus.exceptions.DBusException:
        return  # the stream vanished before it could be inspected
    appname = dbus2str(plist.get('application.name', None))
    if appname.find('Skype') > -1:
        skypestream.append(path)
        print('busy', appname)


def pulse_bus_address():
    address = None
    if 'PULSE_DBUS_SERVER' in os.environ:
        address = os.environ['PULSE_DBUS_SERVER']
    else:
        bus = dbus.SessionBus()
        server_lookup = bus.get_object("org.PulseAudio1",
            "/org/pulseaudio/server_lookup1")
        address = server_lookup.Get("org.PulseAudio.ServerLookup1",
            "Address", dbus_interface="org.freedesktop.DBus.Properties")
        debug(address)
    if not address: raise RuntimeError('No pulseaudio dbus address found!')
    return address


if __name__ == "__main__":
    DBusGMainLoop(set_as_default=True)

    loop = gobject.MainLoop()

    pulse_bus = dbus.connection.Connection(pulse_bus_address())
    core1 = pulse_bus.get_object(object_path='/org/pulseaudio/core1')

    core1.ListenForSignal(cstr+'.NewPlaybackStream', dbus.Array(signature="o"))
    core1.ListenForSignal(cstr+'.PlaybackStreamRemoved', dbus.Array(signature="o"))

    pulse_bus.add_signal_receiver(sig_handler, message_keyword='msg')

    loop.run()
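
The script above only prints `busy` and `no-skype`. To drive the door gadget, those two signals can be reduced to busy/free transitions. The following is a minimal sketch of that bookkeeping, independent of dbus so it can be tried on its own. `BusyTracker` and its `on_change` callback are hypothetical names invented for this example, and matching on the substring `Skype` is the same assumption the script makes.

```python
class BusyTracker(object):
    """Track Skype playback streams and report only busy/free transitions."""

    def __init__(self, on_change):
        self._paths = set()          # object paths of streams believed to be Skype's
        self._on_change = on_change  # called with True (busy) or False (free)

    def stream_added(self, path, appname):
        if "Skype" not in appname:
            return
        was_busy = bool(self._paths)
        self._paths.add(path)
        if not was_busy:
            self._on_change(True)

    def stream_removed(self, path):
        if path not in self._paths:
            return
        self._paths.discard(path)
        if not self._paths:
            self._on_change(False)
```

In the signal handler, `stream_added(path, appname)` would be called for `NewPlaybackStream` and `stream_removed(path)` for `PlaybackStreamRemoved`. The callback could then start the gadget script, for example with `subprocess.call`. Because Skype opens more than one playback stream during a call (two sink inputs in the `pactl` listing), reporting only the first and last stream avoids switching the light off while the call is still running.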
    
by con-f-use 18.04.2017 / 20:22