#! /usr/bin/python
###
### A simple program for doing blind A/B audio comparisons
###
### (c) 2010 Mark Wooding
###

###----- Licensing notice ---------------------------------------------------
###
### This program is free software; you can redistribute it and/or modify
### it under the terms of the GNU General Public License as published by
### the Free Software Foundation; either version 2 of the License, or
### (at your option) any later version.
###
### This program is distributed in the hope that it will be useful,
### but WITHOUT ANY WARRANTY; without even the implied warranty of
### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
### GNU General Public License for more details.
###
### You should have received a copy of the GNU General Public License
### along with this program; if not, write to the Free Software Foundation,
### Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.

###----- Usage --------------------------------------------------------------
###
### The command line syntax is:
###
###   ab-chop INPUT CAPS OUTPUT PIPELINE...
###
### This means that we should read INPUT, decode it (using a GStreamer
### `decodebin', so it should be able to handle most things you care to throw
### at it), and then re-encode it according to each PIPELINE in turn, decode
### /that/ again, and stash the resulting raw PCM data. When we've finished,
### we line up the PCM data streams side-by-side, chop them into chunks, and
### then stitch chunks from randomly chosen streams together to make a new
### PCM stream. Finally, we encode that mixed-up stream as FLAC, and write
### it to OUTPUT. It also writes a file OUTPUT.sequence which is a list of
### numbers indicating which pipeline each chunk of the original came from.
###
### The motivation is that we want to test encoder quality. So you take a
### reference source (as good as you can find), and use that as your INPUT.
### You then write GStreamer pipeline fragments for the encoders you want to
### compare; say `identity' if you want the unmodified original reference to
### be mixed in.
###
### The only tricky bit is the CAPS, which is a GStreamer capabilities string
### describing the raw PCM format to use as an intermediate representation.
### (This is far too low-level and cumbersome for real use, but it's OK for
### now.) You need to say something like
###
###   audio/x-raw-int,width=16,rate=44100,channels=2,depth=16,
###   endianness=1234,signed=true
###
### for standard CD audio.

###--------------------------------------------------------------------------
### External dependencies.

## Standard Python libraries.
import sys as SYS
import os as OS
import shutil as SH
import tempfile as TF
import fnmatch as FN
import random as R
SR = R.SystemRandom()

## GObject and GStreamer.
import gobject as G
import gst as GS

###--------------------------------------------------------------------------
### Configuration.

## Size, in bytes of raw PCM data, of each chunk taken from a stream.
CHUNK_BYTES = 1763520

###--------------------------------------------------------------------------
### GStreamer utilities.

def link_on_demand(src, sink, sinkpad=None, cap=None):
    """
    Link SINK to SRC when a pad appears.

    More precisely, when SRC reports that a pad with media type matching the
    `fnmatch' pattern CAP has appeared, link the pad of SINK named SINKPAD
    (or some sensible pad by default).  If CAP is None then every new pad is
    linked.
    """
    def _link(src, srcpad):
        if cap is None or \
           FN.fnmatchcase(srcpad.get_caps()[0].get_name(), cap):
            src.link_pads(srcpad.get_name(), sink, sinkpad)
    src.connect('pad-added', _link)

def make_element(factory, name=None, **props):
    """
    Return an element made by FACTORY with properties specified by PROPS.

    NAME, if given, is passed to the factory as the element's name.
    """
    elt = GS.element_factory_make(factory, name)
    elt.set_properties(**props)
    return elt

