plusdeck.dbus.interface

This module contains the core DbusInterface class. This class is used directly to serve the interface, and is subclassed by the DbusClient class in plusdeck.dbus.client.

You will likely not use this module directly. For information on the client, see plusdeck.dbus.client. For information on the service, see plusdeck.dbus.service.

DbusInterface

Bases: DbusInterfaceCommonAsync

A DBus interface for controlling the Plus Deck 2C PC Cassette Deck.

Source code in plusdeck/dbus/interface.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
class DbusInterface(  # type: ignore
    DbusInterfaceCommonAsync, interface_name=DBUS_NAME  # type: ignore
):
    """
    DBus interface exposing control of the Plus Deck 2C PC Cassette Deck.

    Deck commands are forwarded to the wrapped ``Client``. A background task
    (started from ``__init__``) reads states from the client's receiver,
    records the latest one and re-emits it through the ``state`` signal.
    """

    def __init__(self: Self, client: Client, config_file: Optional[str] = None) -> None:
        super().__init__()
        self._config: Config = Config.from_file(config_file)
        self.client: Client = client
        # No receiver and no known deck state until the subscription task runs.
        self._rcv: Optional[Receiver] = None
        self._current_state: State = State.UNSUBSCRIBED
        self._client_lock: asyncio.Lock = asyncio.Lock()
        # asyncio.create_task needs a running event loop, so instances must be
        # constructed from within one.
        self.subscribe()

    @dbus_property_async("(ss)")
    def config(self: Self) -> ConfigT:
        """
        The loaded configuration as a ``(file, port)`` pair.

        The file entry is an empty string when the config has no file set.
        """

        return (self._config.file or "", self._config.port)

    def subscribe(self: Self) -> None:
        """
        Start the background task that runs ``subscription()``.
        """

        task = asyncio.create_task(self.subscription())
        self._subscription = task

    async def subscription(self: Self) -> None:
        """
        Relay state changes from the client until the receiver is cleared.

        Each received state is stored as the current state and emitted by
        name on the ``state`` signal.
        """

        if not self._rcv:
            self._rcv = await self.client.subscribe()

        # close() resets self._rcv to None, which ends this loop on the next
        # pass.
        while self._rcv:
            try:
                latest: State = await self._rcv.get_state()
                self._current_state = latest
                self.state.emit(latest.name)  # type: ignore
            except TimeoutError:
                # Nothing arrived in time; go back and re-check the receiver.
                continue

    async def close(self: Self) -> None:
        """
        Shut down: unsubscribe, wait for the relay task, then close the client.
        """

        async with self._client_lock:
            await self.client.unsubscribe()

            # Dropping the receiver is what lets subscription() leave its loop.
            self._rcv = None
            await self._subscription
            self.client.close()
            await self.client.closed

    @property
    def closed(self: Self) -> asyncio.Future:
        """
        The underlying client's ``closed`` future.
        """

        return self.client.closed

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def play_a(self: Self) -> None:
        """
        Start playback of side A.
        """

        self.client.play_a()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def play_b(self: Self) -> None:
        """
        Start playback of side B.
        """

        self.client.play_b()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def fast_forward_a(self: Self) -> None:
        """
        Fast-forward on side A.
        """

        self.client.fast_forward_a()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def fast_forward_b(self: Self) -> None:
        """
        Fast-forward on side B.
        """

        self.client.fast_forward_b()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def rewind_a(self: Self) -> None:
        """
        Rewind on side A (the same as fast-forwarding side B).
        """

        self.client.rewind_a()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def rewind_b(self: Self) -> None:
        """
        Rewind on side B (the same as fast-forwarding side A).
        """

        self.client.rewind_b()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def pause(self: Self) -> None:
        """
        Toggle pause: pause when playing, resume playing when paused.
        """

        self.client.pause()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def stop(self: Self) -> None:
        """
        Stop the tape.
        """

        self.client.stop()

    @dbus_method_async("", flags=DbusUnprivilegedFlag)
    async def eject(self: Self) -> None:
        """
        Eject the tape.
        """

        self.client.eject()

    @dbus_method_async("sd", "b", flags=DbusUnprivilegedFlag)
    async def wait_for(self: Self, state: str, timeout: float) -> bool:
        """
        Wait until the deck reports the named state.

        ``state`` is the name of a ``State`` member. A negative ``timeout``
        means no timeout. Returns ``True`` when the state was reached and
        ``False`` when the wait timed out.
        """

        expected = State[state]

        # DBus always delivers a double, so a negative value stands in for
        # "no timeout".
        limit: Optional[float] = None
        if timeout >= 0:
            limit = timeout

        try:
            await self.client.wait_for(expected, limit)
        except TimeoutError:
            return False

        return True

    @dbus_property_async("s")
    def current_state(self: Self) -> str:
        """
        Name of the most recently recorded deck state.
        """

        return self._current_state.name

    @dbus_signal_async("s")
    def state(self: Self) -> str:
        """
        Signal carrying the name of each new state of the Plus Deck 2C PC
        Cassette Deck.
        """

        raise NotImplementedError("state")

closed property

A Future that resolves when the client is closed.

close() async

Unsubscribe from events and close the client.

Source code in plusdeck/dbus/interface.py
71
72
73
74
75
76
77
78
79
80
81
82
async def close(self: Self) -> None:
    """
    Unsubscribe from events and close the client.

    The whole sequence runs while holding ``self._client_lock``. It returns
    once the subscription task has finished and ``self.client.closed`` has
    resolved. Awaiting the subscription task re-raises any exception that
    task ended with.
    """

    async with self._client_lock:
        await self.client.unsubscribe()

        # subscription() exits its loop once it sees a falsy receiver, so the
        # receiver must be cleared before the task is awaited.
        self._rcv = None
        await self._subscription
        # Only close the client after the relay task is done with it.
        self.client.close()
        await self.client.closed

