aboutsummaryrefslogtreecommitdiff
path: root/broadcast.py
blob: ad27283daf3f6cee42866041b7a70296ecb25360 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import socket
import sys
import struct
import time
import xml.etree.ElementTree as ET
import threading
import optparse

from packet import Packet, CMD, itos

parser = optparse.OptionParser()
parser.add_option('-t', '--test', dest='test', action='store_true', help='Play a test tone (440, 880) on all clients in sequence (the last overlaps with the first of the next)')
parser.add_option('-T', '--sync-test', dest='sync_test', action='store_true', help='Don\'t wait for clients to play tones properly--have them all test tone at the same time')
parser.add_option('-q', '--quit', dest='quit', action='store_true', help='Instruct all clients to quit')
parser.add_option('-f', '--factor', dest='factor', type='float', default=1.0, help='Rescale time by this factor (0<f<1 are faster; 0.5 is twice the speed, 2 is half)')
parser.add_option('-r', '--route', dest='routes', action='append', help='Add a routing directive (see --route-help)')
parser.add_option('-v', '--verbose', dest='verbose', action='store_true', help='Be verbose; dump events and actual time (can slow down performance!)')
parser.add_option('--help-routes', dest='help_routes', action='store_true', help='Show help about routing directives')
parser.set_defaults(routes=[])
options, args = parser.parse_args()

if options.help_routes:
    print '''Routes are a way of either exclusively or mutually binding certain streams to certain playback clients. They are especially fitting in heterogenous environments where some clients will outperform others in certain pitches or with certain parts.

Routes are fully specified by:
-The attribute to be routed on (either type "T", or UID "U")
-The value of that attribute
-The exclusivity of that route ("+" for inclusive, "-" for exclusive)
-The stream group to be routed there.

The syntax for that specification resembles the following:

    broadcast.py -r U:bass=+bass -r U:treble1,U:treble2=+treble -r T:BEEP=-beeps,-trk3,-trk5

The specifier consists of a comma-separated list of attribute-colon-value pairs, followed by an equal sign. After this is a comma-separated list of exclusivities paired with the name of a stream group as specified in the file. The above example shows that stream groups "bass", "treble", and "beeps" will be routed to clients with UID "bass", "treble", and TYPE "BEEP" respectively. Additionally, TYPE "BEEP" will receive tracks 4 and 6 (indices 3 and 5) of the MIDI file (presumably split with -T), and that these three groups are exclusively to be routed to TYPE "BEEP" clients only (the broadcaster will drop the stream if no more are available), as opposed to the preference of the bass and treble groups, which may be routed onto other stream clients if they are available.'''
    exit()

PORT = 13676
factor = options.factor

print 'Factor:', factor

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

clients = []
uid_groups = {}
type_groups = {}

s.sendto(str(Packet(CMD.PING)), ('255.255.255.255', PORT))
s.settimeout(0.5)

try:
	while True:
		data, src = s.recvfrom(4096)
		clients.append(src)
except socket.timeout:
	pass

print 'Clients:'
for cl in clients:
	print cl,
        s.sendto(str(Packet(CMD.CAPS)), cl)
        data, _ = s.recvfrom(4096)
        pkt = Packet.FromStr(data)
        print 'ports', pkt.data[0],
        tp = itos(pkt.data[1])
        print 'type', tp,
        uid = ''.join([itos(i) for i in pkt.data[2:]]).rstrip('\x00')
        print 'uid', uid
        if uid == '':
            uid = None
        uid_groups.setdefault(uid, []).append(cl)
        type_groups.setdefault(tp, []).append(cl)
	if options.test:
		s.sendto(str(Packet(CMD.PLAY, 0, 250000, 440, 255)), cl)
                if not options.sync_test:
                    time.sleep(0.25)
                    s.sendto(str(Packet(CMD.PLAY, 0, 250000, 880, 255)), cl)
	if options.quit:
		s.sendto(str(Packet(CMD.QUIT)), cl)

if options.test and options.sync_test:
    time.sleep(0.25)
    for cl in clients:
        s.sendto(str(Packet(CMD.PLAY, 0, 250000, 880, 255)), cl)

if options.test or options.quit:
    print uid_groups
    print type_groups
    exit()