def dump_pipeline(pipe, indent=0):
    """
    Print a description of the elements and pads of PIPE to stdout.

    Starting from each source element of PIPE, follow pad peers to visit
    every reachable element once.  For each element, print its type and
    name, and then one line per pad giving the pad's peer and negotiated
    caps.  Elements which are themselves bins are dumped recursively, with
    INDENT increased by one.
    """
    seen = set()
    for source in pipe.iterate_sources():
        queue = [source]
        while queue:
            elt = queue.pop(0)
            if elt in seen:
                continue
            seen.add(elt)
            print('')
            print('%s%s %s' % (' '*indent, type(elt).__name__,
                               elt.get_name()))
            for pad in elt.pads():
                negotiated = pad.get_negotiated_caps()
                peer = pad.get_peer()
                if peer:
                    link = '<-> %s.%s' % (peer.get_parent().get_name(),
                                          peer.get_name())
                else:
                    link = 'unconnected'
                capstr = (negotiated and negotiated.to_string()) \
                         or 'no-negotiated-caps'
                print('%s Pad %s %s (%s)' %
                      (' '*(indent + 1), pad.get_name(), link, capstr))
                if peer:
                    queue.append(peer.get_parent())
            if isinstance(elt, GS.Bin):
                dump_pipeline(elt, indent + 1)

def run_pipe(pipe, what):
    """
    Run a GStreamer pipeline PIPE until it finishes.

    The pipeline is set playing and a GObject main loop is run until the bus
    reports end-of-stream or an error.  When the pipeline itself reaches the
    paused state, its structure is printed using `dump_pipeline'.  WHAT is
    passed to `DEBUG_BIN_TO_DOT_FILE' as the name for the dot-file dump.

    If the pipeline reports an error, a message is written to stderr and,
    once the pipeline has been shut down, the program exits with status 1
    by raising `SystemExit'.
    """
    loop = G.MainLoop()
    bus = pipe.get_bus()
    bus.add_signal_watch()
    errors = []
    def _bus_message(bus, msg):
        if msg.type == GS.MESSAGE_ERROR:
            SYS.stderr.write('error from pipeline: %s\n' % msg)
            ## Don't exit from inside the callback: remember the failure and
            ## stop the loop, so that the pipeline is torn down properly and
            ## the caller's cleanup handlers get to run.
            errors.append(msg)
            loop.quit()
        elif msg.type == GS.MESSAGE_STATE_CHANGED and \
             msg.src == pipe and \
             msg.structure['new-state'] == GS.STATE_PAUSED:
            dump_pipeline(pipe)
        elif msg.type == GS.MESSAGE_EOS:
            loop.quit()
    bus.connect('message', _bus_message)
    pipe.set_state(GS.STATE_PLAYING)
    try:
        loop.run()
        GS.DEBUG_BIN_TO_DOT_FILE(pipe, 3, what)
    finally:
        pipe.set_state(GS.STATE_NULL)
        bus.remove_signal_watch()
    if errors:
        SYS.exit(1)

###--------------------------------------------------------------------------
### Processing steps.

def transcode_to_raw(source, desc, caps, dest, what):
    """
    Decode SOURCE, pass it through the pipeline fragment DESC, and decode
    the result again, writing raw PCM data in format CAPS to the file DEST.

    WHAT is the debugging name given to `run_pipe'.
    """
    pipe = GS.Pipeline()
    origin = make_element('filesrc', location=source)
    decode_1 = make_element('decodebin')
    convert_1 = make_element('audioconvert')
    encode = GS.parse_bin_from_description(desc, True)
    decode_2 = make_element('decodebin')
    convert_2 = make_element('audioconvert')
    target = make_element('filesink', location=dest)
    pipe.add(origin,
             decode_1, convert_1,
             encode,
             decode_2, convert_2,
             target)
    origin.link(decode_1)
    link_on_demand(decode_1, convert_1)
    ##convert_1.link(encode,
    ##               GS.caps_from_string('audio/x-raw-float, channels=2'))
    convert_1.link(encode)
    encode.link(decode_2)
    link_on_demand(decode_2, convert_2)
    convert_2.link(target, caps)
    run_pipe(pipe, what)

