# view WinampLibrary.py @ 0:af65171c2294
#
# Read support, experimental write support (.dat only)
# author brad
# date Fri, 09 Nov 2012 19:10:59 -0600
# parents
# children
# line wrap: on
# line source

import os
import struct
import Media
import time
from datetime import datetime

class WinampLibrary:
	"""Reader and experimental writer for a Winamp media library.

	A library is a pair of files in one directory: ``main.idx`` (starts with
	the magic ``NDEINDEX``) and ``main.dat`` (starts with ``NDETABLE``).
	Every record in the data file is a chain of fields; each field has a
	14-byte little-endian header ``<BBiii`` holding the field ID, the data
	type, the payload size and the absolute offsets of the next and previous
	field of the same record (0 terminates the chain).

	Writing is experimental: write() produces the data table but only the
	header of the index file, not the per-record index entries.

	Instance attributes:
		medias -- list of Media objects collected by read()
		fields -- while reading: dict of Winamp field ID -> Winamp column
		          name; while writing: list of Media attribute names
		          ordered by Winamp field ID
	"""

	base_dir = ''
	# Fields mappings in the format:
	#	<MLMedia attribute>: {'name': <Winamp internal name>, 'type': <Winamp data type>, 'winamp_id': <Winamp field ID>}
	field_encode = {
				'file_name': {'name': 'filename', 'type': 12, 'winamp_id': 0},
				'title': {'name': 'title', 'type': 3, 'winamp_id': 1},
				'artist': {'name': 'artist', 'type': 3, 'winamp_id': 2},
				'album': {'name': 'album', 'type': 3, 'winamp_id': 3},
				'year': {'name': 'year', 'type': 4, 'winamp_id': 4},
				'genre': {'name': 'genre', 'type': 3, 'winamp_id': 5},
				'comment': {'name': 'comment', 'type': 3, 'winamp_id': 6},
				'album_artist': {'name': 'albumartist', 'type': 3, 'winamp_id': 20},
				'composer': {'name': 'composer', 'type': 3, 'winamp_id': 24},
				'publisher': {'name': 'publisher', 'type': 3, 'winamp_id': 23},
				'category': {'name': 'category', 'type': 3, 'winamp_id': 34},
				'track': {'name': 'trackno', 'type': 4, 'winamp_id': 7},
				'length': {'name': 'length', 'type': 11, 'winamp_id': 8},
				'type': {'name': 'type', 'type': 4, 'winamp_id': 9},
				'last_update': {'name': 'lastupd', 'type': 10, 'winamp_id': 10},
				'last_play': {'name': 'lastplay', 'type': 10, 'winamp_id': 11},
				'rating': {'name': 'rating', 'type': 4, 'winamp_id': 12},
				'play_count': {'name': 'playcount', 'type': 4, 'winamp_id': 15},
				'file_time': {'name': 'filetime', 'type': 10, 'winamp_id': 16},
				'file_size': {'name': 'filesize', 'type': 4, 'winamp_id': 17},
				'bit_rate': {'name': 'bitrate', 'type': 4, 'winamp_id': 18},
				'disc': {'name': 'disc', 'type': 4, 'winamp_id': 19},
				'replaygain_album_gain': {'name': 'replaygain_album_gain', 'type': 3, 'winamp_id': 21},
				'replaygain_track_gain': {'name': 'replaygain_track_gain', 'type': 3, 'winamp_id': 22},
				'bpm': {'name': 'bpm', 'type': 4, 'winamp_id': 25},
				'discs': {'name': 'discs', 'type': 4, 'winamp_id': 26},
				'tracks': {'name': 'tracks', 'type': 4, 'winamp_id': 27},
				'is_podcast': {'name': 'ispodcast', 'type': 4, 'winamp_id': 28},
				'podcast_channel': {'name': 'podcastchannel', 'type': 3, 'winamp_id': 29},
				'podcast_pub_date': {'name': 'podcastpubdate', 'type': 10, 'winamp_id': 30},
				'gracenote_file_id': {'name': 'GracenoteFileID', 'type': 3, 'winamp_id': 31},
				'gracenote_ext_data': {'name': 'GracenoteExtData', 'type': 3, 'winamp_id': 32},
				'lossless': {'name': 'lossless', 'type': 4, 'winamp_id': 33},
				'codec': {'name': 'codec', 'type': 3, 'winamp_id': 35},
				'director': {'name': 'director', 'type': 3, 'winamp_id': 36},
				'producer': {'name': 'producer', 'type': 3, 'winamp_id': 37},
				'width': {'name': 'width', 'type': 4, 'winamp_id': 38},
				'height': {'name': 'height', 'type': 4, 'winamp_id': 39},
				'tuid2': {'name': 'tuid2', 'type': 3, 'winamp_id': 14},
				}
	# Reverse mapping: <Winamp internal name> -> <MLMedia attribute>
	field_decode = dict((v['name'], k) for k, v in field_encode.items())
	# Class-level default kept so WinampLibrary.medias still resolves;
	# __init__ gives every instance its own list so libraries do not share
	# (and silently accumulate) each other's records.
	medias = []

	def __init__(self):
		"""Create an empty library with its own record list and write cursor."""
		self.medias = []
		self.fields = {}
		self.last_write_offset = 0
		# The data file starts with the 8-byte 'NDETABLE' magic.
		self.current_write_offset = 8

	def files_from_path(self, path):
		"""Return (index file, data file) paths for a library location.

		path may be the library directory or any path inside it; anything
		that is not an existing directory is replaced by its parent
		directory. Note that this also applies to a directory that does not
		exist yet.
		"""
		path = os.path.normpath(path)
		if not os.path.isdir(path):
			path = os.path.dirname(path)
		idxfile = os.path.join(path, 'main.idx')
		datfile = os.path.join(path, 'main.dat')
		return idxfile, datfile

	def read(self, path):
		"""Load every media record of the library at path into self.medias.

		Records are appended to self.medias; self.fields is rebuilt from the
		column record found in the data file.

		Raises IOError when the index or the data file is missing and
		Exception when the index file does not have a valid header.
		"""
		self.fields = {}
		idxfile, datfile = self.files_from_path(path)
		if not os.path.isfile(idxfile):
			raise IOError('Could not find Winamp library index file ' + idxfile)
		if not os.path.isfile(datfile):
			raise IOError('Could not find Winamp library data file ' + datfile)

		# 'with' closes both files even when parsing raises.
		with open(idxfile, 'rb') as idx:
			if idx.read(8) != b'NDEINDEX':
				raise Exception(idxfile + ' does not appear to be a valid Winamp library index file')
			count_bytes = idx.read(4)
			if len(count_bytes) < 4:
				raise Exception(idxfile + ' does not appear to be a valid Winamp library index file')
			num_records, = struct.unpack('<i', count_bytes)
			idx.read(4) # no one seems to know what these four bytes are for
			with open(datfile, 'rb') as dat:
				for _ in range(num_records):
					data = idx.read(8)
					if len(data) < 8:
						# Index is shorter than its header claims; keep what we have.
						break
					# Explicit little-endian, like every other field in the format.
					offset, media_id = struct.unpack('<ii', data)
					print('%s %s' % (offset, media_id))
					media = self.read_media(dat, offset)
					if media is not None:
						self.medias.append(media)

	def write(self, path):
		"""Write self.medias as a library at path (experimental).

		The data file receives the column record, the index-description
		record and one record per media. The index file only receives its
		header (magic, record count and four fixed bytes); no per-record
		index entries are written.
		"""
		fields_sorted = sorted(self.field_encode.values(), key = lambda field: field['winamp_id'])
		self.fields = [self.field_decode[i['name']] for i in fields_sorted]

		idxfile, datfile = self.files_from_path(path)
		# 'with' guarantees both files are flushed and closed.
		with open(idxfile, 'wb') as idx:
			with open(datfile, 'wb') as dat:
				idx.write(b'NDEINDEX')
				dat.write(b'NDETABLE')
				self.last_write_offset = 0
				self.current_write_offset = 8
				# Number of actual media files, plus the column and index records
				num_records = len(self.medias) + 2
				# Write the number of records plus four mystery bytes to the index
				idx.write(struct.pack('<i', num_records) + b'\xFF\x00\x00\x00')

				self.write_column_record(dat)
				self.write_mystery_record(dat)

				for media in self.medias:
					self.write_media_record(dat, media)

	@staticmethod
	def _position(index, count):
		"""Return the chain position name for field number index out of count."""
		if count == 1:
			return 'only'
		if index == 0:
			return 'first'
		if index == count - 1:
			return 'last'
		return 'middle'

	def _write_raw(self, dat, field_id, data_type, payload, offset_next, offset_prev):
		"""Write one 14-byte field header plus payload and advance the write cursor."""
		header = struct.pack('<BBiii', field_id, data_type, len(payload), offset_next, offset_prev)
		dat.write(header + payload)
		self.last_write_offset = self.current_write_offset
		self.current_write_offset += len(header) + len(payload)

	def write_column_record(self, dat):
		"""Write the record that declares every column (one field per entry of self.fields)."""
		count = len(self.fields)
		for n, field in enumerate(self.fields):
			self.write_field(dat, field, self.field_encode[field]['name'], self._position(n, count), header_record = True)

	# Who knows what this is for?
	def write_mystery_record(self, dat):
		"""Write the fixed two-field record that follows the column record.

		Both fields use data type 1, which read_media() treats as an index
		field; the payloads contain the ASCII names 'None' and 'filename'.
		The bytes are written verbatim.
		"""
		data_packed = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x04\x4E\x6F\x6E\x65\xAE'
		offset_next = self.current_write_offset + len(data_packed) + 14
		self._write_raw(dat, 255, 1, data_packed, offset_next, 0)

		data_packed = b'\x00\x00\x00\x00\x0C\x00\x00\x00\x08\x66\x69\x6C\x65\x6E\x61\x6D\x65\x00'
		self._write_raw(dat, 0, 1, data_packed, 0, self.last_write_offset)

	def write_media_record(self, dat, media):
		"""Write one media object as a chain of fields.

		Attributes that are missing or None are left out *before* chain
		positions are assigned, so the first field written always has no
		predecessor and the last one always has no successor.
		"""
		present = []
		for field in self.fields:
			value = getattr(media, field, None)
			if value is not None:
				present.append((field, value))

		count = len(present)
		for n, (field, value) in enumerate(present):
			self.write_field(dat, field, value, self._position(n, count))

	def write_field(self, dat, field, data, position, header_record = False):
		"""Encode and write a single field at the current write offset.

		field         -- Media attribute name (a key of field_encode)
		data          -- value to store; None writes nothing
		position      -- 'first', 'middle', 'last' or 'only': decides which
		                 of the previous/next offsets are zeroed
		header_record -- True when data is a column name for the column
		                 record rather than a media value
		"""
		if data is None:
			return

		field_id = self.field_encode[field]['winamp_id']
		data_type = self.field_encode[field]['type']

		if header_record:
			# Column definition: <column type> <unused byte> <name length> <name>,
			# stored in a field whose own type is 0.
			name = data if isinstance(data, bytes) else data.encode('ascii')
			data_packed = struct.pack('<BBB', data_type, 0, len(name)) + name
			data_type = 0

		# String field
		elif data_type == 3 or data_type == 12:
			# Explicit little-endian with BOM: plain 'utf-16' would emit a
			# host-order BOM, which the reader does not recognise on
			# big-endian machines.
			data_encoded = b'\xff\xfe' + data.encode('utf-16-le')
			data_packed = struct.pack('<H', len(data_encoded)) + data_encoded

		# Integer field
		elif data_type == 4 or data_type == 11:
			data_packed = struct.pack('<i', data)

		# Date field
		elif data_type == 10:
			timestamp = int(time.mktime(data.timetuple()))
			data_packed = struct.pack('<i', timestamp)

		else:
			print('Warning: unsupported data type ' + str(data_type) + ' for field ' + field)
			# Nothing was encoded, so there is nothing to write.
			return

		if position in ('first', 'only'):
			offset_prev = 0
		else:
			offset_prev = self.last_write_offset
		if position in ('last', 'only'):
			offset_next = 0
		else:
			offset_next = self.current_write_offset + len(data_packed) + 14

		self._write_raw(dat, field_id, data_type, data_packed, offset_next, offset_prev)

	def _read_column(self, dat, field_id):
		"""Read a column definition and remember its name under field_id."""
		# The column name is always a string, and the middle byte appears to be unused
		_, _, data_size = struct.unpack('<BBB', dat.read(3))
		field_name = dat.read(data_size)
		if not isinstance(field_name, str):
			field_name = field_name.decode('ascii', 'replace')
		if field_name in self.field_decode:
			self.fields[field_id] = field_name
		else:
			print('Warning: unknown field name ' + field_name)

	def _read_value(self, dat, data_type, field_name):
		"""Decode the payload at the current position; None for unsupported types."""
		# String field
		if data_type == 3 or data_type == 12:
			data_size, = struct.unpack('<H', dat.read(2))
			data = dat.read(data_size)
			enc = 'utf-16' if data.startswith(b'\xff\xfe') else 'ascii'
			return data.decode(enc, 'ignore')

		# Integer field
		if data_type == 4 or data_type == 11:
			value, = struct.unpack('<i', dat.read(4))
			return value

		# Date field
		if data_type == 10:
			timestamp, = struct.unpack('<i', dat.read(4))
			return datetime.fromtimestamp(timestamp)

		print('Warning: unsupported data type ' + str(data_type) + ' for field ' + field_name)
		return None

	def read_media(self, dat, offset):
		"""Read the record whose first field starts at offset in dat.

		Returns a Media object for media records and None for the column
		record (whose fields are stored in self.fields instead) and for
		index records. A truncated file or a cyclic field chain ends the
		record with a warning instead of raising or looping forever.
		"""
		media = Media.Media()
		last_type = None
		visited = set()
		position = offset
		while True:
			if position in visited:
				print('Warning: field chain loops back to offset ' + str(position))
				break
			visited.add(position)
			dat.seek(position)
			try:
				field_id, data_type, size, offset_next, offset_prev = struct.unpack('<BBiii', dat.read(14))
				print('data %s %s %s %s %s' % (field_id, data_type, size, offset_next, offset_prev))
				last_type = data_type
				# Column field
				if data_type == 0:
					self._read_column(dat, field_id)

				elif field_id in self.fields:
					# Index field? Not actually a media
					if data_type == 1:
						return None
					prop = self.field_decode[self.fields[field_id]]
					value = self._read_value(dat, data_type, self.fields[field_id])
					if value is not None:
						setattr(media, prop, value)

				else:
					print('?' + str(field_id))
			except struct.error:
				# Short read: the file ends in the middle of this field.
				print('Warning: truncated record at offset ' + str(position))
				break
			if offset_next == 0:
				break
			position = offset_next

		if last_type is not None and last_type > 1: # not column or index field
			return media
		return None

# Ad-hoc round trip: read the local Winamp library and write a copy of it.
# Guarded so that importing this module does not touch the file system.
if __name__ == '__main__':
	ml_dir = 'C:\\Users\\Brad\\AppData\\Roaming\\Winamp\\Plugins\\ml'
	out_dir = 'C:\\Users\\Brad\\AppData\\Roaming\\Winamp\\Plugins\\ml\\out'

	lib = WinampLibrary()
	lib.read(ml_dir)
	# files_from_path() falls back to the parent directory when the given
	# path is not an existing directory, which would make write() overwrite
	# the library that was just read. Make sure the target exists first.
	if not os.path.isdir(out_dir):
		os.makedirs(out_dir)
	lib.write(out_dir)