aboutsummaryrefslogtreecommitdiff
path: root/mkiv.py
diff options
context:
space:
mode:
Diffstat (limited to 'mkiv.py')
-rw-r--r--mkiv.py347
1 files changed, 308 insertions, 39 deletions
diff --git a/mkiv.py b/mkiv.py
index 717220c..0c87372 100644
--- a/mkiv.py
+++ b/mkiv.py
@@ -4,10 +4,6 @@ mkiv -- Make Intervals
This simple script (using python-midi) reads a MIDI file and makes an interval
(.iv) file (actually XML) that contains non-overlapping notes.
-
-TODO:
--MIDI Control events
--Percussion
'''
import xml.etree.ElementTree as ET
@@ -15,6 +11,7 @@ import midi
import sys
import os
import optparse
+import math
TRACKS = object()
PROGRAMS = object()
@@ -32,8 +29,24 @@ parser.add_option('-f', '--fuckit', dest='fuckit', action='store_true', help='Us
parser.add_option('-v', '--verbose', dest='verbose', action='store_true', help='Be verbose; show important parts about the MIDI scheduling process')
parser.add_option('-d', '--debug', dest='debug', action='store_true', help='Debugging output; show excessive output about the MIDI scheduling process (please use less or write to a file)')
parser.add_option('-D', '--deviation', dest='deviation', type='int', help='Amount (in semitones/MIDI pitch units) by which a fully deflected pitchbend modifies the base pitch (0 disables pitchbend processing)')
+parser.add_option('-M', '--modwheel-freq-dev', dest='modfdev', type='float', help='Amount (in semitones/MIDI pitch unites) by which a fully-activated modwheel modifies the base pitch')
+parser.add_option('--modwheel-freq-freq', dest='modffreq', type='float', help='Frequency of modulation periods (sinusoids) of the modwheel acting on the base pitch')
+parser.add_option('--modwheel-amp-dev', dest='modadev', type='float', help='Deviation [0, 1] by which a fully-activated modwheel affects the amplitude as a factor of that amplitude')
+parser.add_option('--modwheel-amp-freq', dest='modafreq', type='float', help='Frequency of modulation periods (sinusoids) of the modwheel acting on amplitude')
+parser.add_option('--modwheel-res', dest='modres', type='float', help='(Fractional) seconds by which to resolve modwheel events (0 to disable)')
+parser.add_option('--modwheel-continuous', dest='modcont', action='store_true', help='Keep phase continuous in global time (don\'t reset to 0 for each note)')
+parser.add_option('--string-res', dest='stringres', type='float', help='(Fractional) seconds by which to resolve string models (0 to disable)')
+parser.add_option('--string-max', dest='stringmax', type='int', help='Maximum number of events to generate per single input event')
+parser.add_option('--string-rate-on', dest='stringonrate', type='float', help='Rate (amplitude / sec) by which to exponentially decay in the string model while a note is active')
+parser.add_option('--string-rate-off', dest='stringoffrate', type='float', help='Rate (amplitude / sec) by which to exponentially decay in the string model after a note ends')
+parser.add_option('--string-threshold', dest='stringthres', type='float', help='Amplitude (as fraction of original) at which point the string model event is terminated')
parser.add_option('--tempo', dest='tempo', help='Adjust interpretation of tempo (try "f1"/"global", "f2"/"track")')
-parser.set_defaults(tracks=[], perc='GM', deviation=2, tempo='global')
+parser.add_option('--epsilon', dest='epsilon', type='float', help='Don\'t consider overlaps smaller than this number of seconds (which regularly happen due to precision loss)')
+parser.add_option('--slack', dest='slack', type='float', help='Inflate the duration of events by this much when scheduling them--this is for clients which need time to release their streams')
+parser.add_option('--vol-pow', dest='vol_pow', type='float', help='Exponent to raise volume changes (adjusts energy per delta volume)')
+parser.add_option('-0', '--keep-empty', dest='keepempty', action='store_true', help='Keep (do not cull) events with 0 duration in the output file')
+parser.add_option('--no-text', dest='no_text', action='store_true', help='Disable text streams (useful for unusual text encodings)')
+parser.set_defaults(tracks=[], perc='GM', deviation=2, tempo='global', modres=0.005, modfdev=2.0, modffreq=8.0, modadev=0.5, modafreq=8.0, stringres=0, stringmax=1024, stringrateon=0.7, stringrateoff=0.4, stringthres=0.02, epsilon=1e-12, slack=0.0, vol_pow=2)
options, args = parser.parse_args()
if options.tempo == 'f1':
options.tempo == 'global'
@@ -49,6 +62,7 @@ The "ev" object will be a MergeEvent with the following properties:
-ev.abstime: the real time in seconds of this event relative to the beginning of playback
-ev.bank: the selected bank (all bits)
-ev.prog: the selected program
+-ev.mw: the modwheel value
-ev.ev: a midi.NoteOnEvent:
-ev.ev.pitch: the MIDI pitch
-ev.ev.velocity: the MIDI velocity
@@ -214,25 +228,31 @@ for fname in args:
return rt
class MergeEvent(object):
- __slots__ = ['ev', 'tidx', 'abstime', 'bank', 'prog']
- def __init__(self, ev, tidx, abstime, bank, prog):
+ __slots__ = ['ev', 'tidx', 'abstime', 'bank', 'prog', 'mw']
+ def __init__(self, ev, tidx, abstime, bank=0, prog=0, mw=0):
self.ev = ev
self.tidx = tidx
self.abstime = abstime
self.bank = bank
self.prog = prog
+ self.mw = mw
def copy(self, **kwargs):
- args = {'ev': self.ev, 'tidx': self.tidx, 'abstime': self.abstime, 'bank': self.bank, 'prog': self.prog}
+ args = {'ev': self.ev, 'tidx': self.tidx, 'abstime': self.abstime, 'bank': self.bank, 'prog': self.prog, 'mw': self.mw}
args.update(kwargs)
return MergeEvent(**args)
def __repr__(self):
- return '<ME %r in %d on (%d:%d) @%f>'%(self.ev, self.tidx, self.bank, self.prog, self.abstime)
+ return '<ME %r in %d on (%d:%d) MW:%d @%f>'%(self.ev, self.tidx, self.bank, self.prog, self.mw, self.abstime)
+
+ vol_at = [[{0: 0x3FFF} for i in range(16)] for j in range(len(pat))]
events = []
+ cur_mw = [[0 for i in range(16)] for j in range(len(pat))]
cur_bank = [[0 for i in range(16)] for j in range(len(pat))]
cur_prog = [[0 for i in range(16)] for j in range(len(pat))]
+ chg_mw = [[0 for i in range(16)] for j in range(len(pat))]
chg_bank = [[0 for i in range(16)] for j in range(len(pat))]
chg_prog = [[0 for i in range(16)] for j in range(len(pat))]
+ chg_vol = [[0 for i in range(16)] for j in range(len(pat))]
ev_cnts = [[0 for i in range(16)] for j in range(len(pat))]
tnames = [''] * len(pat)
progs = set([0])
@@ -253,25 +273,40 @@ for fname in args:
progs.add(ev.value)
chg_prog[tidx][ev.channel] += 1
elif isinstance(ev, midi.ControlChangeEvent):
- if ev.control == 0:
- cur_bank[tidx][ev.channel] = (0x3F80 & cur_bank[tidx][ev.channel]) | ev.value
- chg_bank[tidx][ev.channel] += 1
- elif ev.control == 32:
+ if ev.control == 0: # Bank -- MSB
cur_bank[tidx][ev.channel] = (0x3F & cur_bank[tidx][ev.channel]) | (ev.value << 7)
chg_bank[tidx][ev.channel] += 1
+ elif ev.control == 32: # Bank -- LSB
+ cur_bank[tidx][ev.channel] = (0x3F80 & cur_bank[tidx][ev.channel]) | ev.value
+ chg_bank[tidx][ev.channel] += 1
+ elif ev.control == 1: # ModWheel -- MSB
+ cur_mw[tidx][ev.channel] = (0x3F & cur_mw[tidx][ev.channel]) | (ev.value << 7)
+ chg_mw[tidx][ev.channel] += 1
+ elif ev.control == 33: # ModWheel -- LSB
+ cur_mw[tidx][ev.channel] = (0x3F80 & cur_mw[tidx][ev.channel]) | ev.value
+ chg_mw[tidx][ev.channel] += 1
+ elif ev.control == 7: # Volume -- MSB
+ lvtime, lvol = sorted(vol_at[tidx][ev.channel].items(), key = lambda pair: pair[0])[-1]
+ vol_at[tidx][ev.channel][abstime] = (0x3F & lvol) | (ev.value << 7)
+ chg_vol[tidx][ev.channel] += 1
+ elif ev.control == 39: # Volume -- LSB
+ lvtime, lvol = sorted(vol_at[tidx][ev.channel].items(), key = lambda pair: pair[0])[-1]
+ vol_at[tidx][ev.channel][abstime] = (0x3F80 & lvol) | ev.value
+ chg_vol[tidx][ev.channel] += 1
+ events.append(MergeEvent(ev, tidx, abstime, cur_bank[tidx][ev.channel], cur_prog[tidx][ev.channel], cur_mw[tidx][ev.channel]))
+ ev_cnts[tidx][ev.channel] += 1
elif isinstance(ev, midi.MetaEventWithText):
- events.append(MergeEvent(ev, tidx, abstime, 0, 0))
+ events.append(MergeEvent(ev, tidx, abstime))
elif isinstance(ev, midi.Event):
if isinstance(ev, midi.NoteOnEvent) and ev.velocity == 0:
ev.__class__ = midi.NoteOffEvent #XXX Oww
- events.append(MergeEvent(ev, tidx, abstime, cur_bank[tidx][ev.channel], cur_prog[tidx][ev.channel]))
+ events.append(MergeEvent(ev, tidx, abstime, cur_bank[tidx][ev.channel], cur_prog[tidx][ev.channel], cur_mw[tidx][ev.channel]))
ev_cnts[tidx][ev.channel] += 1
- if options.verbose:
- print 'Track name, event count, final banks, bank changes, final programs, program changes:'
- for tidx, tname in enumerate(tnames):
- print tidx, ':', tname, ',', ','.join(map(str, ev_cnts[tidx])), ',', ','.join(map(str, cur_bank[tidx])), ',', ','.join(map(str, chg_bank[tidx])), ',', ','.join(map(str, cur_prog[tidx])), ',', ','.join(map(str, chg_prog[tidx]))
- print 'All programs observed:', progs
+ print 'Track name, event count, final banks, bank changes, final programs, program changes, final modwheel, modwheel changes, volume changes:'
+ for tidx, tname in enumerate(tnames):
+ print tidx, ':', tname, ',', ','.join(map(str, ev_cnts[tidx])), ',', ','.join(map(str, cur_bank[tidx])), ',', ','.join(map(str, chg_bank[tidx])), ',', ','.join(map(str, cur_prog[tidx])), ',', ','.join(map(str, chg_prog[tidx])), ',', ','.join(map(str, cur_mw[tidx])), ',', ','.join(map(str, chg_mw[tidx])), ',', ','.join(map(str, chg_vol[tidx]))
+ print 'All programs observed:', progs
print 'Sorting events...'
@@ -281,29 +316,39 @@ for fname in args:
print 'Generating streams...'
class DurationEvent(MergeEvent):
- __slots__ = ['duration', 'pitch']
- def __init__(self, me, pitch, dur):
- MergeEvent.__init__(self, me.ev, me.tidx, me.abstime, me.bank, me.prog)
+ __slots__ = ['duration', 'real_duration', 'pitch', 'modwheel', 'ampl']
+ def __init__(self, me, pitch, ampl, dur, modwheel=0):
+ MergeEvent.__init__(self, me.ev, me.tidx, me.abstime, me.bank, me.prog, me.mw)
self.pitch = pitch
+ self.ampl = ampl
self.duration = dur
+ self.real_duration = dur
+ self.modwheel = modwheel
+
+ def __repr__(self):
+ return '<NE %s P:%f A:%f D:%f W:%f>'%(MergeEvent.__repr__(self), self.pitch, self.ampl, self.duration, self.modwheel)
class NoteStream(object):
- __slots__ = ['history', 'active', 'realpitch']
+ __slots__ = ['history', 'active', 'bentpitch', 'modwheel']
def __init__(self):
self.history = []
self.active = None
- self.realpitch = None
+ self.bentpitch = None
+ self.modwheel = 0
def IsActive(self):
return self.active is not None
- def Activate(self, mev, realpitch = None):
- if realpitch is None:
- realpitch = mev.ev.pitch
+ def Activate(self, mev, bentpitch=None, modwheel=None):
+ if bentpitch is None:
+ bentpitch = mev.ev.pitch
self.active = mev
- self.realpitch = realpitch
+ self.bentpitch = bentpitch
+ if modwheel is not None:
+ self.modwheel = modwheel
def Deactivate(self, mev):
- self.history.append(DurationEvent(self.active, self.realpitch, mev.abstime - self.active.abstime))
+ self.history.append(DurationEvent(self.active, self.bentpitch, self.active.ev.velocity / 127.0, mev.abstime - self.active.abstime, self.modwheel))
self.active = None
- self.realpitch = None
+ self.bentpitch = None
+ self.modwheel = 0
def WouldDeactivate(self, mev):
if not self.IsActive():
return False
@@ -311,6 +356,8 @@ for fname in args:
return mev.ev.pitch == self.active.ev.pitch and mev.tidx == self.active.tidx and mev.ev.channel == self.active.ev.channel
if isinstance(mev.ev, midi.PitchWheelEvent):
return mev.tidx == self.active.tidx and mev.ev.channel == self.active.ev.channel
+ if isinstance(mev.ev, midi.ControlChangeEvent):
+ return mev.tidx == self.active.tidx and mev.ev.channel == self.active.ev.channel
raise TypeError('Tried to deactivate with bad type %r'%(type(mev.ev),))
class NSGroup(object):
@@ -409,6 +456,23 @@ for fname in args:
print ' Group %r:'%(group.name,)
for stream in group.streams:
print ' Stream: %r'%(stream.active,)
+ elif options.modres > 0 and isinstance(mev.ev, midi.ControlChangeEvent):
+ found = False
+ for group in notegroups:
+ for stream in group.streams:
+ if stream.WouldDeactivate(mev):
+ base = stream.active.copy(abstime=mev.abstime)
+ stream.Deactivate(mev)
+ stream.Activate(base, stream.bentpitch, mev.mw)
+ found = True
+ if not found:
+ print 'WARNING: Did not find any matching active streams for %r'%(mev,)
+ if options.verbose:
+ print ' Current state:'
+ for group in notegroups:
+ print ' Group %r:'%(group.name,)
+ for stream in group.streams:
+ print ' Stream: %r'%(stream.active,)
else:
auxstream.append(mev)
@@ -418,13 +482,207 @@ for fname in args:
for ns in group.streams:
if ns.IsActive():
print 'WARNING: Active notes at end of playback.'
- ns.Deactivate(MergeEvent(ns.active, ns.active.tidx, lastabstime, 0, 0))
+ ns.Deactivate(MergeEvent(ns.active, ns.active.tidx, lastabstime))
+
+ if options.slack > 0:
+ print 'Adding slack time...'
+
+ slack_evs = []
+ for group in notegroups:
+ for ns in group.streams:
+ for dev in ns.history:
+ dev.duration += options.slack
+ slack_evs.append(dev)
+
+ print 'Resorting all streams...'
+ for group in notegroups:
+ group.streams = []
+
+ for dev in slack_evs:
+ for group in notegroups:
+ if not group.filter(dev):
+ continue
+ for ns in group.streams:
+ if dev.abstime >= ns.history[-1].abstime + ns.history[-1].duration:
+ ns.history.append(dev)
+ break
+ else:
+ group.streams.append(NoteStream())
+ group.streams[-1].history.append(dev)
+ break
+ else:
+ print 'WARNING: No stream accepts event', dev
+
+ if options.modres > 0:
+ print 'Resolving modwheel events...'
+ ev_cnt = 0
+ for group in notegroups:
+ for ns in group.streams:
+ i = 0
+ while i < len(ns.history):
+ dev = ns.history[i]
+ if dev.modwheel > 0:
+ realpitch = dev.pitch
+ realamp = dev.ampl
+ mwamp = float(dev.modwheel) / 0x3FFF
+ dt = 0.0
+ origtime = dev.abstime
+ events = []
+ while dt < dev.duration:
+ dev.abstime = origtime + dt
+ if options.modcont:
+ t = origtime
+ else:
+ t = dt
+ events.append(DurationEvent(dev, realpitch + mwamp * options.modfdev * math.sin(2 * math.pi * options.modffreq * t), realamp + mwamp * options.modadev * (math.sin(2 * math.pi * options.modafreq * t) - 1.0) / 2.0, min(options.modres, dev.duration - dt), dev.modwheel))
+ dt += options.modres
+ ns.history[i:i+1] = events
+ i += len(events)
+ ev_cnt += len(events)
+ if options.verbose:
+ print 'Event', i, 'note', dev, 'in group', group.name, 'resolved to', len(events), 'events'
+ if options.debug:
+ for ev in events:
+ print '\t', ev
+ else:
+ i += 1
+ print '...resolved', ev_cnt, 'events'
+
+ if options.stringres:
+ print 'Resolving string models...'
+ st_cnt = sum(sum(len(ns.history) for ns in group.streams) for group in notegroups)
+ in_cnt = 0
+ ex_cnt = 0
+ ev_cnt = 0
+ dev_grps = []
+ for group in notegroups:
+ for ns in group.streams:
+ i = 0
+ while i < len(ns.history):
+ dev = ns.history[i]
+ ntime = float('inf')
+ if i + 1 < len(ns.history):
+ ntime = ns.history[i+1].abstime
+ dt = 0.0
+ ampf = 1.0
+ origtime = dev.abstime
+ events = []
+ while dt < dev.duration and ampf * dev.ampl >= options.stringthres:
+ dev.abstime = origtime + dt
+ events.append(DurationEvent(dev, dev.pitch, ampf * dev.ampl, min(options.stringres, dev.duration - dt), dev.modwheel))
+ if len(events) > options.stringmax:
+ print 'WARNING: Exceeded maximum string model events for event', i
+ if options.verbose:
+ print 'Final ampf', ampf, 'dt', dt
+ break
+ ampf *= options.stringrateon ** options.stringres
+ dt += options.stringres
+ in_cnt += 1
+ dt = dev.duration
+ while ampf * dev.ampl >= options.stringthres:
+ dev.abstime = origtime + dt
+ events.append(DurationEvent(dev, dev.pitch, ampf * dev.ampl, options.stringres, dev.modwheel))
+ if len(events) > options.stringmax:
+ print 'WARNING: Exceeded maximum string model events for event', i
+ if options.verbose:
+ print 'Final ampf', ampf, 'dt', dt
+ break
+ ampf *= options.stringrateoff ** options.stringres
+ dt += options.stringres
+ ex_cnt += 1
+ if events:
+ for j in xrange(len(events) - 1):
+ cur, next = events[j], events[j + 1]
+ if abs(cur.abstime + cur.duration - next.abstime) > options.epsilon:
+ print 'WARNING: String model events cur: ', cur, 'next:', next, 'have gap/overrun of', next.abstime - (cur.abstime + cur.duration)
+ dev_grps.append(events)
+ else:
+ print 'WARNING: Event', i, 'note', dev, ': No events?'
+ if options.verbose:
+ print 'Event', i, 'note', dev, 'in group', group.name, 'resolved to', len(events), 'events'
+ if options.debug:
+ for ev in events:
+ print '\t', ev
+ i += 1
+ ev_cnt += len(events)
+ print '...resolved', ev_cnt, 'events (+', ev_cnt - st_cnt, ',', in_cnt, 'inside', ex_cnt, 'extra), resorting streams...'
+ for group in notegroups:
+ group.streams = []
+
+ dev_grps.sort(key = lambda evg: evg[0].abstime)
+ for devgr in dev_grps:
+ dev = devgr[0]
+ for group in notegroups:
+ if group.filter(dev):
+ grp = group
+ break
+ else:
+ grp = NSGroup()
+ notegroups.append(grp)
+ for ns in grp.streams:
+ if not ns.history:
+ ns.history.extend(devgr)
+ break
+ last = ns.history[-1]
+ if dev.abstime >= last.abstime + last.duration - 1e-3:
+ ns.history.extend(devgr)
+ break
+ else:
+ ns = NoteStream()
+ grp.streams.append(ns)
+ ns.history.extend(devgr)
+ scnt = 0
+ for group in notegroups:
+ for ns in group.streams:
+ scnt += 1
+ print 'Final sort:', len(notegroups), 'groups with', scnt, 'streams'
+
+ if not options.keepempty:
+ print 'Culling empty events...'
+ ev_cnt = 0
+ for group in notegroups:
+ for ns in group.streams:
+ i = 0
+ while i < len(ns.history):
+ if ns.history[i].duration == 0.0:
+ del ns.history[i]
+ ev_cnt += 1
+ else:
+ i += 1
+ print '...culled', ev_cnt, 'events'
if options.verbose:
print 'Final group mappings:'
for group in notegroups:
print ('<anonymous>' if group.name is None else group.name), '<=', '(', len(group.streams), 'streams)'
+ print 'Final volume resolution...'
+ for group in notegroups:
+ for ns in group.streams:
+ for ev in ns.history:
+ t, vol = sorted(filter(lambda pair: pair[0] <= ev.abstime, vol_at[ev.tidx][ev.ev.channel].items()), key=lambda pair: pair[0])[-1]
+ ev.ampl *= (float(vol) / 0x3FFF) ** options.vol_pow
+
+ print 'Checking consistency...'
+ for group in notegroups:
+ if options.verbose:
+ print 'Group', '<None>' if group.name is None else group.name, 'with', len(group.streams), 'streams...',
+ ecnt = 0
+ for ns in group.streams:
+ for i in xrange(len(ns.history) - 1):
+ cur, next = ns.history[i], ns.history[i + 1]
+ if cur.abstime + cur.duration > next.abstime + options.epsilon:
+ print 'WARNING: event', i, 'collides with next event (@', cur.abstime, '+', cur.duration, 'next @', next.abstime, ';', next.abstime - (cur.abstime + cur.duration), 'overlap)'
+ ecnt += 1
+ if cur.abstime > next.abstime:
+ print 'WARNING: event', i + 1, 'out of sort order (@', cur.abstime, 'next @', next.abstime, ';', cur.abstime - next.abstime, 'underlap)'
+ ecnt += 1
+ if options.verbose:
+ if ecnt > 0:
+ print '...', ecnt, 'errors occured'
+ else:
+ print 'ok'
+
print 'Generated %d streams in %d groups'%(sum(map(lambda x: len(x.streams), notegroups)), len(notegroups))
print 'Playtime:', lastabstime, 'seconds'
@@ -455,13 +713,20 @@ for fname in args:
for note in ns.history:
ivnote = ET.SubElement(ivns, 'note')
ivnote.set('pitch', str(note.pitch))
- ivnote.set('vel', str(note.ev.velocity))
+ ivnote.set('vel', str(int(note.ampl * 127.0)))
+ ivnote.set('ampl', str(note.ampl))
ivnote.set('time', str(note.abstime))
- ivnote.set('dur', str(note.duration))
-
- ivtext = ET.SubElement(ivstreams, 'stream', type='text')
- for tev in textstream:
- ivev = ET.SubElement(ivtext, 'text', time=str(tev.abstime), type=type(tev.ev).__name__, text=tev.ev.text)
+ ivnote.set('dur', str(note.real_duration))
+
+ if not options.no_text:
+ ivtext = ET.SubElement(ivstreams, 'stream', type='text')
+ for tev in textstream:
+ text = tev.ev.text
+ try:
+ text = text.decode('utf8')
+ except UnicodeDecodeError:
+ text = 'base64:' + text.encode('base64')
+ ivev = ET.SubElement(ivtext, 'text', time=str(tev.abstime), type=type(tev.ev).__name__, text=text)
ivaux = ET.SubElement(ivstreams, 'stream')
ivaux.set('type', 'aux')
@@ -474,5 +739,9 @@ for fname in args:
ivev.set('time', str(mev.abstime))
ivev.set('data', repr(fw.encode_midi_event(mev.ev)))
+ ivargs = ET.SubElement(ivmeta, 'args')
+ ivargs.text = ' '.join('%r' % (i,) for i in sys.argv[1:])
+
print 'Done.'
- open(os.path.splitext(os.path.basename(fname))[0]+'.iv', 'w').write(ET.tostring(iv))
+ txt = ET.tostring(iv, 'UTF-8')
+ open(os.path.splitext(os.path.basename(fname))[0]+'.iv', 'wb').write(txt)