diff options
Diffstat (limited to 'workspace.py')
-rwxr-xr-x | workspace.py | 1007 |
1 files changed, 0 insertions, 1007 deletions
diff --git a/workspace.py b/workspace.py deleted file mode 100755 index c9f9a35..0000000 --- a/workspace.py +++ /dev/null @@ -1,1007 +0,0 @@ -#!/usr/bin/env python3 - -"""Module for parsing and rendering QLC workspaces. - -Note that all instances of all classes in this module should be considered immutable unless -otherwise stated: this program is designed for reading QLC workspaces only, not modifying them. -Additionally, no Function should be modified after creation: many properties are set at -creation time and will not be updated. - -# Differences from QLC+ - -1. Fade timimg: there might be a 1-2ms difference in the fade lengths between this program and - QLC+. If this is a problem, I would recommend also swapping out all human eyeballs and - creating a protocol that isn't DMX. - -2. Restrictions: certain pointless things are disallowed by this program that are allowed by - QLC+ (though they usually crash it): circular references (e.g. chaser A includes chaser B - which itself includes chaser A...), infinite length shows (this one does crash QLC+). - -3. Sequences: there are no sequences in this program, only chasers. To create a sequence by - hand, create a new Scene for each sequence step, attach it to a ChaserStep with the desired - parameters, then attach the steps to a Chaser. This is automated by Workspace.load so you - should never have to deal with this. - -4. Function overlapping: overlapping of functions on one track of a show is theoretically - supported; as this isn't supported in QLC+, this is untested. - -5. Channel group values: channel group values are ignored. As far as I can tell, QLC+ sets the - value of the individual channels as well wherever channel groups are used, and I'm not sure - how QLC+ determines which value takes precedence when multiple channel groups share a - channel. - -6. Fading: this program ignores scene and sequence fade times (they seem unused) as well as the - settings on chasers/sequences for step fading (QLC+ overwrites the step fade times anyways). - Neither of these should have any effect on output compared to QLC+. - -7. Sequence fading: QLC+ is just wack here. This worked as of this writing: create a show with - one track and create a sequence on that track with three steps. Step 1 has 0ms fade in, 1s - hold, 1s fade out and holds channel 1 at 100. Step 2 has 0ms fade in, 1s hold, 0ms fade out - and holds channel 2 at 100 (others at 0). Step 3 has 0ms fade in, 1s hold, 500ms fade out, - and holds channel 3 at 100. According to QLC+, despite dislaying the proper values, the - actual fade out times for the steps are 0ms, 500ms, and 500ms, respectively. The point is - that QLC+ has no idea what to do for fade outs. This program interprets fade ins and outs as - identical in effect; combining them allows for somewhat non-linear fading. If you wish to - replicate the QLC+ behavior, hook up a random number generator to the fading and go nuts. - -8. Show fading: QLC+ tends to cut fade-outs that overlap with other steps on the same track; - seeing as QLC+ lacks any fade-out logic, BLC holds fades as long as they specify. - -9? Precedence: BLC adopts a highest-takes-precedence doctrine when determining what level lights - should be held at. This may be different than QLC+. - -# Pre-Rendering Workspaces - -The typical way to render workspaces is to determine the appropriate top-level function (i.e. a -Show or Chaser), render that function periodically, and output the values. However, if you are -paranoid, certain functions can be entirely rendered ahead of time, leaving you to merely -dispatch the values at the appropriate time. - -Any Show may be pre-rendered using Show's prerender method. This will return an iterable of the -form: - - [(time, values), ...] - -Where values is as returned by a single call to the Show's render method. The values given at a -certain time index must be held until the next time index. - -A Chaser may be pre-rendered provided it satisfies: - - - No infinite length fades - - All steps of infinite-length are Scenes - -Chaser's render_all method will return an iterable of iterables of the form: - - [[(time, values), ...], ...] - -Each block of (time, values) pairs represents an infinite segment in the chaser, i.e. the final -value in each block should be held until some condition becomes true. This does restrict the -chaser in that steps of finite length cannot be skipped, so take this into account. In all but -the first step, values does not necessarily have a value for each channel in the show's scope; -it gives only the changed values at that time. Note also that this only supports the rendering -of single-shot chasers presently. Additionally, time is reset to 0 at the start of each block. - -# General Notes for Implementation - -- When a function is fading, render always returns nx=1. The reason for this is that it would - require a lot more computation to calculate a more accurate value, requiring the function to - render not only the current time index but also all time indexes until the value actually - changes. render_all fixes this by returning only changed values, but still renders every time - index during fades. If rendering shows "live", i.e. without pre-rendering, I recommend taking - nx = max(nx, m) for some m > 10 (e.g. ~16 for 60 Hz DMX), as rendering faster than the - transmission rate of your connection is pointless. - -- This library is thread-safe except for the Function "data" objects: these objects may only be - used in one thread at a time. - -- The hash function on each class is likely to be slow: use it to prevent running an even slower - operation if a function hasn't changed; a Function's hash will be consistent as long as the - workspace on disk doesn't change -""" - -from abc import ABC, abstractmethod -import json -from multiprocessing.pool import ThreadPool -import subprocess as subp -import logging - -from lxml import etree - -## BEGIN Constants - -QLC_INFTY = 429467294 - -CHASER = "Chaser" -STEP = "Step" -SCENE = "Scene" -SHOW = "Show" -SEQUENCE = "Sequence" -AUDIO = "Audio" - -FORWARD = "Forward" -LOOP = "Loop" -SINGLESHOT = "SingleShot" - -QXW = "{http://www.qlcplus.org/Workspace}" - -## END Constants - -## BEGIN Utility functions - -def ffprobe_audio_length(f, path="ffprobe"): - """Use ffprobe to check audio length in milliseconds. - - Will always return the nearest whole millisecond greater than or equal to the duration. - - Parameters: - f: the path to check - path: the path of ffprobe - """ - try: - a = subp.check_output([path, "-show_format", "-print_format", "json", f], stderr=subp.DEVNULL) - except subp.CalledProcessError: - return 0 - return int(1000*float(json.loads(a)["format"]["duration"])+0.5) - -## END Utility functions - -## BEGIN Topology classes -class Fixture: - """Class representing a single light fixture. - - May be composed of multiple channels. - """ - def __hash__(self): - return self._hash - - def __repr__(self): - return "Fixture(id=%d, name=%s, universe=%d, start=%d, channels=%d)" % (self.id, self.name, self.universe.id, self.address_start, self.channel_count) - - def __init__(self, id_, name, address, universe, mode, channels=1): - self.name = name - self.address_start = address - self.channel_count = channels - self.mode = mode - self.universe = universe - self.id = id_ - self._hash = hash((self.name, self.address_start, self.channel_count, self.mode, - self.id, self.universe)) - self.channels = [Channel(self, i) for i in range(channels)] - -class Channel: - """Class representing a single output channel.""" - def __hash__(self): - return self._hash - - def __repr__(self): - return "Channel(address=%d)" % (self.address) - - def __init__(self, fixture, offset): - if offset >= fixture.channel_count or offset < 0: - raise ValueError("Invalid offset") - self.fixture = fixture - self.offset = offset - self.address = self.fixture.address_start + offset - self.universe = self.fixture.universe - self._hash = hash((self.fixture, self.offset, self.address)) - -class ChannelGroup: - """Class representing a group of output channels.""" - def __hash__(self): - return self._hash - - def __repr__(self): - return "ChannelGroup(id=%d, name=%s, channels=(%s))" % (self.id, self.name, - ", ".join((repr(c) for c in self.channels))) - - def __init__(self, id_, name, channels): - self.id = id_ - self.name = name - self.channels = tuple(channels) - - self._hash = hash((self.id, self.name, self.channels)) - -class Universe: - """Class representing an output universe.""" - def __hash__(self): - return self._hash - - def __repr__(self): - return "Universe(id=%d, name=%s)" % (self.id, self.name) - - def __init__(self, id_, name): - self.id = id_ - self.name = name - - self._hash = hash((self.id, self.name)) - -## END Toplogy classes - -## BEGIN Base classes - -class Function(ABC): - """Class for representing the generic attributes of a QLC function. - - id is not necessarily globally unique: in most cases it will be, but it may just be unique - to a given parent function (e.g. two sequences can each have a different step with the same - id). - - duration is the "hard" duration of the function: for steps of sequences/tracks/chasers, this - is the fade in time plus the hold time of the step and is the time that must elapse - (barring skipping) before another step can run. actual_duration is the actual duration of - the function; in the same setting, this would be the sum of the fade in, hold, and fade out - times. - - scope must be an iterable of channels representing all of the channels used by this function - regardless of whether or not they are currently being used. - - This class itself must be stateless: anything that requires storage of state must also - require the caller to store that state. - """ - repr_attr = ("id", "name",) - def __hash__(self): - return self._hash - - @staticmethod - def get_data(): - """Return an initial state for the function.""" - return None - - def __repr__(self): - buff = [] - for c in self.repr_attr: - if not issubclass(type(c), str): - c, f = c - v = repr(f(getattr(self,c))) - else: - v = repr(getattr(self,c)) - buff.append("%s=%s" % (c,v)) - - return "%s(%s)" % (self.__class__.__name__, ", ".join(buff)) - - @abstractmethod - def render(self, t: int, data=None): - """Render the function at the given time. - - Parameters: - t: the time index to render in milliseconds. The first time index is 0. - data: the state of the function. - - t must be relative to the start time of this function. data may be used to pass in - state information if necessary (e.g. current step for chasers). - - This function must return a 4-tuple: - - (values, audio cues, next change, data) - - Where values is a tuple of (channel, value) elements, audio_cues is a tuple of - (filename, aid, start time, fade in time, fade out time, fade out start) elements, aid - may be used to uniquely identify instances of audio cues. - - next_change is the time index of the next lighting change, and data is the state data - (None if unused). values must contain a value for exactly those channels provided in - scope. - - In the event of an infinite amount of time until the next change, QLC_INFTY is returned. - If this function is fading, 1 should be returned (the minimum time unit). If the - function is done rendering, -1 should be returned. - - It is not an error to call render with a time index greater than the duration of the - function: ((), (), -1, None) should be returned in this case. However, the time index - will always be nonnegative. - - It is an error to call render with data that has been used to render a future time; this - is undefined behavior. - """ - return - - def __init__(self, id_, type_, name, scope, hidden=False, duration=-1, actual_duration=-1): - self.id = id_ - self.type = type_ - self.name = name - self.hidden = hidden - self.duration = min(QLC_INFTY, duration) - self.actual_duration = min(QLC_INFTY, actual_duration) - self.scope = tuple(scope) - - self._hash = hash((self.id, self.type, self.name, self.scope, self.hidden, self.duration, - self.actual_duration)) - -class FadeFunction(Function): - """QLC function that can fade in/out.""" - def __init__(self, id_, type_, name, scope, hidden=False, duration=-1, actual_duration=-1, fade_in=0, fade_out=0): - if fade_in >= QLC_INFTY or fade_out >= QLC_INFTY: - raise ValueError("Fades cannot be infinite") - super().__init__(id_, type_, name, scope, hidden=hidden, duration=duration, actual_duration=actual_duration) - self.fade_in = min(QLC_INFTY, fade_in) - self.fade_out = min(QLC_INFTY, fade_out) - - self._hash = hash((self._hash, self.fade_in, self.fade_out)) - -class Advanceable(ABC): - """Function that may be advanced.""" - @abstractmethod - def advance(self, data): - """Advance the function.""" - return - -## END Base classes - -## BEGIN Function classes - -class Audio(FadeFunction): - """Class for a QLC+ audio function.""" - repr_attr = ("id", "fname", "fade_in", "fade_out",) - def render(self, t, data=None): - """Render the audio function. - - We do not seek to do anything related to audio in this library: the responsibility for - mixing, fading, playing, probing, etc. the audio file is with the specific application. - As such, this function only returns the relevant data for the audio function.o - """ - if t > self.duration: - return (), (), -1, data - - return (), ((0, self.id, self.fname, self.fade_in, self.fade_out, self.duration-self.fade_out),), self.duration+1-t, data - - def __init__(self, id_, name, fname, fade_in, fade_out, length, run_order=SINGLESHOT, hidden=False): - super().__init__(id_, AUDIO, name, (), hidden=hidden, duration=length, - actual_duration=length, fade_in=fade_in, fade_out=fade_out) - self.fname = fname - self.run_order = run_order - self._hash = hash((self._hash, self.fname, self.run_order)) - -class Scene(Function): - """Class for a QLC Scene. - - duration, fade_in, and fade_out are present in the XML but are ignored by QLC. - - Scenes are mostly meaningless on their own in this context, they must be attached to a - chaser/show to do anything. - """ - def render(self, t, data=None): - """All arguments are unused.""" - return self.values, (), QLC_INFTY, None - - def __init__(self, id_, name, values, hidden=False): - super().__init__(id_, SCENE, name, (c for c,v in values), hidden=hidden, duration=-1, actual_duration=-1) - self.values = tuple(values) - self._hash = hash((self._hash, self.values)) - -class ChaserStep(FadeFunction): - """A single step in a chaser.""" - repr_attr = ("id", "name", "hold", "fade_in", "fade_out", ("function", lambda f: f.id)) - class ChaserStepData: - """Data for the step.""" - def __init__(self, fd, start_time, end_time): - self.fd = fd - self.start_time = start_time - self.end_time = end_time - - def get_data(self, start_time=0): - return self.ChaserStepData(fd=self.function.get_data(), start_time=start_time, end_time=self.duration) - - def render(self, t, data:ChaserStepData=None): - ## The logic is different here: we never check the actual duration of this function and - ## never return -1, the responsibility for determining if this step is over lies with - ## the Chaser. The return value is also different: we return (vals, mul) instead of just - ## vals. mul is the "multiplier" for the function, i.e. what we think that this function - ## should be rendered at. If t > actual_duration, then mul will be 0 (this function is - ## done), but we still need to return the values because the next step might be fading - ## in and so will need to know the values of this function. - if data is None: - data = self.get_data() - t -= data.start_time - ## Render the function at time t - values, acues, nx, data.fd = self.function.render(t, data=data.fd) - ## Determine the multiplier - mul = 1 - if self.fade_in > 0 and t < self.fade_in: ## Fade in first - mul = min(1,t/self.fade_in) - nx = 1 - elif self.fade_out > 0: ## Then fade out - ft = t - data.end_time + 1 - if ft > 0: - mul = 1-min(1,ft/(self.fade_out)) - nx = -1 if ft > self.fade_out else 1 ## Check if we're done - else: - nx = min(nx, -ft + 1) - elif t >= data.end_time: - mul = 0 - - if t < data.end_time: - nx = min(nx, data.end_time-t) - - nacues = [] - for s, aid, f, fin, fout, fstart in acues: - if fstart + fout > self.fade_out + data.end_time: - fstart = data.end_time - self.fade_out - fout = self.fade_out - nacues.append((s+data.start_time, hash((self.id, data.start_time, aid)), - f, max(self.fade_in, fin), fout, fstart)) - - return (values, mul), tuple(nacues), nx, data - - def __init__(self, id_, fade_in, fade_out, hold, function): - super().__init__(id_, STEP, function.name, function.scope, hidden=False, - duration=hold+fade_in, actual_duration=hold+fade_out+fade_in, - fade_in=fade_in, fade_out=fade_out) - self.id = id_ - self.hold = hold - self.function = function - self._hash = hash((self._hash, self.function, self.hold)) - -class Chaser(Function, Advanceable): - """Class for representing a QLC+ Chaser or Sequence. - - Since they essentially do the same thing (Chaser being more general), they have only one - class here.""" - repr_attr = ("id", "name", ("steps", lambda s: ",".join((i.id for i in s)))) - class ChaserData: - """Current state of a chaser.""" - def __init__(self, step_data, obey_loop): - self.step_data = step_data - self.obey_loop = obey_loop - - @staticmethod - def advance(t, data): - """End the current chaser step. - - After calling this function, the chaser must be rendered at a time at least t before - calling it again. - """ - if data.step_data: - data.step_data[-1][1].end_time = t - data.step_data[-1][1].start_time - - return data - - def get_data(self): - return self.ChaserData(step_data=[], obey_loop=True) - - def next_step(self, n) -> int: ## TODO: Implement other chaser types - """Return the next step in the chaser.""" - if self.run_order == LOOP: - return (n+1) % len(self.steps) - elif self.run_order == SINGLESHOT: - if n >= len(self.steps) - 1: - return -1 - return n+1 - return None - - def render(self, t, data=None): - if t >= self.actual_duration: ## Quick check - return (), (), -1, data - elif data is None: - data = self.get_data() - - if not data.step_data: - data.step_data.append((0, self.steps[0].get_data())) - - vals = {c: 0 for c in self.scope} - nx = QLC_INFTY - i = 0 - acues = [] - svs = [] - ## First pass, get values - while i < len(data.step_data): - sn, sd = data.step_data[i] - step = self.steps[sn] - sv, sacues, snx, _ = step.render(t, sd) - acues.extend(sacues) - ## Figure out if we're fading out or in - svs.append((t > (sd.start_time+sd.end_time), sv)) - if t >= sd.start_time + sd.end_time and i+1 == len(data.step_data): ## Add the next step - nsn = self.next_step(sn) - if nsn != -1: ## Still another step to do - nss = sd.start_time + sd.end_time - data.step_data.append((nsn, self.steps[nsn].get_data(nss))) - if t >= sd.start_time+sd.end_time+step.fade_out and (len(data.step_data) == i+1 or (len(data.step_data) > i+1 and t >= data.step_data[i+1][1].start_time + self.steps[data.step_data[i+1][0]].fade_in)): ## Done this step - data.step_data.pop(i) - continue - if snx < nx and snx != -1: - nx = snx - i += 1 - - ## Second pass, handle fading - zero = {c: 0 for c in self.scope} - for i, (fout, (cval,mul)) in enumerate(svs): - if mul == 0: - continue - cval = dict(cval) - - if mul == 1: ## Don't bother looking for another one - other = zero - elif fout: ## Grab the previous step's values - other = zero if i+1 == len(svs) else dict(svs[i+1][1][0]) - else: ## Grab the next step's values - other = zero if i == 0 else dict(svs[i-1][1][0]) - - for c in self.scope: - v = (other[c]*(1-mul) if c in other else 0) + (mul*cval[c] if c in cval else 0) - v = min(255, int(v+0.5)) - if vals[c] < v: - vals[c] = v - - if not data.step_data: - return (), (), -1, data - - return tuple(vals.items()), tuple(sorted(acues, key=lambda a: a[0])), nx, data - - def render_all(self, minnx=1): - """Render the entire Chaser.""" - ## Verify that we can render this one - if self.run_order != SINGLESHOT: - raise ValueError("Can only render SingleShot Chasers") - for s in self.steps: - if s.hold == QLC_INFTY and s.function.actual_duration == QLC_INFTY: - raise ValueError("Cannot render Chaser with infinite hold of infinite function") - elif QLC_INFTY in (s.fade_in, s.fade_out): - raise ValueError("Cannot render Chaser with infinite fades") - - steps = [] - - t = 0 - start_time = 0 - data = None - current = [] - acurrent = [] - - values = {c: 0 for c in self.scope} - - ## We're gonna have to break encapsulation here - while True: - vals, acues, nx, data = self.render(t, data=data) - changes = [] - for c,v in vals: - if t == 0 or values[c] != v: - values[c] = v - changes.append((c, v)) - current.append((t-start_time, tuple(changes))) - - acurrent += [(t-start_time, *others) for t,*others in acues] - - if nx == -1 or nx >= QLC_INFTY: - ## Done the current step - steps.append((tuple(current), tuple(acurrent))) - if nx == -1: - ## Done - break - ## Reached an infinite segment, advance - current = [] - acurrent = [] - t += 1 - start_time = t - data = self.advance(t, data) - else: - t += max(minnx, nx) - - return tuple(steps) - - def __init__(self, id_, name, steps, hidden=False, run_order=SINGLESHOT, direction=FORWARD): - if run_order not in (LOOP, SINGLESHOT): - raise NotImplementedError("Only Loop and SingleShot chasers are currently supported") - if direction not in (FORWARD,): - raise NotImplementedError("Only Forward direction chasers are currently supported") - scope = set() - if run_order == SINGLESHOT: - max_t = 0 - cur = 0 - for s in steps: - max_t = max(max_t, cur+s.actual_duration) - scope.update(s.scope) - cur += s.duration - dur = sum(map(lambda s: s.duration, steps)) - elif run_order == LOOP: - for s in steps: - scope.update(s.scope) - max_t = QLC_INFTY - dur = QLC_INFTY - super().__init__(id_, CHASER, name, scope, hidden=hidden, - duration=dur, actual_duration=max_t) - self.steps = tuple(steps) - self.run_order = run_order - self.direction = direction - self._hash = hash((self._hash, self.steps, self.run_order, self.direction)) - -class ShowFunction(Function): - """Class for representing a function in a show.""" - repr_attr = ("id", "name", "start_time", ("function", lambda f: f.id)) - def render(self, t, data=None): - if data is None: - data = self.function.get_data() - - values, acues, nx, data = self.function.render(t-self.start_time, data=data) - return values, tuple(((at+self.start_time,hash((self.id, self.start_time, aid)), - *others) for at,aid,*others in acues)), nx, data - - def __init__(self, id_, name, function, start_time): - if function.actual_duration >= QLC_INFTY: - raise ValueError("Cannot have infinite-length functions in shows") - super().__init__(id_, "ShowFunction", name, function.scope, duration=function.duration, - actual_duration=function.actual_duration) - self.function = function - self.start_time = start_time - self._hash = hash((self._hash, self.start_time, self.function)) - -class ShowTrack(Function): - """Class for representing a track in a show.""" - repr_attr = ("id", "name", ("functions", lambda fs: ','.join(("%d@%d" % (f.function.id, f.start_time) for f in fs)))) - def get_data(self): - return tuple((f.function.get_data() for f in self.functions)) - - def render(self, t, data=None): - if t > self.actual_duration: - return (), (), -1, data - - if data is None: - data = self.get_data() - - values = {c: 0 for c in self.scope} - acues = [] - nx = QLC_INFTY - for f,d in zip(self.functions,data): - if t < f.start_time or t > f.start_time + f.actual_duration: - continue - vals, sacues, snx, _ = f.render(t, data=d) - acues.extend(sacues) - for c, v in vals: - if v > values[c]: - values[c] = v - if snx < 0: - continue - elif snx < nx: - nx = snx - if nx == QLC_INFTY: - nx = min((f.start_time-t for f in self.functions if f.start_time > t), default=-1) - - return tuple(values.items()), tuple(sorted(acues, key=lambda a: a[0])), nx, data - - def __init__(self, id_, name, functions): - dur = -1 - adur = -1 - self.functions = tuple(sorted(functions, key=lambda f: f.start_time)) - scope = set() - for f in self.functions: - if f.start_time + f.actual_duration > adur: - adur = f.start_time + f.actual_duration - if f.start_time + f.duration > dur: - dur = f.start_time + f.duration - scope.update(f.scope) - super().__init__(id_, "ShowTrack", name, scope, duration=dur, actual_duration=adur) - self._hash = hash((self._hash, self.functions)) - -class Show(Function): - """Class representing a QLC+ show.""" - def render(self, t, data=None): - if t > self.actual_duration: - return (), (), -1, data - - if data is None: - data = tuple((t.get_data() for t in self.tracks)) - - values = {c: 0 for c in self.scope} - nx = QLC_INFTY - acues = [] - for track,d in zip(self.tracks,data): - if t > track.actual_duration: - continue - vals, tacues, tnx, _ = track.render(t, data=d) - acues.extend(tacues) - if tnx == -1: - continue - for c,v in vals: - if values[c] < v: - values[c] = v - if tnx < nx: - nx = tnx - - return tuple(values.items()), tuple(sorted(acues, key=lambda a: a[0])), nx, data - - def render_all(self, minnx=1): - """Render the entire show. - - minnx is the minimum amount of time between render steps. Setting this to a few - milliseconds less than the transmission time of your connection should be fine, but - the default value of 1 ensures that every fade is rendered as perfectly as it can be - when using integer milliseconds. The time-complexity of this function is approximately - linear in minnx (e.g. minnx=10 will be around 10 times faster than minnx=1 for the same - show). - - This function returns: - - cues, audio_cues - - Where cues is of the form: - - [(time, ((channel, value), (channel, value), ...)), ...] - - Note that (channel, value) pairs are only present if that channel changed value at the - given t value, so values must be held at previous levels if they are ommitted. - - audio_cues is of the form: - - [(start time, filename, fade in, fade out, fade out start time), ...] - - Both cues and audio_cues are sorted by t/start time. A typical loop for rendering - lighting cues would be: - - cues, _ = show.render_all() - current_time = 0 - - while cues: - while cues[0][0] < current_time: - _, changes = cues.pop(0) - for c, v in changes: - ## Set address c to value v - """ - if self.actual_duration == QLC_INFTY: - raise ValueError("Cannot render infinite-length shows (please rethink your life if you created this show)") - - acues = set() - cues = [] - t = 0 - current = {c: 0 for c in self.scope} - data = None - while True: - changes = [] - vals, tacues, nx, data = self.render(t, data=data) - for c,v in vals: - if t == 0 or current[c] != v: - changes.append((c.address, v)) - current[c] = v - if changes: - cues.append((t,tuple(changes))) - acues.update(tacues) - if nx < 0: - break - t += max(nx, minnx) - - return tuple(cues), tuple(sorted(acues, key=lambda a: a[1])) - - def __init__(self, id_, name, tracks): - scope = set() - dur = -1 - adur = -1 - for t in tracks: - scope.update(t.scope) - if t.duration > dur: - dur = t.duration - if t.actual_duration > adur: - adur = t.actual_duration - super().__init__(id_, SHOW, name, scope, duration=dur, actual_duration=adur) - self.tracks = tuple(tracks) - self._hash = hash((self._hash, self.tracks)) - -## END Function classes - -## BEGIN Primary classes - -class Workspace: - """Class for representing a QLC workspace. - - Should be created using Workspace.load and is assumed to be immutable. - """ - @classmethod - def load(cls, fname, audio_length=ffprobe_audio_length): - """Load a QLC+ workspace. - - This function returns the created Workspace object. - - Parameters: - fname: the file to load from. May be any format accepted by lxml.etree.parse. - audio_length: a function accepting an audio filename and returning the length of - that audio file in milliseconds. - """ - a = etree.parse(fname) - ws = a.getroot() - - creator = ws.find(QXW+"Creator") - self = cls(creator.find(QXW+"Name").text, creator.find(QXW+"Version").text, - creator.find(QXW+"Author").text) - - engine = ws.find(QXW+"Engine") - - ## Load universes - logging.info("Loading universes...") - for u in engine.find(QXW+"InputOutputMap").findall(QXW+"Universe"): - uid = int(u.attrib["ID"]) - self.universes[uid] = Universe(uid, u.attrib["Name"]) - logging.info("Loaded %d universe(s)" % len(self.universes)) - - ## Load fixtures - logging.info("Loading fixtures...") - total_channels = 0 - for f in engine.iterfind(QXW+"Fixture"): - fid = int(f.find(QXW+"ID").text) - uid = int(f.find(QXW+"Universe").text) - name = f.find(QXW+"Name").text - address = int(f.find(QXW+"Address").text) + 1 ## TODO: +1, yes or no? - channels = int(f.find(QXW+"Channels").text) - total_channels += channels - mode = f.find(QXW+"Mode") - self.fixtures[fid] = Fixture(fid, name, address, self.universes[uid], mode, channels=channels) - logging.info("Loaded %d fixtures with %d channels" % (len(self.fixtures), total_channels)) - - ## Load channel groups - logging.info("Loading channel groups...") - for cg in engine.iterfind(QXW+"ChannelsGroup"): - vals = [int(i) for i in cg.text.split(',')] - cg = ChannelGroup(int(cg.attrib["ID"]), cg.attrib["Name"], - [self.fixtures[fid].channels[offset] for fid, offset in zip(vals[::2], vals[1::2])]) - self.channel_groups[cg.id] = cg - logging.info("Loaded %d channel groups" % len(self.channel_groups)) - - logging.info("Determining proper function load order...") - load = [] - audio_fnames = [] - ids = set() - work = engine.iterfind(QXW+"Function") - while work: - todo = [] - for f in work: - typ = f.attrib["Type"] - bad = False - if typ == SHOW: - for t in f.iterfind(QXW+"Track"): - for s in t.iterfind(QXW+"ShowFunction"): - if s.attrib["ID"] not in ids: - bad = True - break - if bad: - break - elif typ == CHASER: - for s in f.iterfind(QXW+"Step"): - if s.text not in ids: - bad = True - break - elif typ == AUDIO: - audio_fnames.append(f.find(QXW+"Source").text) - if bad: - todo.append(f) - else: - ids.add(f.attrib["ID"]) - load.append(f) - work = todo - logging.info("Found %d functions" % len(load)) - - ## Calculate all audio lengths before load. This will reduce duplicate calls if the same - ## file is present in multiple functions and lets us use a ThreadPool to speed it up - logging.info("Scanning %d audio functions..." % len(audio_fnames)) - with ThreadPool() as pool: - audio_fnames = tuple(set(audio_fnames)) - audio_lengths = {f: l for f,l in zip(audio_fnames, pool.map(audio_length, audio_fnames))} - - if 0 in audio_lengths.values(): - for f,l in audio_lengths.items(): - if l == 0: - logging.warning("zero-length audio file \"%s\"" % f) - - logging.info("Scanned %d audio functions" % len(load)) - - ## Now have an appropriate load order, load them - logging.info("Loading functions...") - for func in load: - ftype = func.attrib["Type"] - sid = int(func.attrib["ID"]) - speed = func.find(QXW+"Speed") - if speed is not None: - fin = int(speed.attrib["FadeIn"]) - fout = int(speed.attrib["FadeOut"]) - else: - fin = None - fout = None - hidden = ("Hidden" in func.attrib) and (func.attrib["Hidden"] == "True") - name = func.attrib["Name"] - ro = func.find(QXW+"RunOrder") - if ro is not None: - ro = ro.text - - if ftype == SCENE: ## Scenes can't depend on other scenes, do them first - values = [] - for v in func.iterfind(QXW+"FixtureVal"): - if v.text is None: - vals = (0, 0) - else: - vals = [int(i) for i in v.text.split(',')] - fixture = self.fixtures[int(v.attrib["ID"])] - for offset, val in zip(vals[::2], vals[1::2]): - values.append((fixture.channels[offset], val)) - - func = Scene(sid, name, values, hidden=hidden) - elif ftype == AUDIO: - fname = func.find(QXW+"Source").text - func = Audio(sid, name, fname, fin, fout, audio_lengths[fname], run_order=ro, - hidden=hidden) - elif ftype == SEQUENCE: - ## smodes = func.find(QXW+"SpeedModes") - ## sfin = smodes.attrib["FadeIn"] - ## sfout = smodes.attrib["FadeOut"] - ## sdur = smodes.attrib["Duration"] - ## bound_scene = self.functions[int(func.attrib["BoundScene"])] - steps = [] - for step in func.iterfind(QXW+"Step"): - stfin = int(step.attrib["FadeIn"]) - stnum = int(step.attrib["Number"]) - stfout = int(step.attrib["FadeOut"]) - sthold = int(step.attrib["Hold"]) - used = set() - values = [] - if step.text is not None: - conv = step.text.split(':') - for fid, val in zip(conv[::2], conv[1::2]): - fixture = self.fixtures[int(fid)] - offset, value = val.split(',') - channel = fixture.channels[int(offset)] - used.add(channel) - values.append((channel, int(value))) - ## for c,_ in bound_scene.values: - ## if c not in used: - ## values.append((c, 0)) - scene = Scene(stnum, "", values, hidden=True) - step = ChaserStep(stnum, fade_in=stfin, fade_out=stfout, hold=sthold, - function=scene) - steps.append(step) - func = Chaser(sid, name, steps, hidden=hidden, - run_order=func.find(QXW+"RunOrder").text, - direction=func.find(QXW+"Direction").text) - elif ftype == SHOW: ## Finally shows - ## td = func.find(QXW+"TimeDivision") - ## tdtype = td.attrib["Type"] - ## tdbpm = int(td.attrib["BPM"]) - tracks = [] - for track in func.iterfind(QXW+"Track"): - tmute = track.attrib["isMute"] == "1" - if tmute: - continue - tid = int(track.attrib["ID"]) - tname = track.attrib["Name"] - ## if "SceneID" in track.attrib: - ## tscene = self.functions[int(track.attrib["SceneID"])] - ## else: - ## tscene = None - funcs = [] - for sf in track.iterfind(QXW+"ShowFunction"): - sfid = int(sf.attrib["ID"]) - sfstart = int(sf.attrib["StartTime"]) - funcs.append(ShowFunction(sfid, "", self.functions[sfid], sfstart)) - - tracks.append(ShowTrack(tid, tname, funcs)) - if not tracks: - continue - func = Show(sid, name, tracks) - elif ftype == CHASER: - ## smodes = func.find(QXW+"SpeedModes") - ## sfin = smodes.attrib["FadeIn"] - ## sfout = smodes.attrib["FadeOut"] - ## sdur = smodes.attrib["Duration"] - steps = [] - for step in func.iterfind(QXW+"Step"): - stfin = int(step.attrib["FadeIn"]) - stnum = int(step.attrib["Number"]) - stfout = int(step.attrib["FadeOut"]) - sthold = int(step.attrib["Hold"]) - stid = int(step.text) - step = ChaserStep(stid, stfin, stfout, sthold, self.functions[stid]) - steps.append(step) - func = Chaser(sid, name, steps, hidden=hidden, - run_order=func.find(QXW+"RunOrder").text, - direction=func.find(QXW+"Direction").text) - else: - raise ValueError("Unhandled type %s" % ftype) - - self.functions[sid] = func - - logging.info("Loaded %d top-level functions" % len(self.functions)) - - return self - - def __init__(self, creator, version, author): - self.universes = {} - self.fixtures = {} - self.channel_groups = {} - self.creator = creator - self.version = version - self.author = author - self.functions = {} - -## END Primary classes |