summaryrefslogtreecommitdiff
path: root/workspace.py
blob: add047bbeb7cbfd66cde511181262e86690c54e9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
"""Workspace module.

Contains the main Workspace implementation.
"""

import datetime as dt
import json
import subprocess as subp 
import xml.etree.ElementTree as et

from .constants import AUDIO, SCENE, BXW
from .functions.function import Function
from .exceptions import LoadError
from .interfaces import XMLSerializable

def ffprobe_audio_length(f: str, path: str = "ffprobe") -> int:
    """Use ffprobe to check audio length in milliseconds.

    The duration reported by ffprobe (in seconds) is converted to
    milliseconds and rounded to the nearest whole millisecond.

    Returns 0 when the length cannot be determined: ffprobe could not be
    started, ffprobe exited with an error, or its output did not contain a
    usable ``format.duration`` value.

    Parameters:
        f: the path to check
        path: the path of ffprobe
    """
    try:
        output = subp.check_output(
            [path, "-show_format", "-print_format", "json", f],
            stderr=subp.DEVNULL,
        )
    except (subp.CalledProcessError, OSError):
        # CalledProcessError: ffprobe ran but failed on this file.
        # OSError: the ffprobe executable itself could not be launched.
        return 0

    try:
        seconds = float(json.loads(output)["format"]["duration"])
        # Adding 0.5 before truncating rounds to the nearest millisecond.
        return int(1000 * seconds + 0.5)
    except (KeyError, TypeError, ValueError, OverflowError):
        # Malformed JSON, missing keys, or a non-numeric/non-finite duration.
        return 0