config()

The DBus service's currently loaded configuration.

Source code in plusdeck/dbus/interface.py
46
47
48
49
50
51
52
@dbus_property_async("(ss)")
def config(self: Self) -> ConfigT:
    """
    The DBus service's currently loaded configuration.

    Returned as a ``(file, port)`` pair, matching the ``(ss)`` signature.
    """

    # A falsy file (e.g. None) is replaced by "" so both entries are strings.
    return (self._config.file or "", self._config.port)

current_state()

Get the last known state of the Plus Deck 2C PC Cassette Deck.

Source code in plusdeck/dbus/interface.py
182
183
184
185
186
187
@dbus_property_async("s")
def current_state(self: Self) -> str:
    """
    Get the last known state of the Plus Deck 2C PC Cassette Deck.

    Returns the ``name`` of the stored state. The stored state starts as
    ``State.UNSUBSCRIBED`` and is replaced by ``subscription()`` each time a
    new state is received.
    """
    return self._current_state.name

eject() async

Eject the tape.

Source code in plusdeck/dbus/interface.py
155
156
157
158
159
160
161
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def eject(self: Self) -> None:
    """
    Eject the tape.

    Forwards to ``self.client.eject()``; the call's result is discarded and
    nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.eject()

fast_forward_a() async

Fast-forward side A.

Source code in plusdeck/dbus/interface.py
107
108
109
110
111
112
113
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def fast_forward_a(self: Self) -> None:
    """
    Fast-forward side A.

    Forwards to ``self.client.fast_forward_a()``; the call's result is
    discarded and nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.fast_forward_a()

fast_forward_b() async

Fast-forward side B.

Source code in plusdeck/dbus/interface.py
115
116
117
118
119
120
121
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def fast_forward_b(self: Self) -> None:
    """
    Fast-forward side B.

    Forwards to ``self.client.fast_forward_b()``; the call's result is
    discarded and nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.fast_forward_b()

pause() async

Pause if playing, or start playing if paused.

Source code in plusdeck/dbus/interface.py
139
140
141
142
143
144
145
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def pause(self: Self) -> None:
    """
    Pause if playing, or start playing if paused.

    Forwards to ``self.client.pause()``; the call's result is discarded and
    nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.pause()

play_a() async

Play side A.

Source code in plusdeck/dbus/interface.py
92
93
94
95
96
97
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def play_a(self: Self) -> None:
    """
    Play side A.

    Forwards to ``self.client.play_a()``; the call's result is discarded and
    nothing is awaited here.
    """
    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.play_a()

play_b() async

Play side B.

Source code in plusdeck/dbus/interface.py
 99
100
101
102
103
104
105
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def play_b(self: Self) -> None:
    """
    Play side B.

    Forwards to ``self.client.play_b()``; the call's result is discarded and
    nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.play_b()

rewind_a() async

Rewind side A. Equivalent to fast-forwarding side B.

Source code in plusdeck/dbus/interface.py
123
124
125
126
127
128
129
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def rewind_a(self: Self) -> None:
    """
    Rewind side A. Equivalent to fast-forwarding side B.

    Forwards to ``self.client.rewind_a()``; the call's result is discarded
    and nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.rewind_a()

rewind_b() async

Rewind side B. Equivalent to fast-forwarding side A.

Source code in plusdeck/dbus/interface.py
131
132
133
134
135
136
137
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def rewind_b(self: Self) -> None:
    """
    Rewind side B. Equivalent to fast-forwarding side A.

    Forwards to ``self.client.rewind_b()``; the call's result is discarded
    and nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.rewind_b()

state()

Listen for updates to the state of the Plus Deck 2C PC Cassette Deck.

Source code in plusdeck/dbus/interface.py
189
190
191
192
193
194
195
@dbus_signal_async("s")
def state(self: Self) -> str:
    """
    Listen for updates to the state of the Plus Deck 2C PC Cassette Deck.

    The signal carries a single string: ``subscription()`` emits the name of
    each state it receives via ``self.state.emit(...)``.
    """

    # Placeholder body: the function only declares the signal.
    # NOTE(review): presumably the decorator never invokes it -- confirm.
    raise NotImplementedError("state")

stop() async

Stop the tape.

Source code in plusdeck/dbus/interface.py
147
148
149
150
151
152
153
@dbus_method_async("", flags=DbusUnprivilegedFlag)
async def stop(self: Self) -> None:
    """
    Stop the tape.

    Forwards to ``self.client.stop()``; the call's result is discarded and
    nothing is awaited here.
    """

    # NOTE(review): presumably this only issues the command -- use wait_for()
    # or the state signal to observe the deck's resulting state.
    self.client.stop()

wait_for(state, timeout) async

Wait for an expected state, with an optional timeout. When timeout is negative, it will be ignored.

Source code in plusdeck/dbus/interface.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@dbus_method_async("sd", "b", flags=DbusUnprivilegedFlag)
async def wait_for(self: Self, state: str, timeout: float) -> bool:
    """
    Wait until the deck reports the named state.

    ``state`` is the name of a ``State`` member. A negative ``timeout`` means
    no timeout. Returns ``True`` when the state was reached and ``False``
    when the wait timed out.
    """

    expected = State[state]

    # The DBus signature always carries a double, so a negative value stands
    # in for "no timeout".
    limit = None
    if timeout >= 0:
        limit = timeout

    try:
        await self.client.wait_for(expected, limit)
    except TimeoutError:
        return False

    return True