Sunday, February 22, 2009

foreach.py and alac.py

I've had an iPod Touch for quite some time now, but after upgrading to the 2.2 firmware, lost all of my old (corrupted) music. This means one thing: time to put music onto it. Rather than do ffmpeg and such by hand, I decided to write a pair of Python scripts to do it.

  • The first script is foreach.py, which iterates through each line in a file, interpreting each line as a file:

    #!/usr/bin/python
    from sys import argv, stderr


    class ArgLib2(object):
    class Mapping(object):
    def __init__(self, target):
    self.target = target
    def get_target(self):
    return self.target
    class ValuedMapping(Mapping):
    def __init__(self, target, value):
    ArgLib2.Mapping.__init__(self, target)
    self.default_value = value
    def value(self):
    return self.default_value
    def __init__(self, long = {}, long_synoms = {}, short_synoms = {}):
    self.long, self.long_synoms, self.short_synoms = long, long_synoms, short_synoms
    self.anonymous = []
    self.error = False
    def add_long(self, name, default_value = None):
    self.long[name] = default_value
    def add_long_synom(self, name, mapping):
    self.long_synoms[name] = mapping
    def add_short_synom(self, name, mapping):
    self.short_synoms[name] = mapping
    def get_value(self, name):
    return self.long[name]
    def set_value(self, name, value):
    self.long[name] = value
    def each_anonymous(self):
    for arg in anynmous:
    yield arg
    def get_anonymous(self, i):
    return self.anonymous[i]
    def anonymous_len(self):
    return len(self.anonymous)
    def __getitem__(self, i):
    try:
    return self.long[i]
    except KeyError:
    try:
    return self.anonymous[i]
    except IndexError:
    raise KeyError, 'No such option or anonymous index: %s' % i
    def parse(self, args, stoplevel = None):
    nopts = 0
    self.anonymous = []
    self.error = False
    i = 0
    while i < len(args):
    arg = args[i]
    if arg[0] == '-':
    if arg == '--': # End parsing
    i += 1
    break
    elif arg[1] == '-':
    if '=' in arg: # Long arguments
    parts = arg.partition('=')
    tgt = parts[0][2:]
    if tgt in self.long:
    self.long[parts[0][2:]] = parts[2]
    else:
    print >>stderr, 'No such option: %s' % parts[0]
    else: # Long synonyms
    try:
    mapping = self.long_synoms[arg[2:]]
    if mapping.get_target() in self.long:
    try: # Long synonyms that automatically set a value without an argument
    self.long[mapping.get_target()] = mapping.value()
    except AttributeError: # Long synonyms that require an argument
    i += 1
    if i < len(args):
    self.long[mapping.get_target()] = args[i]
    else:
    print >>stderr, 'Option requires a value: %s' % arg
    else:
    print >>stderr, 'No such internal option: %s' % mapping.get_target()
    except KeyError:
    print >>stderr, 'No such option: %s' % arg
    error = True
    else: # Short arguments
    options = arg[1:]
    j = 0
    while j < len(options):
    try:
    mapping = self.short_synoms[options[j]]
    if mapping.get_target() in self.long:
    try: # Short synonyms that automatically set a value without an argument
    self.long[mapping.get_target()] = mapping.value()
    except AttributeError: # Short synonyms that require an argument
    if j == len(options) - 1: # Must be the last short option to specify a value
    i += 1
    if i < len(args):
    self.long[mapping.get_target()] = args[i]
    else:
    print >>stderr, 'Option requires a value: %s' % options[j]
    break
    else:
    print >>stderr, 'Option requires a value: %s' % options[j]
    else:
    print >>stderr, 'No such internal option: %s' % mapping.get_target()
    except KeyError:
    print >>stderr, 'No such short option: %s' % options[j]
    j += 1
    else:
    nopts += 1
    self.anonymous.append(arg)
    if not (stoplevel is None) and nopts >= stoplevel:
    i += 1
    break
    i += 1
    if i > 0:
    self.anonymous += args[i - 1:]
    else:
    self.anonymous = args
    return not self.error
    @staticmethod
    def create_mapping(target):
    return ArgLib2.Mapping(target)
    @staticmethod
    def create_mapping_valued(target, default_value):
    return ArgLib2.ValuedMapping(target, default_value)


    from os.path import isfile
    from os import fork, waitpid, WEXITSTATUS, execvp
    from sys import exit
    MAX_ARG_COUNT = 16

    def forkexec(cmd, args, wait = True):
    pid = fork()
    if pid: # Parent
    id, status = waitpid(pid, 0)
    if wait:
    estatus = WEXITSTATUS(status)
    if estatus == 127:
    print >>stderr, '%s: Unknown command: %s' % cmd
    return estatus
    else:
    return 0
    else: # Child
    execvp(cmd, args)
    exit(127)._exit()
    def chomp(string):
    if string[-1] == '\n':
    return string[:-1]
    else:
    return string

    def process_arguments(args, repl):
    return [unicode(arg, encoding = 'utf8').replace('{}', unicode(repl, encoding = 'utf8')) for arg in args]

    if __name__ == '__main__':
    parser = ArgLib2({
    'mode' : None,
    'fork' : None
    },
    {
    'xargmode' : ArgLib2.create_mapping_valued('mode', 'xarg'),
    'multifork' : ArgLib2.create_mapping_valued('fork', 'multi')
    },
    {
    'x' : ArgLib2.create_mapping_valued('mode', 'xarg')
    }
    )
    if parser.parse(argv[1:], 2) and parser.anonymous_len() >= 2:
    cmd = parser[1]
    if isfile(parser[0]):
    f = open(parser[0], 'r')
    if parser['mode'] == 'xarg':
    args = []
    run = True
    while run:
    # DO
    ln = f.readline()
    ln_len = len(ln)
    while len(args) <= MAX_ARG_COUNT and ln_len > 0:
    if ln != '\n':
    args.append(chomp(ln))
    # DO
    ln = f.readline()
    ln_len = len(ln)
    forkexec(cmd, parser.anonymous[2:] + args)
    args = []
    if ln_len == 0:
    run = False
    break
    elif parser['fork'] == 'multi':
    for line in f:
    if line != '\n':
    line = chomp(line)
    forkexec(cmd, process_arguments(parser.anonymous[2:], line), wait = False)
    else:
    for line in f:
    if line != '\n':
    line = chomp(line)
    forkexec(cmd, process_arguments(parser.anonymous[2:], line))
    f.close()
    else:
    print >>stderr, '%s: No such file: %s' % (argv[0], parser[0])
    exit(2)
    else:
    print >>stderr, """Usage: %s [ options ] srcfile program ...
    --xargmode: Act like xargs, running the program as many times as needed.
    --multifork: For non-xargs mode, fork a process for each file but don't wait. USE AT YOUR OWN RISK.""" % argv[0]
    exit(1)
    Usage: foreach.py [ options ] srcfile program ...
    --xargmode: Act like xargs, running the program as many times as needed.
    --multifork: For non-xargs mode, fork a process for each file but don't wait. USE AT YOUR OWN RISK.
  • The second script is alac.py, which converts a file to ALAC using FFMPEG, and uses TagPy and AtomicParsley to copy some of the tags from the source file into the destination ALAC file:

    #!/usr/bin/python
    from os import fork, waitpid, WEXITSTATUS, execvp, remove, dup2, pipe
    from os import read as os_read
    from os import close as os_close
    from os import write as os_write
    from os.path import isfile
    from os.path import basename as os_basename
    from sys import argv, stderr
    from tagpy import FileRef

    from subprocess import Popen, PIPE






    def convertFile(ifile, ofile):
    if isfile(ifile):
    print ifile, ofile
    f = FileRef(ifile)
    info = f.tag()
    print '\nTitle:', info.title, '\nArtist:', info.artist, '\nAlbum:', info.album, '\nYear:', info.year, '\nTrack:', info.track, '\nGenre: ', info.genre

    err = Popen(('ffmpeg', '-i', ifile, '-acodec', 'alac', ofile), stderr = PIPE).communicate()[1]
    for line in err.split('\n'):
    if 'Multiple frames in a packet' not in line:
    print >>stderr, line

    if isfile(ofile):
    p = Popen(('AtomicParsley', ofile, '--artist', unicode(info.artist), '--album', unicode(info.album), '--title', unicode(info.title), '--year', unicode(info.year), '--tracknum', unicode(info.track), '--genre', unicode(info.genre)))
    id, status = waitpid(p.pid, 0)
    if WEXITSTATUS(status) != 0:
    print >>stderr, 'Child exited with %d' % WEXITSTATUS(status)
    remove(ofile)
    return True
    else:
    return False
    else:
    return False

    def basename(filename, extensions = []):
    ret = os_basename(filename)
    parts = ret.rpartition('.')
    if parts[2] in extensions:
    return parts[0]
    else:
    return ret



    if len(argv) >= 2:
    for arg in argv[1:]:
    if not convertFile(arg, unicode(basename(arg, ['flac']), encoding = 'utf8') + u'.m4a'):
    print >>stderr, '%s: Failed to process: %s' % (argv[0], arg)
    else:
    print >>stderr, 'Usage: %s infiles..' % argv[0]

