#!/usr/bin/env python3 import os import re import shutil import subprocess import sys import tempfile import asyncio from pathlib import Path # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- TRACK_TAGS = ["TRACKNUMBER", "TRACK", "Track name/Position"] ALBUM_ARTIST_TAGS = {"ALBUMARTIST", "ALBUM_ARTIST"} ILLEGAL_FILENAME_CHARS = re.compile(r'[\\/:*?"<>|]') LEADING_TRACK_NUM = re.compile(r'^[0-9]+[. ]*') ORIGINAL_MIX = re.compile(r'\s*\(Original Mix\)', re.IGNORECASE) SEPARATOR_RE = re.compile(r'[;&]+|\s+&\s+') # --------------------------------------------------------------------------- # Utility helpers (Synchronous for pre-flight) # --------------------------------------------------------------------------- def find_executable(name: str) -> str | None: script_dir = Path(__file__).parent local = script_dir / (f"{name}.exe" if os.name == "nt" else name) return str(local) if local.exists() else shutil.which(name) def run(cmd, **kwargs) -> subprocess.CompletedProcess: return subprocess.run(cmd, capture_output=True, text=True, check=False, **kwargs) def get_tag_values(file_path, tag_name: str, metaflac: str) -> list[str]: result = run([metaflac, f"--show-tag={tag_name}", str(file_path)]) if result.returncode != 0 or not result.stdout: return [] return [ line.split("=", 1)[1].strip() for line in result.stdout.splitlines() if "=" in line ] def safe_filename(name: str) -> str: return re.sub(r'\s+', ' ', ILLEGAL_FILENAME_CHARS.sub('', name)).strip() # --------------------------------------------------------------------------- # Artist processing # --------------------------------------------------------------------------- def join_artists(parts: list[str]) -> str: if not parts: return "" if len(parts) == 1: return parts[0] if len(parts) == 2: return f"{parts[0]} & {parts[1]}" return ", ".join(parts[:-1]) + f" & {parts[-1]}" def process_artist(artists: list[str], album_artist: str | None = None) -> str: parts = [a.strip() for a in artists if a.strip()] if not parts: return "" seen: set[str] = set() unique = [p for p in parts if not (p in seen or seen.add(p))] if album_artist and album_artist in unique and unique[0] != album_artist: unique.remove(album_artist) unique.insert(0, album_artist) return join_artists(unique) # --------------------------------------------------------------------------- # Title helpers # --------------------------------------------------------------------------- def extract_title_from_filename(filename_stem: str, artist_hint: str = "") -> str: stem = LEADING_TRACK_NUM.sub('', filename_stem) if " - " not in stem: return stem parts = stem.split(" - ", 1) first, second = parts[0], parts[1] if artist_hint: if stem.startswith(f"{artist_hint} - "): return stem[len(artist_hint) + 3:] if stem.endswith(f" - {artist_hint}"): return stem[: -len(artist_hint) - 3] TITLE_HINTS = ("(", "feat", "ft") return first if any(h in first.lower() for h in TITLE_HINTS) else second def extract_track_from_metadata(file_path, metaflac: str) -> str: for tag in TRACK_TAGS: values = get_tag_values(file_path, tag, metaflac) if values: num = values[0].split('/')[0].strip() if num.isdigit(): return num return "" def extract_title_from_metadata(file_path, metaflac: str) -> str: titles = get_tag_values(file_path, "TITLE", metaflac) if titles: return titles[0] artist_hint = (get_tag_values(file_path, "ARTIST", metaflac) or [""])[0] return extract_title_from_filename(Path(file_path).stem, artist_hint) # --------------------------------------------------------------------------- # Pre-flight checks # --------------------------------------------------------------------------- def check_genre_tags(input_dir: Path, metaflac: str) -> str | None: print("Checking genre tags...") flac_files = list(input_dir.glob("*.flac")) if not flac_files: return None with_genre = sum(bool(get_tag_values(f, "GENRE", metaflac)) for f in flac_files) without_genre = len(flac_files) - with_genre print(f"Found {len(flac_files)} FLAC files: {with_genre} with genre tags, {without_genre} without.") if without_genre == len(flac_files): print("\nAll FLAC files are missing genre tags!") genre = input("Enter a genre to apply to all files (or Enter to skip): ").strip() if genre: print(f"Will set genre to: '{genre}' for all files") return genre print("Skipping genre tagging") elif without_genre > 0: print("Some files missing genre — preserving existing genres.") return None def detect_album_artist(input_dir: Path, metaflac: str) -> str | None: print("Detecting album artist...") flac_files = list(input_dir.glob("*.flac")) if not flac_files: return None freq: dict[str, int] = {} for fp in flac_files: raw = ",".join(get_tag_values(fp, "ARTIST", metaflac)) for part in SEPARATOR_RE.sub(',', raw).split(','): part = part.strip() if part: freq[part] = freq.get(part, 0) + 1 if not freq: print("No artists found in metadata.") return _prompt_album_artist() total = len(flac_files) universal = [a for a, c in freq.items() if c == total] if len(universal) == 1: print(f"Detected album artist: '{universal[0]}' (in all {total} tracks)") return universal[0] if len(universal) > 1: chosen = sorted(universal)[0] print(f"Multiple universal artists — selected '{chosen}'") return chosen top_artist, top_count = max(freq.items(), key=lambda x: x[1]) if top_count >= max(2, total * 0.5): print(f"Likely album artist: '{top_artist}' ({top_count}/{total} tracks)") return top_artist print("Could not reliably detect album artist from frequency analysis.") return _prompt_album_artist() def _prompt_album_artist() -> str | None: print("\nUnable to automatically detect album artist.") name = input("Enter the album artist (or Enter to skip reordering): ").strip() if name: print(f"Will use '{name}' as album artist for reordering") return name print("Skipping artist reordering") return None def extract_cover_art(input_dir: Path, metaflac: str) -> None: covers = ( list(input_dir.glob("cover.jpg")) + list(input_dir.glob("cover.png")) + list(input_dir.glob("folder.jpg")) + list(input_dir.glob("*.jpg")) + list(input_dir.glob("*.png")) ) if covers: return print("No cover art found — attempting extraction from FLAC files...") for fp in input_dir.glob("*.flac"): cover_path = input_dir / "cover.jpg" result = subprocess.run( [metaflac, f"--export-picture-to={cover_path}", str(fp)], capture_output=True, check=False, ) if result.returncode == 0 and cover_path.exists(): print(f"Cover art extracted from {fp.name}") break def find_title_corrections(input_dir: Path, metaflac: str) -> dict[str, str]: print("Checking for incomplete title metadata...") corrections: dict[str, str] = {} for fp in input_dir.glob("*.flac"): titles = get_tag_values(fp, "TITLE", metaflac) current = titles[0] if titles else "" if not current: continue artist_hint = (get_tag_values(fp, "ARTIST", metaflac) or [""])[0] filename_title = extract_title_from_filename(fp.stem, artist_hint) filename_title = LEADING_TRACK_NUM.sub('', filename_title) has_parens = "(" in filename_title and ")" in filename_title metadata_lacks_parens = "(" not in current if (has_parens and metadata_lacks_parens) or ( len(filename_title) > len(current) + 10 and current in filename_title ): print(f" Incomplete title in {fp.name} — will fix during processing") corrections[str(fp)] = filename_title return corrections def preprocess_files( input_dir: Path, metaflac: str, title_corrections: dict[str, str] ) -> None: print("Pre-processing files without track numbers...") renames: list[tuple[Path, Path]] = [] for fp in input_dir.glob("*.flac"): if re.match(r'^[0-9]+', fp.name): continue track_num = extract_track_from_metadata(fp, metaflac) title = title_corrections.get(str(fp)) or extract_title_from_metadata(fp, metaflac) if track_num and title: new_name = f"{int(track_num):02d} {safe_filename(title)}.flac" renames.append((fp, input_dir / new_name)) for old, new in renames: if not new.exists(): old.rename(new) print(f" Renamed: {old.name} -> {new.name}") if str(old) in title_corrections: title_corrections[str(new)] = title_corrections.pop(str(old)) # --------------------------------------------------------------------------- # Per-file tag parsing # --------------------------------------------------------------------------- def parse_tags( tags_file: Path, file_path: str, title_corrections: dict[str, str], ) -> tuple[dict[str, str], list[str], str]: tag_map: dict[str, str] = {} artists: list[str] = [] title = "" with open(tags_file, encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if "=" not in line: continue key, value = line.split("=", 1) key = key.upper().strip() value = ( value.replace("\u2018", "'") .replace("\u2019", "'") .replace("\u201c", '"') .replace("\u201d", '"') .strip() ) if key in ALBUM_ARTIST_TAGS: continue # Unconditionally drop all DESCRIPTION and COMMENT tags if key in ("DESCRIPTION", "COMMENT"): continue if key == "TITLE": title = ORIGINAL_MIX.sub('', value).strip() or value if file_path in title_corrections: title = title_corrections[file_path] tag_map["TITLE"] = title elif key == "ARTIST": normalized = SEPARATOR_RE.sub(',', value) artists.extend(p.strip() for p in normalized.split(',') if p.strip()) else: tag_map[key] = value return tag_map, artists, title # --------------------------------------------------------------------------- # Asynchronous Processing Engine # --------------------------------------------------------------------------- async def run_async(*args) -> None: proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await proc.communicate() if proc.returncode != 0: error_msg = stderr.decode().strip() or stdout.decode().strip() raise Exception(f"Command failed with code {proc.returncode}: {error_msg}") async def process_single_file( file_path: Path, input_dir: Path, flac_exe: str, metaflac_exe: str, global_genre: str | None, album_artist: str | None, title_corrections: dict[str, str], sem: asyncio.Semaphore ) -> None: async with sem: m = re.match(r'^([0-9]+)', file_path.name) if not m: print(f"Skipping {file_path.name} — no valid track number after preprocessing") return track_number = int(m.group(1)) formatted_track = f"{track_number:02d}" with tempfile.TemporaryDirectory() as tmpdir: tmpdir_path = Path(tmpdir) tags_file = tmpdir_path / f"{file_path.stem}_tags.txt" try: await run_async( metaflac_exe, f"--export-tags-to={tags_file}", "--no-utf8-convert", str(file_path) ) except Exception as e: print(f"Error exporting tags from {file_path.name}: {e}") return tag_map, artists, title = parse_tags(tags_file, str(file_path), title_corrections) clean_track = tag_map.get("TRACKNUMBER", str(track_number)).split('/')[0].strip() tag_map["TRACK"] = clean_track tag_map["TRACKNUMBER"] = clean_track if not title: title = safe_filename(LEADING_TRACK_NUM.sub('', file_path.stem)) tag_map["TITLE"] = ORIGINAL_MIX.sub('', title).strip() if global_genre and "GENRE" not in tag_map: tag_map["GENRE"] = global_genre new_artist = process_artist(artists, album_artist) if artists and new_artist != ", ".join(artists): print(f" Artist reordered in {file_path.name}: '{', '.join(artists)}' -> '{new_artist}'") new_filename = f"{formatted_track} {safe_filename(title)}.flac" new_tags_file = tmpdir_path / f"{file_path.stem}_new_tags.txt" with open(new_tags_file, "w", encoding="utf-8") as f: for key, value in tag_map.items(): if key != "ARTIST": f.write(f"{key}={value}\n") f.write(f"ARTIST={new_artist}\n") temp_wav = tmpdir_path / f"{file_path.stem}.wav" temp_flac = tmpdir_path / f"{file_path.stem}_out.flac" try: print(f"Started: {file_path.name}...") await run_async(flac_exe, "-d", str(file_path), "-o", str(temp_wav)) await run_async(flac_exe, "-8", "-e", "-p", str(temp_wav), "-o", str(temp_flac)) await run_async( metaflac_exe, "--remove-all-tags", f"--import-tags-from={new_tags_file}", str(temp_flac) ) new_file_path = input_dir / new_filename shutil.move(str(temp_flac), str(new_file_path)) if new_file_path != file_path: file_path.unlink(missing_ok=True) print(f"Finished: {file_path.name} -> {new_filename}") except Exception as e: print(f"Failed to process {file_path.name} — original preserved. Error: {e}") async def process_all_files( input_dir: Path, flac_exe: str, metaflac_exe: str, global_genre: str | None, album_artist: str | None, title_corrections: dict[str, str] ) -> None: print(f"\nStarting main processing (up to 3 files concurrently)...") sem = asyncio.Semaphore(3) tasks = [] for file_path in input_dir.glob("*.flac"): task = asyncio.create_task( process_single_file( file_path, input_dir, flac_exe, metaflac_exe, global_genre, album_artist, title_corrections, sem ) ) tasks.append(task) if tasks: await asyncio.gather(*tasks) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main() -> None: flac_exe = find_executable("flac") metaflac_exe = find_executable("metaflac") if not flac_exe or not metaflac_exe: print("Error: flac and metaflac not found in PATH or script directory.") sys.exit(1) dir_name = input("Enter the directory to clean: ").strip() input_dir = Path.cwd() / dir_name if not input_dir.is_dir(): print(f"Error: Directory does not exist — {input_dir}") sys.exit(1) os.chdir(input_dir) global_genre = check_genre_tags(input_dir, metaflac_exe) album_artist = detect_album_artist(input_dir, metaflac_exe) extract_cover_art(input_dir, metaflac_exe) title_corrections = find_title_corrections(input_dir, metaflac_exe) preprocess_files(input_dir, metaflac_exe, title_corrections) asyncio.run( process_all_files( input_dir, flac_exe, metaflac_exe, global_genre, album_artist, title_corrections ) ) print("\nCleaning complete!") if __name__ == "__main__": main()