import os
import subprocess
import json
from pathlib import Path
import shlex
import glob
def get_audio_tracks(video_path):
    """Return a description of every audio stream that ffprobe reports.

    Runs ``ffprobe -show_streams`` with JSON output on *video_path* and
    collects the audio streams.

    Args:
        video_path: Path (``str`` or ``Path``) of the media file to probe.

    Returns:
        A list of dicts, one per audio stream, with the keys ``'id'``
        (ffprobe stream index), ``'language'`` (language tag, ``'und'`` when
        the stream has none), ``'codec'`` (``'unk'`` when missing) and
        ``'channels'`` (``0`` when missing). An empty list is returned when
        ffprobe cannot be started, exits with an error, or prints output
        that is not valid JSON; the error is printed in each case.
    """
    cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_streams',
        str(video_path),
    ]
    try:
        # ffprobe writes UTF-8; decode explicitly instead of relying on the
        # locale encoding, and never fail on a broken tag value.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding='utf-8',
            errors='replace',
            check=True,
        )
        data = json.loads(result.stdout)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the case where ffprobe itself cannot be launched
        # (e.g. not installed); treat it like any other probe failure.
        print(f"Error probing {video_path}: {e}")
        return []
    except json.JSONDecodeError as e:
        print(f"Error parsing ffprobe output for {video_path}: {e}")
        return []

    audio_tracks = []
    for stream in data.get('streams', []):
        # The JSON writer may omit 'codec_type' for streams whose media type
        # is unknown, so do not index it directly.
        if stream.get('codec_type') != 'audio':
            continue
        lang = stream.get('tags', {}).get('language', 'und')
        if lang == "deu":
            # Normalise the two ISO 639-2 codes for German to a single one.
            lang = "ger"
        audio_tracks.append({
            'id': stream['index'],
            'language': lang,
            'codec': stream.get('codec_name', 'unk'),
            'channels': stream.get('channels', 0),
        })
    return audio_tracks
def extract_audio_track(video_path, track_id, language, codec, channels):
    """Copy one audio stream of *video_path* into a sidecar ``.mka`` file.

    The output is named
    ``<video_path>.<codec>.<channels>ch.<language>.mka``. ffmpeg writes to a
    ``.temp.mka`` file first, which is moved to the final name only after
    ffmpeg succeeded, so a final-named file is never a partial extraction.

    Args:
        video_path: Path (``str`` or ``Path``) of the source media file.
        track_id: Stream index used in ffmpeg's ``-map 0:<track_id>``.
        language: Language tag used in the output file name.
        codec: Codec name used in the output file name.
        channels: Channel count used in the output file name.

    Returns:
        ``True`` when a new file was written. ``False`` when the output file
        already exists (nothing is done) or when ffmpeg failed or could not
        be started (the error is printed).
    """
    output_path = f"{video_path}.{codec}.{channels}ch.{language}.mka"
    # NOTE(review): two streams with identical codec/channels/language map to
    # the same name, so only the first one is extracted.
    if os.path.exists(output_path):
        return False
    print(f"writing {output_path!r}")
    temp_output_path = f"{output_path}.temp.mka"
    cmd = [
        'ffmpeg',
        '-hide_banner',
        '-loglevel', 'error',
        '-y',
        '-i', str(video_path),
        '-map', f'0:{track_id}',
        '-c', 'copy',
        temp_output_path,
    ]
    print(">", shlex.join(cmd))
    try:
        subprocess.run(cmd, check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers ffmpeg not being launchable at all.
        print(f"Error extracting audio from {video_path}: {e}")
        # Do not leave a partial temp file behind; cleanup is best-effort.
        try:
            os.unlink(temp_output_path)
        except FileNotFoundError:
            pass
        except OSError as cleanup_error:
            print(f"Error removing {temp_output_path!r}: {cleanup_error}")
        return False
    # os.replace moves the file into place on every platform.
    os.replace(temp_output_path, output_path)
    return True
def process_video_file(video_path):
    """Extract the audio tracks of one video file, if it has a German track.

    Behaviour:
      * No audio tracks reported: print a message and do nothing.
      * No track tagged ``'ger'``: print a message, delete any existing
        ``<video_path>.*.*ch.eng.mka`` sidecar files, and extract nothing.
      * Otherwise: call ``extract_audio_track`` for every audio track.

    Args:
        video_path: Path (``str`` or ``Path``) of the video file.

    Returns:
        None.
    """
    print(f"process_video_file {str(video_path)!r}")
    audio_tracks = get_audio_tracks(video_path)
    if not audio_tracks:
        print(f"No audio tracks found in {video_path!r}")
        return

    has_german_audiotrack = any(
        track["language"] == "ger" for track in audio_tracks
    )
    if not has_german_audiotrack:
        print(f"No german audio track found in {video_path!r}")
        # Escape the literal path: video names often contain glob
        # metacharacters such as '[' and ']', which would otherwise stop the
        # pattern from matching the sidecar files.
        pattern = glob.escape(str(video_path)) + ".*.*ch.eng.mka"
        for path in glob.glob(pattern):
            print(f"removing {path!r}")
            os.unlink(path)
        return

    for track in audio_tracks:
        extract_audio_track(
            video_path,
            track['id'],
            track['language'],
            track['codec'],
            track['channels'],
        )
def process_directory(root_dir):
    """Walk *root_dir* and hand every ``.mkv``/``.mp4`` file to process_video_file.

    The extension match is case-insensitive. Files are visited in the order
    ``os.walk`` yields them.
    """
    suffixes = ('.mkv', '.mp4')
    for folder, _subdirs, names in os.walk(root_dir):
        folder_path = Path(folder)
        # Pick the video files of this directory first, then process them.
        videos = [name for name in names if name.lower().endswith(suffixes)]
        for name in videos:
            process_video_file(folder_path / name)
def main(argv=None):
    """Command-line entry point: extract audio tracks below a directory.

    Args:
        argv: Argument list to parse; ``None`` makes argparse use
            ``sys.argv[1:]``.

    Returns:
        Process exit code: ``1`` when the given directory is not a valid
        directory, ``0`` after the directory has been processed.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Extract audio tracks from video files recursively')
    parser.add_argument('directory', help='Directory to search for video files')
    args = parser.parse_args(argv)
    if not os.path.isdir(args.directory):
        print(f"Error: {args.directory} is not a valid directory")
        return 1
    process_directory(args.directory)
    print("Audio extraction complete")
    return 0


if __name__ == '__main__':
    import sys

    # sys.exit instead of the site-provided exit() helper, which is meant for
    # the interactive shell and is unavailable when site is not loaded.
    sys.exit(main())