diff options
author | Graham Northup <grissess@nexusg.org> | 2018-04-23 09:25:16 -0400 |
---|---|---|
committer | Graham Northup <grissess@nexusg.org> | 2018-04-23 09:25:16 -0400 |
commit | 6cba85974d3b86eb33beac096922d33f05d9434b (patch) | |
tree | dea6bfe55d31f45e1b7650a7d84103773b27d735 /mkiv.py | |
parent | f5b2fde3224de430f0342c1da4f98c028ba11c94 (diff) | |
parent | c8efa1318f4d924b86c410e576c246d1e23839fb (diff) |
Merge branch 'beta'
Diffstat (limited to 'mkiv.py')
-rw-r--r-- | mkiv.py | 347 |
1 files changed, 308 insertions, 39 deletions
@@ -4,10 +4,6 @@ mkiv -- Make Intervals This simple script (using python-midi) reads a MIDI file and makes an interval (.iv) file (actually XML) that contains non-overlapping notes. - -TODO: --MIDI Control events --Percussion ''' import xml.etree.ElementTree as ET @@ -15,6 +11,7 @@ import midi import sys import os import optparse +import math TRACKS = object() PROGRAMS = object() @@ -32,8 +29,24 @@ parser.add_option('-f', '--fuckit', dest='fuckit', action='store_true', help='Us parser.add_option('-v', '--verbose', dest='verbose', action='store_true', help='Be verbose; show important parts about the MIDI scheduling process') parser.add_option('-d', '--debug', dest='debug', action='store_true', help='Debugging output; show excessive output about the MIDI scheduling process (please use less or write to a file)') parser.add_option('-D', '--deviation', dest='deviation', type='int', help='Amount (in semitones/MIDI pitch units) by which a fully deflected pitchbend modifies the base pitch (0 disables pitchbend processing)') +parser.add_option('-M', '--modwheel-freq-dev', dest='modfdev', type='float', help='Amount (in semitones/MIDI pitch unites) by which a fully-activated modwheel modifies the base pitch') +parser.add_option('--modwheel-freq-freq', dest='modffreq', type='float', help='Frequency of modulation periods (sinusoids) of the modwheel acting on the base pitch') +parser.add_option('--modwheel-amp-dev', dest='modadev', type='float', help='Deviation [0, 1] by which a fully-activated modwheel affects the amplitude as a factor of that amplitude') +parser.add_option('--modwheel-amp-freq', dest='modafreq', type='float', help='Frequency of modulation periods (sinusoids) of the modwheel acting on amplitude') +parser.add_option('--modwheel-res', dest='modres', type='float', help='(Fractional) seconds by which to resolve modwheel events (0 to disable)') +parser.add_option('--modwheel-continuous', dest='modcont', action='store_true', help='Keep phase continuous in global time (don\'t reset to 0 for each note)') +parser.add_option('--string-res', dest='stringres', type='float', help='(Fractional) seconds by which to resolve string models (0 to disable)') +parser.add_option('--string-max', dest='stringmax', type='int', help='Maximum number of events to generate per single input event') +parser.add_option('--string-rate-on', dest='stringonrate', type='float', help='Rate (amplitude / sec) by which to exponentially decay in the string model while a note is active') +parser.add_option('--string-rate-off', dest='stringoffrate', type='float', help='Rate (amplitude / sec) by which to exponentially decay in the string model after a note ends') +parser.add_option('--string-threshold', dest='stringthres', type='float', help='Amplitude (as fraction of original) at which point the string model event is terminated') parser.add_option('--tempo', dest='tempo', help='Adjust interpretation of tempo (try "f1"/"global", "f2"/"track")') -parser.set_defaults(tracks=[], perc='GM', deviation=2, tempo='global') +parser.add_option('--epsilon', dest='epsilon', type='float', help='Don\'t consider overlaps smaller than this number of seconds (which regularly happen due to precision loss)') +parser.add_option('--slack', dest='slack', type='float', help='Inflate the duration of events by this much when scheduling them--this is for clients which need time to release their streams') +parser.add_option('--vol-pow', dest='vol_pow', type='float', help='Exponent to raise volume changes (adjusts energy per delta volume)') +parser.add_option('-0', '--keep-empty', dest='keepempty', action='store_true', help='Keep (do not cull) events with 0 duration in the output file') +parser.add_option('--no-text', dest='no_text', action='store_true', help='Disable text streams (useful for unusual text encodings)') +parser.set_defaults(tracks=[], perc='GM', deviation=2, tempo='global', modres=0.005, modfdev=2.0, modffreq=8.0, modadev=0.5, modafreq=8.0, stringres=0, stringmax=1024, stringrateon=0.7, stringrateoff=0.4, stringthres=0.02, epsilon=1e-12, slack=0.0, vol_pow=2) options, args = parser.parse_args() if options.tempo == 'f1': options.tempo == 'global' @@ -49,6 +62,7 @@ The "ev" object will be a MergeEvent with the following properties: -ev.abstime: the real time in seconds of this event relative to the beginning of playback -ev.bank: the selected bank (all bits) -ev.prog: the selected program +-ev.mw: the modwheel value -ev.ev: a midi.NoteOnEvent: -ev.ev.pitch: the MIDI pitch -ev.ev.velocity: the MIDI velocity @@ -214,25 +228,31 @@ for fname in args: return rt class MergeEvent(object): - __slots__ = ['ev', 'tidx', 'abstime', 'bank', 'prog'] - def __init__(self, ev, tidx, abstime, bank, prog): + __slots__ = ['ev', 'tidx', 'abstime', 'bank', 'prog', 'mw'] + def __init__(self, ev, tidx, abstime, bank=0, prog=0, mw=0): self.ev = ev self.tidx = tidx self.abstime = abstime self.bank = bank self.prog = prog + self.mw = mw def copy(self, **kwargs): - args = {'ev': self.ev, 'tidx': self.tidx, 'abstime': self.abstime, 'bank': self.bank, 'prog': self.prog} + args = {'ev': self.ev, 'tidx': self.tidx, 'abstime': self.abstime, 'bank': self.bank, 'prog': self.prog, 'mw': self.mw} args.update(kwargs) return MergeEvent(**args) def __repr__(self): - return '<ME %r in %d on (%d:%d) @%f>'%(self.ev, self.tidx, self.bank, self.prog, self.abstime) + return '<ME %r in %d on (%d:%d) MW:%d @%f>'%(self.ev, self.tidx, self.bank, self.prog, self.mw, self.abstime) + + vol_at = [[{0: 0x3FFF} for i in range(16)] for j in range(len(pat))] events = [] + cur_mw = [[0 for i in range(16)] for j in range(len(pat))] cur_bank = [[0 for i in range(16)] for j in range(len(pat))] cur_prog = [[0 for i in range(16)] for j in range(len(pat))] + chg_mw = [[0 for i in range(16)] for j in range(len(pat))] chg_bank = [[0 for i in range(16)] for j in range(len(pat))] chg_prog = [[0 for i in range(16)] for j in range(len(pat))] + chg_vol = [[0 for i in range(16)] for j in range(len(pat))] ev_cnts = [[0 for i in range(16)] for j in range(len(pat))] tnames = [''] * len(pat) progs = set([0]) @@ -253,25 +273,40 @@ for fname in args: progs.add(ev.value) chg_prog[tidx][ev.channel] += 1 elif isinstance(ev, midi.ControlChangeEvent): - if ev.control == 0: - cur_bank[tidx][ev.channel] = (0x3F80 & cur_bank[tidx][ev.channel]) | ev.value - chg_bank[tidx][ev.channel] += 1 - elif ev.control == 32: + if ev.control == 0: # Bank -- MSB cur_bank[tidx][ev.channel] = (0x3F & cur_bank[tidx][ev.channel]) | (ev.value << 7) chg_bank[tidx][ev.channel] += 1 + elif ev.control == 32: # Bank -- LSB + cur_bank[tidx][ev.channel] = (0x3F80 & cur_bank[tidx][ev.channel]) | ev.value + chg_bank[tidx][ev.channel] += 1 + elif ev.control == 1: # ModWheel -- MSB + cur_mw[tidx][ev.channel] = (0x3F & cur_mw[tidx][ev.channel]) | (ev.value << 7) + chg_mw[tidx][ev.channel] += 1 + elif ev.control == 33: # ModWheel -- LSB + cur_mw[tidx][ev.channel] = (0x3F80 & cur_mw[tidx][ev.channel]) | ev.value + chg_mw[tidx][ev.channel] += 1 + elif ev.control == 7: # Volume -- MSB + lvtime, lvol = sorted(vol_at[tidx][ev.channel].items(), key = lambda pair: pair[0])[-1] + vol_at[tidx][ev.channel][abstime] = (0x3F & lvol) | (ev.value << 7) + chg_vol[tidx][ev.channel] += 1 + elif ev.control == 39: # Volume -- LSB + lvtime, lvol = sorted(vol_at[tidx][ev.channel].items(), key = lambda pair: pair[0])[-1] + vol_at[tidx][ev.channel][abstime] = (0x3F80 & lvol) | ev.value + chg_vol[tidx][ev.channel] += 1 + events.append(MergeEvent(ev, tidx, abstime, cur_bank[tidx][ev.channel], cur_prog[tidx][ev.channel], cur_mw[tidx][ev.channel])) + ev_cnts[tidx][ev.channel] += 1 elif isinstance(ev, midi.MetaEventWithText): - events.append(MergeEvent(ev, tidx, abstime, 0, 0)) + events.append(MergeEvent(ev, tidx, abstime)) elif isinstance(ev, midi.Event): if isinstance(ev, midi.NoteOnEvent) and ev.velocity == 0: ev.__class__ = midi.NoteOffEvent #XXX Oww - events.append(MergeEvent(ev, tidx, abstime, cur_bank[tidx][ev.channel], cur_prog[tidx][ev.channel])) + events.append(MergeEvent(ev, tidx, abstime, cur_bank[tidx][ev.channel], cur_prog[tidx][ev.channel], cur_mw[tidx][ev.channel])) ev_cnts[tidx][ev.channel] += 1 - if options.verbose: - print 'Track name, event count, final banks, bank changes, final programs, program changes:' - for tidx, tname in enumerate(tnames): - print tidx, ':', tname, ',', ','.join(map(str, ev_cnts[tidx])), ',', ','.join(map(str, cur_bank[tidx])), ',', ','.join(map(str, chg_bank[tidx])), ',', ','.join(map(str, cur_prog[tidx])), ',', ','.join(map(str, chg_prog[tidx])) - print 'All programs observed:', progs + print 'Track name, event count, final banks, bank changes, final programs, program changes, final modwheel, modwheel changes, volume changes:' + for tidx, tname in enumerate(tnames): + print tidx, ':', tname, ',', ','.join(map(str, ev_cnts[tidx])), ',', ','.join(map(str, cur_bank[tidx])), ',', ','.join(map(str, chg_bank[tidx])), ',', ','.join(map(str, cur_prog[tidx])), ',', ','.join(map(str, chg_prog[tidx])), ',', ','.join(map(str, cur_mw[tidx])), ',', ','.join(map(str, chg_mw[tidx])), ',', ','.join(map(str, chg_vol[tidx])) + print 'All programs observed:', progs print 'Sorting events...' @@ -281,29 +316,39 @@ for fname in args: print 'Generating streams...' class DurationEvent(MergeEvent): - __slots__ = ['duration', 'pitch'] - def __init__(self, me, pitch, dur): - MergeEvent.__init__(self, me.ev, me.tidx, me.abstime, me.bank, me.prog) + __slots__ = ['duration', 'real_duration', 'pitch', 'modwheel', 'ampl'] + def __init__(self, me, pitch, ampl, dur, modwheel=0): + MergeEvent.__init__(self, me.ev, me.tidx, me.abstime, me.bank, me.prog, me.mw) self.pitch = pitch + self.ampl = ampl self.duration = dur + self.real_duration = dur + self.modwheel = modwheel + + def __repr__(self): + return '<NE %s P:%f A:%f D:%f W:%f>'%(MergeEvent.__repr__(self), self.pitch, self.ampl, self.duration, self.modwheel) class NoteStream(object): - __slots__ = ['history', 'active', 'realpitch'] + __slots__ = ['history', 'active', 'bentpitch', 'modwheel'] def __init__(self): self.history = [] self.active = None - self.realpitch = None + self.bentpitch = None + self.modwheel = 0 def IsActive(self): return self.active is not None - def Activate(self, mev, realpitch = None): - if realpitch is None: - realpitch = mev.ev.pitch + def Activate(self, mev, bentpitch=None, modwheel=None): + if bentpitch is None: + bentpitch = mev.ev.pitch self.active = mev - self.realpitch = realpitch + self.bentpitch = bentpitch + if modwheel is not None: + self.modwheel = modwheel def Deactivate(self, mev): - self.history.append(DurationEvent(self.active, self.realpitch, mev.abstime - self.active.abstime)) + self.history.append(DurationEvent(self.active, self.bentpitch, self.active.ev.velocity / 127.0, mev.abstime - self.active.abstime, self.modwheel)) self.active = None - self.realpitch = None + self.bentpitch = None + self.modwheel = 0 def WouldDeactivate(self, mev): if not self.IsActive(): return False @@ -311,6 +356,8 @@ for fname in args: return mev.ev.pitch == self.active.ev.pitch and mev.tidx == self.active.tidx and mev.ev.channel == self.active.ev.channel if isinstance(mev.ev, midi.PitchWheelEvent): return mev.tidx == self.active.tidx and mev.ev.channel == self.active.ev.channel + if isinstance(mev.ev, midi.ControlChangeEvent): + return mev.tidx == self.active.tidx and mev.ev.channel == self.active.ev.channel raise TypeError('Tried to deactivate with bad type %r'%(type(mev.ev),)) class NSGroup(object): @@ -409,6 +456,23 @@ for fname in args: print ' Group %r:'%(group.name,) for stream in group.streams: print ' Stream: %r'%(stream.active,) + elif options.modres > 0 and isinstance(mev.ev, midi.ControlChangeEvent): + found = False + for group in notegroups: + for stream in group.streams: + if stream.WouldDeactivate(mev): + base = stream.active.copy(abstime=mev.abstime) + stream.Deactivate(mev) + stream.Activate(base, stream.bentpitch, mev.mw) + found = True + if not found: + print 'WARNING: Did not find any matching active streams for %r'%(mev,) + if options.verbose: + print ' Current state:' + for group in notegroups: + print ' Group %r:'%(group.name,) + for stream in group.streams: + print ' Stream: %r'%(stream.active,) else: auxstream.append(mev) @@ -418,13 +482,207 @@ for fname in args: for ns in group.streams: if ns.IsActive(): print 'WARNING: Active notes at end of playback.' - ns.Deactivate(MergeEvent(ns.active, ns.active.tidx, lastabstime, 0, 0)) + ns.Deactivate(MergeEvent(ns.active, ns.active.tidx, lastabstime)) + + if options.slack > 0: + print 'Adding slack time...' + + slack_evs = [] + for group in notegroups: + for ns in group.streams: + for dev in ns.history: + dev.duration += options.slack + slack_evs.append(dev) + + print 'Resorting all streams...' + for group in notegroups: + group.streams = [] + + for dev in slack_evs: + for group in notegroups: + if not group.filter(dev): + continue + for ns in group.streams: + if dev.abstime >= ns.history[-1].abstime + ns.history[-1].duration: + ns.history.append(dev) + break + else: + group.streams.append(NoteStream()) + group.streams[-1].history.append(dev) + break + else: + print 'WARNING: No stream accepts event', dev + + if options.modres > 0: + print 'Resolving modwheel events...' + ev_cnt = 0 + for group in notegroups: + for ns in group.streams: + i = 0 + while i < len(ns.history): + dev = ns.history[i] + if dev.modwheel > 0: + realpitch = dev.pitch + realamp = dev.ampl + mwamp = float(dev.modwheel) / 0x3FFF + dt = 0.0 + origtime = dev.abstime + events = [] + while dt < dev.duration: + dev.abstime = origtime + dt + if options.modcont: + t = origtime + else: + t = dt + events.append(DurationEvent(dev, realpitch + mwamp * options.modfdev * math.sin(2 * math.pi * options.modffreq * t), realamp + mwamp * options.modadev * (math.sin(2 * math.pi * options.modafreq * t) - 1.0) / 2.0, min(options.modres, dev.duration - dt), dev.modwheel)) + dt += options.modres + ns.history[i:i+1] = events + i += len(events) + ev_cnt += len(events) + if options.verbose: + print 'Event', i, 'note', dev, 'in group', group.name, 'resolved to', len(events), 'events' + if options.debug: + for ev in events: + print '\t', ev + else: + i += 1 + print '...resolved', ev_cnt, 'events' + + if options.stringres: + print 'Resolving string models...' + st_cnt = sum(sum(len(ns.history) for ns in group.streams) for group in notegroups) + in_cnt = 0 + ex_cnt = 0 + ev_cnt = 0 + dev_grps = [] + for group in notegroups: + for ns in group.streams: + i = 0 + while i < len(ns.history): + dev = ns.history[i] + ntime = float('inf') + if i + 1 < len(ns.history): + ntime = ns.history[i+1].abstime + dt = 0.0 + ampf = 1.0 + origtime = dev.abstime + events = [] + while dt < dev.duration and ampf * dev.ampl >= options.stringthres: + dev.abstime = origtime + dt + events.append(DurationEvent(dev, dev.pitch, ampf * dev.ampl, min(options.stringres, dev.duration - dt), dev.modwheel)) + if len(events) > options.stringmax: + print 'WARNING: Exceeded maximum string model events for event', i + if options.verbose: + print 'Final ampf', ampf, 'dt', dt + break + ampf *= options.stringrateon ** options.stringres + dt += options.stringres + in_cnt += 1 + dt = dev.duration + while ampf * dev.ampl >= options.stringthres: + dev.abstime = origtime + dt + events.append(DurationEvent(dev, dev.pitch, ampf * dev.ampl, options.stringres, dev.modwheel)) + if len(events) > options.stringmax: + print 'WARNING: Exceeded maximum string model events for event', i + if options.verbose: + print 'Final ampf', ampf, 'dt', dt + break + ampf *= options.stringrateoff ** options.stringres + dt += options.stringres + ex_cnt += 1 + if events: + for j in xrange(len(events) - 1): + cur, next = events[j], events[j + 1] + if abs(cur.abstime + cur.duration - next.abstime) > options.epsilon: + print 'WARNING: String model events cur: ', cur, 'next:', next, 'have gap/overrun of', next.abstime - (cur.abstime + cur.duration) + dev_grps.append(events) + else: + print 'WARNING: Event', i, 'note', dev, ': No events?' + if options.verbose: + print 'Event', i, 'note', dev, 'in group', group.name, 'resolved to', len(events), 'events' + if options.debug: + for ev in events: + print '\t', ev + i += 1 + ev_cnt += len(events) + print '...resolved', ev_cnt, 'events (+', ev_cnt - st_cnt, ',', in_cnt, 'inside', ex_cnt, 'extra), resorting streams...' + for group in notegroups: + group.streams = [] + + dev_grps.sort(key = lambda evg: evg[0].abstime) + for devgr in dev_grps: + dev = devgr[0] + for group in notegroups: + if group.filter(dev): + grp = group + break + else: + grp = NSGroup() + notegroups.append(grp) + for ns in grp.streams: + if not ns.history: + ns.history.extend(devgr) + break + last = ns.history[-1] + if dev.abstime >= last.abstime + last.duration - 1e-3: + ns.history.extend(devgr) + break + else: + ns = NoteStream() + grp.streams.append(ns) + ns.history.extend(devgr) + scnt = 0 + for group in notegroups: + for ns in group.streams: + scnt += 1 + print 'Final sort:', len(notegroups), 'groups with', scnt, 'streams' + + if not options.keepempty: + print 'Culling empty events...' + ev_cnt = 0 + for group in notegroups: + for ns in group.streams: + i = 0 + while i < len(ns.history): + if ns.history[i].duration == 0.0: + del ns.history[i] + ev_cnt += 1 + else: + i += 1 + print '...culled', ev_cnt, 'events' if options.verbose: print 'Final group mappings:' for group in notegroups: print ('<anonymous>' if group.name is None else group.name), '<=', '(', len(group.streams), 'streams)' + print 'Final volume resolution...' + for group in notegroups: + for ns in group.streams: + for ev in ns.history: + t, vol = sorted(filter(lambda pair: pair[0] <= ev.abstime, vol_at[ev.tidx][ev.ev.channel].items()), key=lambda pair: pair[0])[-1] + ev.ampl *= (float(vol) / 0x3FFF) ** options.vol_pow + + print 'Checking consistency...' + for group in notegroups: + if options.verbose: + print 'Group', '<None>' if group.name is None else group.name, 'with', len(group.streams), 'streams...', + ecnt = 0 + for ns in group.streams: + for i in xrange(len(ns.history) - 1): + cur, next = ns.history[i], ns.history[i + 1] + if cur.abstime + cur.duration > next.abstime + options.epsilon: + print 'WARNING: event', i, 'collides with next event (@', cur.abstime, '+', cur.duration, 'next @', next.abstime, ';', next.abstime - (cur.abstime + cur.duration), 'overlap)' + ecnt += 1 + if cur.abstime > next.abstime: + print 'WARNING: event', i + 1, 'out of sort order (@', cur.abstime, 'next @', next.abstime, ';', cur.abstime - next.abstime, 'underlap)' + ecnt += 1 + if options.verbose: + if ecnt > 0: + print '...', ecnt, 'errors occured' + else: + print 'ok' + print 'Generated %d streams in %d groups'%(sum(map(lambda x: len(x.streams), notegroups)), len(notegroups)) print 'Playtime:', lastabstime, 'seconds' @@ -455,13 +713,20 @@ for fname in args: for note in ns.history: ivnote = ET.SubElement(ivns, 'note') ivnote.set('pitch', str(note.pitch)) - ivnote.set('vel', str(note.ev.velocity)) + ivnote.set('vel', str(int(note.ampl * 127.0))) + ivnote.set('ampl', str(note.ampl)) ivnote.set('time', str(note.abstime)) - ivnote.set('dur', str(note.duration)) - - ivtext = ET.SubElement(ivstreams, 'stream', type='text') - for tev in textstream: - ivev = ET.SubElement(ivtext, 'text', time=str(tev.abstime), type=type(tev.ev).__name__, text=tev.ev.text) + ivnote.set('dur', str(note.real_duration)) + + if not options.no_text: + ivtext = ET.SubElement(ivstreams, 'stream', type='text') + for tev in textstream: + text = tev.ev.text + try: + text = text.decode('utf8') + except UnicodeDecodeError: + text = 'base64:' + text.encode('base64') + ivev = ET.SubElement(ivtext, 'text', time=str(tev.abstime), type=type(tev.ev).__name__, text=text) ivaux = ET.SubElement(ivstreams, 'stream') ivaux.set('type', 'aux') @@ -474,5 +739,9 @@ for fname in args: ivev.set('time', str(mev.abstime)) ivev.set('data', repr(fw.encode_midi_event(mev.ev))) + ivargs = ET.SubElement(ivmeta, 'args') + ivargs.text = ' '.join('%r' % (i,) for i in sys.argv[1:]) + print 'Done.' - open(os.path.splitext(os.path.basename(fname))[0]+'.iv', 'w').write(ET.tostring(iv)) + txt = ET.tostring(iv, 'UTF-8') + open(os.path.splitext(os.path.basename(fname))[0]+'.iv', 'wb').write(txt) |