I used these two scripts together in the following manner:

python ../foreach.py --xargmode $PLAYLIST python alac.py

Not only does it support FLAC input, but it supports anything that both AtomicParsley and FFMPEG support.

Monday, February 2, 2009

Python Typechecker

Excited about the superdynamicness of Python, I decided to build a simple typechecking system for it, as a friend of mine mentioned he did quite some time ago:

types.py

#!/usr/bin/python

def typechecker(types, args):
return not False in [types[i] in type(args[i]).__mro__ for i in xrange(len(types))]

def typed(return_type, *types):
def typedf(f):
def ret(*args):
if len(args) != len(types):
raise TypeError, 'Invalid arguments; requires %u arguments.' % len(types)
elif not typechecker(types, args):
raise TypeError, 'Invalid arguments (%s); requires: %s' % ([type(arg) for arg in args], types)
r = f(*args)
if not (return_type is None) and not (return_type in type(r).__mro__):
raise TypeError, 'Invalid return type (%s); requires: %s' % (type(r), return_type)
return r
return ret
return typedf

Here's an example of its use:

if __name__ == '__main__':
@typed(basestring, basestring, int, int)
def safe_str_getslice(string, start, length):
return string[start:length]


print safe_str_getslice('Pastafarianism', 0, 5)
print safe_str_getslice('Pastafarianism', 0, 5.1)
Pasta
Traceback (most recent call last):
File "types.py", line 28, in <module>
print safe_str_getslice('Pastafarianism', 0, 5.1)
File "types.py", line 12, in ret
raise TypeError, 'Invalid arguments (%s); requires: %s' % ([type(arg) for arg in args], types)
TypeError: Invalid arguments ([<type 'str'>, <type 'int'>, <type 'float'>]); requires: (<type 'basestring'>, <type 'int'>, <type 'int'>)