try:
	iv = ET.parse(args[0]).getroot()
except IOError:
	print 'Bad file'
	exit()

notestreams = iv.findall("./streams/stream[@type='ns']")
groups = set([ns.get('group') for ns in notestreams if 'group' in ns.keys()])
print len(notestreams), 'notestreams'
print len(clients), 'clients'
print len(groups), 'groups'

class Route(object):
    def __init__(self, fattr, fvalue, group, excl=False):
        if fattr == 'U':
            self.map = uid_groups
        elif fattr == 'T':
            self.map = type_groups
        else:
            raise ValueError('Not a valid attribute specifier: %r'%(fattr,))
        self.value = fvalue
        if group not in groups:
            raise ValueError('Not a present group: %r'%(group,))
        self.group = group
        self.excl = excl
    @classmethod
    def Parse(cls, s):
        fspecs, _, grpspecs = map(lambda x: x.strip(), s.partition('='))
        fpairs = []
        ret = []
        for fspec in [i.strip() for i in fspecs.split(',')]:
            fattr, _, fvalue = map(lambda x: x.strip(), fspec.partition(':'))
            fpairs.append((fattr, fvalue))
        for part in [i.strip() for i in grpspecs.split(',')]:
            for fattr, fvalue in fpairs:
                if part[0] == '+':
                    ret.append(Route(fattr, fvalue, part[1:], False))
                elif part[0] == '-':
                    ret.append(Route(fattr, fvalue, part[1:], True))
                else:
                    raise ValueError('Not an exclusivity: %r'%(part[0],))
        return ret
    def __repr__(self):
        return '<Route of %r to %s:%s>'%(self.group, ('U' if self.map is uid_groups else 'T'), self.value)

class RouteSet(object):
    def __init__(self, clis=None):
        if clis is None:
            clis = clients
        self.clients = clis
        self.routes = []
    def Route(self, stream):
        grp = stream.get('group')
        if options.verbose:
            print 'Routing', grp, '...'
        excl = False
        for route in self.routes:
            if route.group == grp:
                if options.verbose:
                    print 'Matches route', route
                excl = excl or route.excl
                matches = filter(lambda x, route=route: route.Apply(x), self.clients)
                if matches:
                    if options.verbose:
                        print 'Using client', matches[0]
                    self.clients.remove(matches[0])
                    return matches[0]
                print 'No matches, moving on...'
        if excl:
            if options.verbose:
                print 'Exclusively routed, no route matched.'
            return None
        if not self.clients:
            if options.verbose:
                print 'Out of clients, no route matched.'
            return None
        cli = self.clients.pop(0)
        if options.verbose:
            print 'Default route to', cli
        return cli

routeset = RouteSet()
for rspec in options.routes:
    routeset.routes.extend(Route.Parse(rspec))

if options.verbose:
    print 'All routes:'
    for route in routeset.routes:
        print route

class NSThread(threading.Thread):
        def wait_for(self, t):
            if t <= 0:
                return
            time.sleep(t)
	def run(self):
		nsq, cl = self._Thread__args
		for note in nsq:
			ttime = float(note.get('time'))
			pitch = int(note.get('pitch'))
			vel = int(note.get('vel'))
			dur = factor*float(note.get('dur'))
			while time.time() - BASETIME < factor*ttime:
				self.wait_for(factor*ttime - (time.time() - BASETIME))
			s.sendto(str(Packet(CMD.PLAY, int(dur), int((dur*1000000)%1000000), int(440.0 * 2**((pitch-69)/12.0)), vel*2)), cl)
                        if options.verbose:
                            print (time.time() - BASETIME), cl, ': PLAY', pitch, dur, vel
			self.wait_for(dur - ((time.time() - BASETIME) - factor*ttime))
                if options.verbose:
                    print '% 6.5f'%(time.time() - BASETIME,), cl, ': DONE'

threads = []
for ns in notestreams:
    cli = routeset.Route(ns)
    if cli:
        nsq = ns.findall('note')
        threads.append(NSThread(args=(nsq, clients.pop(0))))

BASETIME = time.time()
for thr in threads:
	thr.start()
for thr in threads:
	thr.join()