Files
beets-setup/scripts/unknown/convert.py
2026-05-12 12:27:32 -04:00

145 lines
4.9 KiB
Python

import os
import shutil
import argparse
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import logging
from tqdm import tqdm
# Define the sets of extensions
LOSSLESS_EXTENSIONS = {'.flac', '.ape', '.wav', '.alac', '.aiff'}
LOSSY_EXTENSIONS = {'.mp3', '.aac', '.ogg', '.wma'}
PLAYLIST_EXTENSIONS = {'.m3u', '.m3u8'}
LOSSLESS_CODECS = {'flac', 'alac', 'wav', 'ape', 'pcm_s16le', 'pcm_s24le', 'tta', 'wv', 'wmalossless', 'als'}
def parse_arguments():
parser = argparse.ArgumentParser(description='Convert lossless audio files to MP3 and copy over lossy files.')
parser.add_argument('source_dir', help='Source directory to search for files.')
parser.add_argument('dest_dir', help='Destination directory to output converted files.')
parser.add_argument('--bitrate', type=str, default='192k', help='Bitrate for MP3 conversion (e.g., 192k).')
parser.add_argument('--threads', type=int, default=4, help='Number of threads to use for conversion.')
return parser.parse_args()
def get_audio_codec(file_path):
cmd = [
'ffprobe',
'-v', 'error',
'-select_streams', 'a:0',
'-show_entries', 'stream=codec_name',
'-of', 'default=noprint_wrappers=1:nokey=1',
str(file_path)
]
try:
output = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode().strip()
return output
except subprocess.CalledProcessError as e:
tqdm.write(f'Error getting codec for file {file_path}: {e}')
return None
def is_lossless_codec(codec_name):
if codec_name is None:
return False
return codec_name.lower() in LOSSLESS_CODECS
def convert_to_mp3(source_file, dest_file, bitrate):
cmd = [
'ffmpeg',
'-v', 'quiet', # Suppress ffmpeg output
'-y', # Overwrite output files without asking
'-i', str(source_file),
'-codec:a', 'libmp3lame',
'-b:a', bitrate,
str(dest_file)
]
try:
subprocess.check_call(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as e:
tqdm.write(f'Error converting file {source_file} to MP3: {e}')
raise
def process_file(args):
source_file, dest_file, bitrate = args
ext = source_file.suffix.lower()
file_type = 'skip'
if ext in LOSSLESS_EXTENSIONS:
file_type = 'lossless'
dest_file = dest_file.with_suffix('.mp3')
elif ext in LOSSY_EXTENSIONS:
file_type = 'lossy'
elif ext in PLAYLIST_EXTENSIONS:
file_type = 'playlist'
elif ext in {'.m4a', '.wma'}:
codec = get_audio_codec(source_file)
if is_lossless_codec(codec):
file_type = 'lossless'
dest_file = dest_file.with_suffix('.mp3')
else:
file_type = 'lossy'
else:
# Skipping unsupported file types
return
# If destination file exists, skip
if dest_file.exists():
return
# Ensure destination directory exists
dest_file.parent.mkdir(parents=True, exist_ok=True)
# Log processing start
tqdm.write(f'Processing: {source_file}')
if file_type == 'playlist':
shutil.copy2(source_file, dest_file)
elif file_type == 'lossless':
# Convert to MP3
try:
convert_to_mp3(source_file, dest_file, bitrate)
except Exception as e:
tqdm.write(f'Failed to convert {source_file} to MP3: {e}')
elif file_type == 'lossy':
# Copy over lossy files
shutil.copy2(source_file, dest_file)
# Log processing end
tqdm.write(f'Finished processing: {source_file}')
def main():
args = parse_arguments()
source_dir = Path(args.source_dir)
dest_dir = Path(args.dest_dir)
bitrate = args.bitrate
num_threads = args.threads
# Set up logging to be minimal
logging.basicConfig(level=logging.CRITICAL)
# Collect all files to process
files_to_process = []
for root, dirs, files in os.walk(source_dir):
for file in files:
source_file = Path(root) / file
rel_path = source_file.relative_to(source_dir)
dest_file = dest_dir / rel_path
files_to_process.append((source_file, dest_file, bitrate))
# Process files in parallel with progress bar
total_files = len(files_to_process)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = {executor.submit(process_file, file_args): file_args[0] for file_args in files_to_process}
with tqdm(total=total_files, desc='Processing files', unit='file') as pbar:
for future in as_completed(futures):
# Update progress bar
pbar.update(1)
# Handle any exceptions
try:
future.result()
except Exception as e:
tqdm.write(f'Error processing file: {e}')
if __name__ == '__main__':
main()