import ffmpeg import json import os import sys import yt_dlp from base64 import b64encode from copy import deepcopy from datetime import datetime from decimal import * from urllib.parse import urlparse, parse_qs URL_CACHE_FILE = "stream_urls.json" def get_stream_urls(video_url): ydl_opts = { "format": "best", "quiet": True, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info_dict = ydl.extract_info(video_url, download=False) for fmt in info_dict["formats"]: if fmt.get("protocol") == "https" and fmt.get("acodec") != "none": audio_url = fmt["url"] elif fmt.get("protocol") == "https" and fmt.get("vcodec") != "none": video_url = fmt["url"] return { "video": video_url, "audio": audio_url, "time": info_dict["release_timestamp"], } def get_pyonmaku_tag(start_time, end_time, stream_timestamp): start_timestamp = int(stream_timestamp + start_time) end_timestamp = int(stream_timestamp + end_time) start_timestamp_base64 = ( b64encode(start_timestamp.to_bytes(4)).decode("utf-8").rstrip("=") ) end_timestamp_base64 = ( b64encode(end_timestamp.to_bytes(4)).decode("utf-8").rstrip("=") ) return f"!Pyonmaku({start_timestamp_base64}:{end_timestamp_base64})" def check_if_expired(stream_url): url_expiry = float(parse_qs(urlparse(stream_url).query)["expire"][0]) return datetime.now().timestamp() > url_expiry def remove_old_urls(url_cache): pruned_cache = deepcopy(url_cache) for url, data in url_cache.items(): if check_if_expired(data["video"]) or check_if_expired(data["audio"]): del pruned_cache[url] return pruned_cache def main(): url = sys.argv[1] if len(sys.argv) >= 2 else "" start_time = sys.argv[2] if len(sys.argv) >= 3 else "" end_time = sys.argv[3] if len(sys.argv) >= 4 else "" output_filename = sys.argv[4] if len(sys.argv) >= 5 else "clip.mp4" if not url or not start_time or not end_time: print( "Usage: clip_video.py