import socket import sys import struct import time import xml.etree.ElementTree as ET import threading import thread import optparse import random import itertools from packet import Packet, CMD, itos parser = optparse.OptionParser() parser.add_option('-t', '--test', dest='test', action='store_true', help='Play a test tone (440, 880) on all clients in sequence (the last overlaps with the first of the next)') parser.add_option('--test-delay', dest='test_delay', type='float', help='Time for which to play a test tone') parser.add_option('-T', '--transpose', dest='transpose', type='int', help='Transpose by a set amount of semitones (positive or negative)') parser.add_option('--sync-test', dest='sync_test', action='store_true', help='Don\'t wait for clients to play tones properly--have them all test tone at the same time') parser.add_option('-R', '--random', dest='random', type='float', help='Generate random notes at approximately this period') parser.add_option('--rand-low', dest='rand_low', type='int', help='Low frequency to randomly sample') parser.add_option('--rand-high', dest='rand_high', type='int', help='High frequency to randomly sample') parser.add_option('-l', '--live', dest='live', help='Enter live mode (play from a controller in real time), specifying the port to connect to as "client,port"; use just "," to manually subscribe later') parser.add_option('-L', '--list-live', dest='list_live', action='store_true', help='List all the clients and ports that can be connected to for live performance') parser.add_option('--no-sustain', dest='no_sustain', action='store_true', help='Don\'t use sustain hacks in live mode') parser.add_option('-q', '--quit', dest='quit', action='store_true', help='Instruct all clients to quit') parser.add_option('-p', '--play', dest='play', action='append', help='Play a single tone or chord (specified multiple times) on all listening clients (either "midi pitch" or "@frequency")') parser.add_option('-P', '--play-async', dest='play_async', action='store_true', help='Don\'t wait for the tone to finish using the local clock') parser.add_option('-D', '--duration', dest='duration', type='float', help='How long to play this note for') parser.add_option('-V', '--volume', dest='volume', type='float', help='Master volume [0.0, 1.0]') parser.add_option('-s', '--silence', dest='silence', action='store_true', help='Instruct all clients to stop playing any active tones') parser.add_option('-S', '--seek', dest='seek', type='float', help='Start time in seconds (scaled by --factor)') parser.add_option('-f', '--factor', dest='factor', type='float', help='Rescale time by this factor (0 0 and dispinfo.current_w > 0: DISP_WIDTH = dispinfo.current_w DISP_HEIGHT = dispinfo.current_h print 'Pygame info' WIDTH = DISP_WIDTH if options.pg_width > 0: WIDTH = options.pg_width HEIGHT = DISP_HEIGHT if options.pg_height > 0: HEIGHT = options.pg_height flags = 0 if options.fullscreen: flags |= pygame.FULLSCREEN disp = pygame.display.set_mode((WIDTH, HEIGHT), flags) print 'Disp acquire' PFAC = HEIGHT / 128.0 clock = pygame.time.Clock() print 'Pygame GUI initialized, running...' while True: disp.scroll(-1, 0) disp.fill((0, 0, 0), (WIDTH - 1, 0, 1, HEIGHT)) idx = 0 for cli, note in sorted(playing_notes.items(), key = lambda pair: pair[0]): pitch = note[0] col = colorsys.hls_to_rgb(float(idx) / len(targets), note[1]/2.0, 1.0) col = [int(i*255) for i in col] disp.fill(col, (WIDTH - 1, HEIGHT - pitch * PFAC - PFAC, 1, PFAC)) idx += 1 pygame.display.flip() for ev in pygame.event.get(): if ev.type == pygame.KEYDOWN: if ev.key == pygame.K_ESCAPE: thread.interrupt_main() pygame.quit() exit() clock.tick(60) GUIS['pygame'] = gui_pygame PORT = 13676 factor = options.factor print 'Factor:', factor s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) if options.bind_addr: addr, _, port = options.bind_addr.partition(':') if not port: port = '12074' s.bind((addr, int(port))) clients = [] targets = [] uid_groups = {} type_groups = {} if not options.dry: s.sendto(str(Packet(CMD.PING)), ('255.255.255.255', PORT)) s.settimeout(options.wait_time) try: while True: data, src = s.recvfrom(4096) clients.append(src) except socket.timeout: pass print len(clients), 'detected clients' print 'Clients:' for cl in clients: print cl, s.sendto(str(Packet(CMD.CAPS)), cl) data, _ = s.recvfrom(4096) pkt = Packet.FromStr(data) print 'ports', pkt.data[0], tp = itos(pkt.data[1]) print 'type', tp, uid = ''.join([itos(i) for i in pkt.data[2:]]).rstrip('\x00') print 'uid', uid if uid == '': uid = None uid_groups.setdefault(uid, []).append(cl) type_groups.setdefault(tp, []).append(cl) if options.test: ts, tms = int(options.test_delay), int(options.test_delay * 1000000) % 1000000 s.sendto(str(Packet(CMD.PLAY, ts, tms, 440, options.volume)), cl) if not options.sync_test: time.sleep(options.test_delay) s.sendto(str(Packet(CMD.PLAY, ts, tms, 880, options.volume)), cl) if options.quit: s.sendto(str(Packet(CMD.QUIT)), cl) if options.silence: s.sendto(str(Packet(CMD.PLAY, 0, 1, 1, 0.0)), cl) for i in xrange(pkt.data[0]): targets.append(cl+(i,)) playing_notes = {} for tg in targets: playing_notes[tg] = (0, 0) if options.gui: gui_thr = threading.Thread(target=GUIS[options.gui], args=()) gui_thr.setDaemon(True) gui_thr.start() if options.play: for i, val in enumerate(options.play): if val.startswith('@'): options.play[i] = int(val[1:]) else: options.play[i] = int(440.0 * 2**((int(val) - 69)/12.0)) for i, cl in enumerate(targets): s.sendto(str(Packet(CMD.PLAY, int(options.duration), int(1000000*(options.duration-int(options.duration))), options.play[i%len(options.play)], options.volume, cl[2])), cl[:2]) if not options.play_async: time.sleep(options.duration) exit() if options.test and options.sync_test: time.sleep(0.25) for cl in targets: s.sendto(str(Packet(CMD.PLAY, 0, 250000, 880, 1.0, cl[2])), cl[:2]) if options.test or options.quit or options.silence: print uid_groups print type_groups exit() if options.random > 0: while True: for cl in targets: s.sendto(str(Packet(CMD.PLAY, int(options.random), int(1000000*(options.random-int(options.random))), random.randint(options.rand_low, options.rand_high), options.volume, cl[2])), cl[:2]) time.sleep(options.random) if options.live or options.list_live: if options.gui: print 'Waiting a second for GUI init...' time.sleep(3.0) import midi from midi import sequencer S = sequencer.S if options.list_live: print sequencer.SequencerHardware() exit() seq = sequencer.SequencerRead(sequencer_resolution=120) client_set = set(targets) active_set = {} # note (pitch) -> [client] deferred_set = set() # pitches held due to sustain sustain_status = False client, _, port = options.live.partition(',') if client or port: seq.subscribe_port(client, port) seq.start_sequencer() if not options.gui: # FIXME seq.set_nonblock(False) while True: ev = S.event_input(seq.client) if ev is None: time.sleep(0) event = None if ev: if options.verbose: print 'SEQ:', ev if ev < 0: seq._error(ev) if ev.type == S.SND_SEQ_EVENT_NOTEON: event = midi.NoteOnEvent(channel = ev.data.note.channel, pitch = ev.data.note.note, velocity = ev.data.note.velocity) elif ev.type == S.SND_SEQ_EVENT_NOTEOFF: event = midi.NoteOffEvent(channel = ev.data.note.channel, pitch = ev.data.note.note, velocity = ev.data.note.velocity) elif ev.type == S.SND_SEQ_EVENT_CONTROLLER: event = midi.ControlChangeEvent(channel = ev.data.control.channel, control = ev.data.control.param, value = ev.data.control.value) elif ev.type == S.SND_SEQ_EVENT_PGMCHANGE: event = midi.ProgramChangeEvent(channel = ev.data.control.channel, value = ev.data.control.value) elif ev.type == S.SND_SEQ_EVENT_PITCHBEND: event = midi.PitchWheelEvent(channel = ev.data.control.channel, pitch = ev.data.control.value) elif options.verbose: print 'WARNING: Unparsed event, type %r'%(ev.type,) continue if event is not None: if isinstance(event, midi.NoteOnEvent) and event.velocity == 0: event.__class__ = midi.NoteOffEvent if options.verbose: print 'EVENT:', event if isinstance(event, midi.NoteOnEvent): if event.pitch in active_set: if sustain_status: deferred_set.discard(event.pitch) inactive_set = client_set - set(sum(active_set.values(), [])) if not inactive_set: print 'WARNING: Out of clients to do note %r; dropped'%(event.pitch,) continue cli = sorted(inactive_set)[0] s.sendto(str(Packet(CMD.PLAY, 65535, 0, int(440.0 * 2**((event.pitch-69)/12.0)), event.velocity / 127.0, cli[2])), cli[:2]) active_set.setdefault(event.pitch, []).append(cli) playing_notes[cli] = (event.pitch, event.velocity / 127.0) if options.verbose: print 'LIVE:', event.pitch, '+ =>', active_set[event.pitch] elif isinstance(event, midi.NoteOffEvent): if event.pitch not in active_set or not active_set[event.pitch]: print 'WARNING: Deactivating inactive note %r'%(event.pitch,) continue if sustain_status: deferred_set.add(event.pitch) continue cli = active_set[event.pitch].pop() s.sendto(str(Packet(CMD.PLAY, 0, 1, 1, 0)), cli) playing_notes[cli] = (0, 0) if options.verbose: print 'LIVE:', event.pitch, '- =>', active_set[event.pitch] if sustain_status: print '...ignored (sustain on)' elif isinstance(event, midi.ControlChangeEvent): if event.control == 64 and not options.no_sustain: sustain_status = (event.value >= 64) if options.verbose: print 'LIVE: SUSTAIN', ('+' if sustain_status else '-') if not sustain_status: for pitch in deferred_set: if pitch not in active_set or not active_set[pitch]: print 'WARNING: Attempted deferred removal of inactive note %r'%(pitch,) continue for cli in active_set[pitch]: s.sendto(str(Packet(CMD.PLAY, 0, 1, 1, 0, cli[2])), cli[:2]) playing_notes[cli] = (0, 0) del active_set[pitch] deferred_set.clear() if options.repeat: args = itertools.cycle(args) for fname in args: if options.pcm and not fname.endswith('.iv'): if fname == '-': import wave pcr = wave.open(sys.stdin) samprate = pcr.getframerate() pcr.read = pcr.readframes else: try: import audiotools pcr = audiotools.open(fname).to_pcm() assert pcr.channels == 1 and pcr.bits_per_sample == 16 and pcr.sample_rate == 44100 samprate = pcr.sample_rate except ImportError: import wave pcr = wave.open(fname, 'r') assert pcr.getnchannels() == 1 and pcr.getsampwidth() == 2 and pcr.getframerate() == 44100 samprate = pcr.getframerate() pcr.read = pcr.readframes def read_all(fn, n): buf = '' while len(buf) < n: nbuf = fn.read(n - len(buf)) if not isinstance(nbuf, str): nbuf = nbuf.to_bytes(False, True) buf += nbuf return buf BASETIME = time.time() - options.pcmlead sampcnt = 0 buf = read_all(pcr, 16) while len(buf) >= 32: frag = buf[:32] buf = buf[32:] for cl in clients: s.sendto(struct.pack('>L', CMD.PCM) + frag, cl) sampcnt += len(frag) / 2 delay = max(0, BASETIME + (sampcnt / float(samprate)) - time.time()) #print sampcnt, delay if delay > 0: time.sleep(delay) if len(buf) < 32: buf += read_all(pcr, 16) try: iv = ET.parse(fname).getroot() except IOError: import traceback traceback.print_exc() print fname, ': Bad file' continue notestreams = iv.findall("./streams/stream[@type='ns']") groups = set([ns.get('group') for ns in notestreams if 'group' in ns.keys()]) number = (len(notestreams) * abs(options.number) if options.number < 0 else options.number) print len(notestreams), 'notestreams' print len(clients), 'clients' print len(targets), 'targets' print len(groups), 'groups' print number, 'clients used (number)' class Route(object): def __init__(self, fattr, fvalue, group, excl=False): if fattr == 'U': self.map = uid_groups elif fattr == 'T': self.map = type_groups elif fattr == '0': self.map = {} else: raise ValueError('Not a valid attribute specifier: %r'%(fattr,)) self.value = fvalue if group is not None and group not in groups: raise ValueError('Not a present group: %r'%(group,)) self.group = group self.excl = excl @classmethod def Parse(cls, s): fspecs, _, grpspecs = map(lambda x: x.strip(), s.partition('=')) fpairs = [] ret = [] for fspec in [i.strip() for i in fspecs.split(',')]: fattr, _, fvalue = map(lambda x: x.strip(), fspec.partition(':')) fpairs.append((fattr, fvalue)) for part in [i.strip() for i in grpspecs.split(',')]: for fattr, fvalue in fpairs: if part[0] == '+': ret.append(Route(fattr, fvalue, part[1:], False)) elif part[0] == '-': ret.append(Route(fattr, fvalue, part[1:], True)) elif part[0] == '0': ret.append(Route(fattr, fvalue, None, True)) else: raise ValueError('Not an exclusivity: %r'%(part[0],)) return ret def Apply(self, cli): return cli[:2] in self.map.get(self.value, []) def __repr__(self): return ''%(self.group, ('U' if self.map is uid_groups else 'T'), self.value) class RouteSet(object): def __init__(self, clis=None): if clis is None: clis = targets[:] self.clients = clis self.routes = [] def Route(self, stream): testset = self.clients[:] grp = stream.get('group', 'ALL') if options.verbose: print 'Routing', grp, '...' excl = False for route in self.routes: if route.group == grp: if options.verbose: print '\tMatches route', route excl = excl or route.excl matches = filter(lambda x, route=route: route.Apply(x), testset) if matches: if options.verbose: print '\tUsing client', matches[0] self.clients.remove(matches[0]) return matches[0] if options.verbose: print '\tNo matches, moving on...' if route.group is None: if options.verbose: print 'Encountered NULL route, removing from search space...' toremove = [] for cli in testset: if route.Apply(cli): toremove.append(cli) for cli in toremove: if options.verbose: print '\tRemoving', cli, '...' testset.remove(cli) if excl: if options.verbose: print '\tExclusively routed, no route matched.' return None if not testset: if options.verbose: print '\tOut of clients, no route matched.' return None cli = testset[0] self.clients.remove(cli) if options.verbose: print '\tDefault route to', cli return cli routeset = RouteSet() for rspec in options.routes: try: routeset.routes.extend(Route.Parse(rspec)) except Exception: import traceback traceback.print_exc() if options.verbose: print 'All routes:' for route in routeset.routes: print route class NSThread(threading.Thread): def drop_missed(self): nsq, cl = self._Thread__args cnt = 0 while nsq and float(nsq[0].get('time'))*factor < time.time() - BASETIME: nsq.pop(0) cnt += 1 if options.verbose: print self, 'dropped', cnt, 'notes due to miss' self._Thread__args = (nsq, cl) def wait_for(self, t): if t <= 0: return time.sleep(t) def run(self): nsq, cls = self._Thread__args for note in nsq: ttime = float(note.get('time')) pitch = float(note.get('pitch')) + options.transpose ampl = float(note.get('ampl', float(note.get('vel', 127.0)) / 127.0)) dur = factor*float(note.get('dur')) while time.time() - BASETIME < factor*ttime: self.wait_for(factor*ttime - (time.time() - BASETIME)) if options.dry: cl = self.ident # XXX hack else: for cl in cls: s.sendto(str(Packet(CMD.PLAY, int(dur), int((dur*1000000)%1000000), int(440.0 * 2**((pitch-69)/12.0)), ampl * options.volume, cl[2])), cl[:2]) if options.verbose: print (time.time() - BASETIME), cl, ': PLAY', pitch, dur, vel playing_notes[cl] = (pitch, ampl) self.wait_for(dur - ((time.time() - BASETIME) - factor*ttime)) playing_notes[cl] = (0, 0) if options.verbose: print '% 6.5f'%(time.time() - BASETIME,), cl, ': DONE' threads = {} if options.dry: for ns in notestreams: nsq = ns.findall('note') threads[ns] = NSThread(args=(nsq, set())) targets = threads.values() # XXX hack else: nscycle = itertools.cycle(notestreams) for idx, ns in zip(xrange(number), nscycle): cli = routeset.Route(ns) if cli: nsq = ns.findall('note') if ns in threads: threads[ns]._Thread__args[1].add(cli) else: threads[ns] = NSThread(args=(nsq, set([cli]))) if options.verbose: print 'Playback threads:' for thr in threads.values(): print thr._Thread__args[1] BASETIME = time.time() - (options.seek*factor) if options.seek > 0: for thr in threads.values(): thr.drop_missed() for thr in threads.values(): thr.start() for thr in threads.values(): thr.join() print fname, ': Done!'