class Workspace(XMLSerializable):
    """Class representing an audiovisual workspace.

    Holds the workspace metadata, the registered fixtures and functions
    (both keyed by ID), a cache of probed audio lengths keyed by filename,
    and the change/delete callbacks registered per function ID.
    """
    def __init__(self, name: str, author: str, version: int, modified: dt.datetime):
        self.name = name
        self.author = author
        self.version = version
        self.modified = modified

        self.fixtures = {}
        self._last_fixture_id = -1
        self.functions = {}
        self._last_function_id = -1

        # filename -> length in milliseconds, as returned by ffprobe_audio_length
        self._audio_lengths = {}

        # function ID -> list of (owner, callback) tuples
        self._change_callbacks = {}
        self._delete_callbacks = {}

    def get_audio_length(self, filename: str) -> int:
        """Determine the audio length of the given file.

        This value is returned from the cache, if available; otherwise the
        file is probed and the result is cached.
        """
        if filename not in self._audio_lengths:
            self._audio_lengths[filename] = ffprobe_audio_length(filename)
        return self._audio_lengths[filename]

    def recheck_audio_length(self, filename: str) -> int:
        """Determine the audio length of the given file.

        This function re-probes the value, updating the cache result and the
        durations of Audio functions that use this file.
        """
        length = ffprobe_audio_length(filename)
        self._audio_lengths[filename] = length
        for f in self.functions.values():
            if f.type == AUDIO and f.filename == filename:
                f.duration = length
        return length

    def recheck_audio_lengths(self):
        """Recheck and update all audio lengths.

        Every cached filename is re-probed, then the duration of each Audio
        function whose file is in the cache is refreshed.
        """
        # Only values of existing keys are replaced, so iterating the dict
        # while assigning is safe.
        for filename in self._audio_lengths:
            self._audio_lengths[filename] = ffprobe_audio_length(filename)
        for f in self.functions.values():
            if f.type == AUDIO and f.filename in self._audio_lengths:
                # Assign the duration explicitly, as recheck_audio_length does.
                f.duration = self._audio_lengths[f.filename]

    def register_fixture(self, f: "Fixture"):
        """Register the fixture in the Workspace.

        Always called when the fixture is instantiated.

        Raises:
            ValueError: if a fixture with the same ID is already registered
        """
        if f.id in self.fixtures:
            raise ValueError("A fixture with that ID already exists")
        self.fixtures[f.id] = f
        # Track the highest ID seen so next_fixture_id never collides.
        if isinstance(f.id, int) and f.id > self._last_fixture_id:
            self._last_fixture_id = f.id

    def register_function(self, f: Function):
        """Register the function in the Workspace.

        Always called when the function is instantiated.

        Raises:
            ValueError: if a function with the same ID is already registered
        """
        if f.id in self.functions:
            raise ValueError("A function with that ID already exists")
        self.functions[f.id] = f
        # Track the highest ID seen so next_function_id never collides.
        if isinstance(f.id, int) and f.id > self._last_function_id:
            self._last_function_id = f.id

    @property
    def next_fixture_id(self):
        """Return the next fixture ID."""
        return self._last_fixture_id + 1

    @property
    def next_function_id(self):
        """Return the next function ID."""
        return self._last_function_id + 1

    def delete_channel(self, c: "Fixture.Channel"):
        """Notify that the given channel was deleted.

        This is used for removing deleted channels from functions: every
        SCENE function whose scope contains the channel is asked to delete it.
        """
        for f in self.functions.values():
            if f.type == SCENE and c in f.scope:
                f.delete_channel(c)

    def delete_callbacks(self, owner, f: int = None):
        """Remove all callbacks registered by the owner.

        :param owner: the owner whose callbacks should be removed
        :param f: the function (ID or Function) to remove from (all if None)
        """
        if isinstance(f, Function):
            f = f.id
        for registry in (self._change_callbacks, self._delete_callbacks):
            for function_id, entries in registry.items():
                if f is None or function_id == f:
                    registry[function_id] = [
                        entry for entry in entries if entry[0] != owner
                    ]

    def register_function_change_callback(self, f: int, callback, owner = None):
        """Register a callback for a function change.

        :param f: the ID of the function to monitor (a Function is also accepted)
        :param callback: a function to call, accepting the Function
        :param owner: optional ID to use for removal of the callback later
        """
        if isinstance(f, Function):
            f = f.id
        self._change_callbacks.setdefault(f, []).append((owner, callback))

    def register_function_delete_callback(self, f: int, callback, owner = None):
        """Register a callback for a function deletion.

        :param f: the ID of the function to monitor (a Function is also accepted)
        :param callback: a function to call, accepting the Function
        :param owner: optional ID to use for removal of the callback later
        """
        if isinstance(f, Function):
            f = f.id
        self._delete_callbacks.setdefault(f, []).append((owner, callback))

    def function_changed(self, f: Function):
        """Called when a function is changed.

        Invokes every change callback registered for the function. Does
        nothing if no callbacks are registered.

        :param f: the changed function
        """
        # Entries are (owner, callback) tuples. Iterate over a copy so that a
        # callback may register further callbacks without affecting this loop.
        for _owner, callback in list(self._change_callbacks.get(f.id, ())):
            callback(f)

    def function_deleted(self, f: Function):
        """Called when a function is deleted.

        Invokes every delete callback registered for the function, removes
        the callbacks owned by or attached to it, and removes the function
        from the Workspace.

        :param f: the deleted function
        :raises KeyError: if the function is not registered in the Workspace
        """
        for _owner, callback in list(self._delete_callbacks.get(f.id, ())):
            callback(f)

        # Drop callbacks the deleted function registered (as owner) elsewhere.
        self.delete_callbacks(f)
        # Drop callbacks that were monitoring the deleted function itself.
        self._change_callbacks.pop(f.id, None)
        self._delete_callbacks.pop(f.id, None)

        del self.functions[f.id]

    @classmethod
    def load(cls, filename):
        """Load the workspace from a file.

        The file is parsed as XML and its root element is deserialized.
        """
        root = et.parse(filename).getroot()
        return cls.deserialize(None, root)

    @classmethod
    def deserialize(cls, w, e):
        """Create a Workspace from its XML root element.

        :param w: unused; a new Workspace is always created
        :param e: the ``workspace`` element, whose children must be, in order:
                  name, author, version, modified, fixtures, functions
        :raises LoadError: if the root tag or layout is wrong, or a function
                           has an unknown type
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid circular imports - confirm.
        from .topology import Fixture
        from .functions.scene import Scene
        from .functions.audio import Audio

        if e.tag != BXW+"workspace":
            raise LoadError("Root tag must be workspace")

        try:
            name, author, version, modified, fixtures, functions = e
        except ValueError as err:
            raise LoadError("Invalid workspace layout") from err

        ## First load the metadata so we can create the workspace
        w = cls(
            name=name.text,
            author=author.text,
            version=int(version.text),
            modified=dt.datetime.fromisoformat(modified.text),
        )

        ## Now load the fixtures
        for fixture in fixtures:
            Fixture.deserialize(w, fixture)

        ## Finally, load the functions
        for function in functions:
            type_ = function.get("type")
            if type_ == AUDIO:
                Audio.deserialize(w, function)
            elif type_ == SCENE:
                Scene.deserialize(w, function)
            else:
                raise LoadError("Unknown function type \"%s\"" % type_)

        return w

    def serialize(self) -> et.Element:
        """Build the XML element tree for this workspace.

        The ``modified`` element is written with the current time rather than
        the stored ``self.modified`` value.
        """
        root = et.Element(BXW+"workspace")

        et.SubElement(root, BXW+"name").text = self.name
        et.SubElement(root, BXW+"author").text = self.author
        et.SubElement(root, BXW+"version").text = str(self.version)
        et.SubElement(root, BXW+"modified").text = dt.datetime.now().isoformat()

        fixtures = et.SubElement(root, BXW+"fixtures")
        for fixture in self.fixtures.values():
            fixtures.append(fixture.serialize())

        functions = et.SubElement(root, BXW+"functions")
        for function in self.functions.values():
            functions.append(function.serialize())

        return root

    def save(self, filename):
        """Save the workspace to a file."""
        # Register the workspace namespace as the default one so tags are
        # written without a prefix.
        et.register_namespace("", BXW.strip("{}"))
        root = self.serialize()
        XMLSerializable.indent(root)
        tree = et.ElementTree(element=root)
        tree.write(filename, encoding="unicode")