def choose_sequence(nblocks, nstreams):
    """
    Return a random list of NBLOCKS stream indices, each in
    range(NSTREAMS), in which every stream index occurs at least once.

    Candidate sequences are drawn independently and uniformly, and rejected
    until one uses every stream.  Raises `ValueError' if NBLOCKS is less
    than NSTREAMS, since then no such sequence exists and the search would
    never finish.
    """
    if nblocks < nstreams:
        raise ValueError('cannot use %d streams in only %d chunks' %
                         (nstreams, nblocks))
    wanted = set(range(nstreams))
    while True:
        seq = [SR.randrange(nstreams) for _ in range(nblocks)]
        if wanted.issubset(seq):
            return seq

def mix_streams(paths, seq, dest, step):
    """
    Stitch chunks of STEP bytes from the files PATHS together into DEST.

    The Nth chunk of the output is copied from offset N*STEP of the file
    PATHS[SEQ[N]].  Copying stops after the first chunk which comes up
    shorter than STEP bytes.
    """
    ## NOTE(review): if the streams differ in length, a short read can end
    ## the output before SEQ is exhausted, so the sequence written by the
    ## caller may list more chunks than the output contains.
    streams = []
    try:
        for path in paths:
            streams.append(open(path, 'rb'))
        out = open(dest, 'wb')
        try:
            pos = 0
            for i in seq:
                f = streams[i]
                f.seek(pos)
                buf = f.read(step)
                out.write(buf)
                if len(buf) < step:
                    break
                pos += step
        finally:
            out.close()
    finally:
        for f in streams:
            f.close()

def encode_flac(raw, caps, output):
    """
    Encode the raw PCM file RAW, whose format is CAPS, as FLAC in OUTPUT.
    """
    pipe = GS.Pipeline()
    origin = make_element('filesrc', location=raw)
    convert = make_element('audioconvert')
    encode = make_element('flacenc', quality=8)
    target = make_element('filesink', location=output)
    pipe.add(origin, convert, encode, target)
    origin.link(convert, caps)
    GS.element_link_many(convert, encode, target)
    run_pipe(pipe, 'output')

###--------------------------------------------------------------------------
### Main program.

def main(argv):
    """
    Run the program with command-line ARGV; return the exit status.

    ARGV has the form [PROGRAM, INPUT, CAPS, OUTPUT, PIPELINE, ...], with at
    least one PIPELINE.
    """

    ## Read the command line arguments.
    if len(argv) < 5:
        SYS.stderr.write('usage: %s INPUT CAPS OUTPUT PIPELINE...\n' %
                         OS.path.basename(argv[0]))
        return 1
    source = argv[1]
    caps = GS.caps_from_string(argv[2])
    output = argv[3]
    descs = argv[4:]

    ## We want a temporary place to keep things.  Let `mkdtemp' choose the
    ## name and create the directory in one step.
    tmp = TF.mkdtemp()
    try:

        ## First step: produce raw PCM files from the original source and
        ## the requested encoders.
        temps = []
        for q, desc in enumerate(descs):
            temp = OS.path.join(tmp, '%d.raw' % q)
            temps.append(temp)
            transcode_to_raw(source, desc, caps, temp, 'input-%d' % q)
            print('done %s' % desc)

        ## Second step: decide which stream each chunk comes from.
        lens = [OS.stat(t).st_size for t in temps]
        blocks = (max(lens) + CHUNK_BYTES - 1)//CHUNK_BYTES
        if blocks < len(temps):
            SYS.stderr.write('input too short: %d chunks cannot cover '
                             '%d pipelines\n' % (blocks, len(temps)))
            return 1
        seq = choose_sequence(blocks, len(temps))

        ## Third step: stitch the chosen chunks together, and record where
        ## each one came from.
        mix = OS.path.join(tmp, 'mix.raw')
        mix_streams(temps, seq, mix, CHUNK_BYTES)
        f = open(output + '.sequence', 'w')
        try:
            f.write(', '.join([str(i) for i in seq]) + '\n')
        finally:
            f.close()

        ## Final step: encode the mixed stream.
        encode_flac(mix, caps, output)
        print('all done')

    finally:
        SH.rmtree(tmp)
    return 0

if __name__ == '__main__':
    SYS.exit(main(SYS.argv))