#!/usr/bin/env python3
"""
Enhanced HTTP Server with Range Request Support for Media Streaming

Supports HTTP Range requests to enable seeking in audio/video files.
Can serve multiple directories mapped to different URL paths.
Uses ThreadingHTTPServer for improved performance with multiple connections.
"""

import http.server
import socketserver
import argparse
import os
import socket
import urllib.parse
from datetime import datetime
import sys
import mimetypes
import re
import html
import posixpath

# Global variables
DIRECTORIES = {}  # URL path prefix -> file system directory
PORT = 8000

# Size of the blocks copied from disk to the client socket.
_CHUNK_SIZE = 64 * 1024

# Units used by RangeHTTPHandler.format_file_size, smallest first.
_SIZE_UNITS = ("B", "KB", "MB", "GB", "TB")

# Same pattern the handler has always accepted: only the first range of a
# multi-range header is honoured.
_RANGE_RE = re.compile(r'bytes=(\d*)-(\d*)')

_VIDEO_EXTENSIONS = frozenset(
    ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm', '.m4v'])
_AUDIO_EXTENSIONS = frozenset(
    ['.mp3', '.wav', '.flac', '.aac', '.ogg', '.wma', '.m4a'])
_IMAGE_EXTENSIONS = frozenset(
    ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg', '.webp'])
_ARCHIVE_EXTENSIONS = frozenset(['.zip', '.rar', '.7z', '.tar', '.gz'])


def get_local_ip_addresses():
    """Return the machine's IPv4 addresses as a sorted list.

    The addresses are resolved from the local hostname.  Any lookup failure
    is reported on stdout and an empty list is returned.
    """
    try:
        hostname = socket.gethostname()
        addresses = socket.getaddrinfo(hostname, None)
        ipv4_addresses = {
            addr[4][0] for addr in addresses if addr[0] == socket.AF_INET
        }
        # Sorted so the printed/rendered order is stable between calls.
        return sorted(ipv4_addresses)
    except Exception as e:
        print(f"Error getting IP addresses: {e}")
        return []


def _match_mapping(url_path, mappings):
    """Return ``(fs_root, remainder)`` for the longest matching URL prefix.

    A prefix only matches on a path-segment boundary, so ``/media`` matches
    ``/media`` and ``/media/x`` but not ``/mediafoo``.  Returns ``None`` when
    no mapping applies.
    """
    best = None
    for prefix, fs_root in mappings.items():
        base = prefix.rstrip('/')
        if url_path == base or url_path.startswith(base + '/'):
            if best is None or len(base) > len(best[0]):
                best = (base, fs_root)
    if best is None:
        return None
    base, fs_root = best
    return fs_root, url_path[len(base):]


def _listing_row(icon, label, href, size, kind):
    """Build one escaped ``<tr>`` of the directory listing table."""
    return (
        "<tr>"
        f'<td>{icon} <a href="{html.escape(href)}">{html.escape(label)}</a></td>'
        f"<td>{html.escape(size)}</td>"
        f"<td>{html.escape(kind)}</td>"
        "</tr>"
    )


class RangeHTTPHandler(http.server.SimpleHTTPRequestHandler):
    """HTTP request handler that supports Range requests for media streaming."""

    def log_message(self, format, *args):
        """Print a timestamped log line that includes the client address."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        client_address = self.client_address[0]
        message = format % args
        print(f"[{timestamp}] {client_address} - {message}")

    def translate_path(self, path):
        """Translate a request path into a file system path.

        The URL is percent-decoded repeatedly (clients sometimes double
        encode), then resolved against ``DIRECTORIES`` using the longest
        matching prefix.  ``..`` segments are collapsed before joining so the
        result always stays inside the mapped directory.  Paths that match no
        mapping fall back to the base class behaviour.
        """
        url_path = urllib.parse.urlparse(path).path
        original_path = url_path

        # Handle multiple URL encoding layers
        while True:
            decoded = urllib.parse.unquote(url_path)
            if decoded == url_path:
                break
            url_path = decoded

        if url_path in ("/", "/server-info"):
            return super().translate_path(urllib.parse.unquote(self.path))

        if os.name == 'nt':
            # Backslash is a separator on Windows; treat it like '/' so it
            # cannot be used to smuggle '..' past the normalisation below.
            url_path = url_path.replace('\\', '/')

        if url_path.startswith(('/C/Users/', '/c/Users/')):
            confined = posixpath.normpath(url_path)
            # Only honour the shortcut if the path is still under /C/Users
            # once '..' segments have been resolved.
            if (confined[:8] in ('/C/Users', '/c/Users')
                    and confined[8:9] in ('', '/')):
                return confined[1].upper() + ":" + confined[2:].replace('/', '\\')

        match = _match_mapping(url_path, DIRECTORIES)
        if match is None:
            return super().translate_path(original_path)

        fs_root, remainder = match
        # Anchoring at '/' makes normpath drop any '..' that would climb
        # above the mapped directory.
        safe_relative = posixpath.normpath('/' + remainder.lstrip('/')).lstrip('/')
        root = os.path.abspath(fs_root)
        if not safe_relative:
            return root

        candidate = os.path.abspath(os.path.join(root, *safe_relative.split('/')))
        try:
            common = os.path.commonpath([root, candidate])
        except ValueError:
            # Different drives on Windows (e.g. a 'D:' segment).
            return root
        if os.path.normcase(common) != os.path.normcase(root):
            return root
        return candidate

    def parse_range_header(self, range_header, file_size):
        """Parse a ``Range`` header into inclusive ``(start, end)`` offsets.

        Supports ``bytes=a-b``, ``bytes=a-`` and the suffix form ``bytes=-n``.
        An end beyond the file and an over-long suffix are clamped to the
        file, as RFC 7233 requires.  Returns ``None`` when the header is
        malformed or no byte of the file can satisfy it.
        """
        match = _RANGE_RE.match(range_header)
        if not match or file_size <= 0:
            return None

        first, last = match.groups()
        if first:
            start = int(first)
            end = min(int(last), file_size - 1) if last else file_size - 1
        elif last:
            suffix_length = int(last)
            if suffix_length == 0:
                return None
            start = max(file_size - suffix_length, 0)
            end = file_size - 1
        else:
            return None

        if start >= file_size or start > end:
            return None
        return start, end

    def _send_range_not_satisfiable(self, file_size):
        """Send a 416 response that tells the client the real file size."""
        self.send_response(416, "Range Not Satisfiable")
        self.send_header("Content-Range", f"bytes */{file_size}")
        self.send_header("Content-Length", "0")
        self.end_headers()

    def _stream_file(self, source, remaining=None):
        """Copy ``source`` to the client in chunks.

        ``remaining`` limits the number of bytes sent; ``None`` means "until
        end of file".  Headers are already on the wire at this point, so a
        failure can only be logged, never turned into an error response.
        """
        try:
            while remaining is None or remaining > 0:
                wanted = _CHUNK_SIZE if remaining is None else min(_CHUNK_SIZE, remaining)
                chunk = source.read(wanted)
                if not chunk:
                    break
                self.wfile.write(chunk)
                if remaining is not None:
                    remaining -= len(chunk)
        except ConnectionError:
            # Players routinely drop the connection when the user seeks.
            self.close_connection = True
        except OSError as e:
            self.log_error("Transfer aborted: %s", e)
            self.close_connection = True

    def send_range_response(self, file_path, range_header):
        """Send a 206 Partial Content response for ``range_header``.

        Replies 416 when the range cannot be satisfied and 500 when the file
        cannot be inspected or opened.
        """
        try:
            file_size = os.path.getsize(file_path)
        except OSError as e:
            self.send_error(500, f"Error serving range request: {str(e)}")
            return

        range_result = self.parse_range_header(range_header, file_size)
        if range_result is None:
            self._send_range_not_satisfiable(file_size)
            return

        start, end = range_result
        content_length = end - start + 1

        # Open before sending headers so a failure can still become a 500.
        try:
            source = open(file_path, 'rb')
        except OSError as e:
            self.send_error(500, f"Error serving range request: {str(e)}")
            return

        with source:
            content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
            self.send_response(206, "Partial Content")
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(content_length))
            self.send_header("Content-Range", f"bytes {start}-{end}/{file_size}")
            self.send_header("Accept-Ranges", "bytes")
            self.end_headers()
            source.seek(start)
            self._stream_file(source, content_length)

    def format_file_size(self, size_bytes):
        """Format a byte count as a human readable string (e.g. ``1.5 KB``).

        Values beyond the largest unit are expressed in that unit.
        """
        if size_bytes == 0:
            return "0 B"
        value = float(size_bytes)
        unit_index = 0
        # Dividing step by step avoids float-log rounding and keeps the
        # index inside _SIZE_UNITS for very large files.
        while value >= 1024 and unit_index < len(_SIZE_UNITS) - 1:
            value /= 1024
            unit_index += 1
        return f"{round(value, 2)} {_SIZE_UNITS[unit_index]}"

    def get_file_icon(self, filename, is_dir=False):
        """Return an emoji icon for ``filename`` based on its extension."""
        if is_dir:
            return "📁"
        ext = os.path.splitext(filename)[1].lower()
        if ext in _VIDEO_EXTENSIONS:
            return "🎬"
        if ext in _AUDIO_EXTENSIONS:
            return "🎵"
        if ext in _IMAGE_EXTENSIONS:
            return "🖼️"
        if ext in _ARCHIVE_EXTENSIONS:
            return "📦"
        # Documents and unknown types share the generic page icon.
        return "📄"

    def is_media_file(self, filename):
        """Return True when ``filename`` has an audio or video extension."""
        ext = os.path.splitext(filename)[1].lower()
        return ext in _VIDEO_EXTENSIONS or ext in _AUDIO_EXTENSIONS

    def _send_html(self, markup):
        """Send ``markup`` as a complete UTF-8 HTML response."""
        # surrogateescape lets file names with undecodable bytes round-trip.
        body = markup.encode('utf-8', 'surrogateescape')
        self.send_response(200)
        self.send_header("Content-type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def create_directory_listing(self, path, file_path):
        """Send an HTML listing of the directory ``file_path``.

        ``path`` is the request path the directory was reached through; it is
        used to build the breadcrumb and entry links.  Replies 404 when the
        directory cannot be read.  All names are HTML-escaped.
        """
        try:
            entries = os.listdir(file_path)
        except OSError:
            self.send_error(404, "Directory not found")
            return

        # Links must not inherit the query string of the current request.
        path = urllib.parse.urlparse(path).path or "/"

        dirs = []
        files = []
        for entry in entries:
            if os.path.isdir(os.path.join(file_path, entry)):
                dirs.append(entry)
            else:
                files.append(entry)
        dirs.sort(key=str.lower)
        files.sort(key=str.lower)

        parent_path = ""
        stripped = path.strip("/")
        if path != "/" and stripped:
            segments = stripped.split("/")
            parent_path = "/" + "/".join(segments[:-1]) if len(segments) > 1 else "/"

        decoded_path = urllib.parse.unquote(path)
        base_url = path.rstrip('/')

        page = [
            "<!DOCTYPE html>",
            "<html>",
            "<head>",
            '<meta charset="utf-8">',
            f"<title>Media Browser - {html.escape(decoded_path)}</title>",
            "</head>",
            "<body>",
            "<nav>",
            '<a href="/">🏠 Home</a>',
        ]

        if path != "/":
            crumbs = decoded_path.strip("/").split("/")
            current_path = ""
            for index, part in enumerate(crumbs):
                current_path += "/" + part
                if index < len(crumbs) - 1:
                    href = urllib.parse.quote(current_path, errors='surrogatepass')
                    page.append(
                        f' / <a href="{html.escape(href)}">{html.escape(part)}</a>')
                else:
                    page.append(f" / <span>{html.escape(part)}</span>")

        heading = os.path.basename(decoded_path.rstrip('/')) or 'Root Directory'
        page.append("</nav>")
        page.append(f"<h1>📁 {html.escape(heading)}</h1>")

        if not dirs and not files:
            page.append("<p>📂</p>")
            page.append("<h3>This directory is empty</h3>")
            page.append("<p>No files or folders to display</p>")

        page.append("<table>")
        page.append("<tr><th>Name</th><th>Size</th><th>Type</th></tr>")

        if parent_path:
            page.append(_listing_row(
                "⬆️", ".. (Parent Directory)", parent_path, "-", "Directory"))

        for dirname in dirs:
            # Use single encoding for directory URLs
            dir_url = base_url + '/' + urllib.parse.quote(
                dirname, safe='', errors='surrogatepass')
            page.append(_listing_row("📁", dirname, dir_url, "-", "Directory"))

        for filename in files:
            # Use single encoding for file URLs
            file_url = base_url + '/' + urllib.parse.quote(
                filename, safe='', errors='surrogatepass')
            try:
                file_size = os.path.getsize(os.path.join(file_path, filename))
                formatted_size = self.format_file_size(file_size)
            except OSError:
                # Broken symlink or a file removed since listdir().
                formatted_size = "Unknown"
            file_type = "Media" if self.is_media_file(filename) else "File"
            page.append(_listing_row(
                self.get_file_icon(filename), filename, file_url,
                formatted_size, file_type))

        page.append("</table>")
        page.append("</body>")
        page.append("</html>")

        self._send_html("\n".join(page))

    def _send_server_info(self):
        """Send the landing page describing the server and its mappings."""
        local_ip = self.connection.getsockname()[0]
        page = [
            "<!DOCTYPE html>",
            "<html>",
            "<head>",
            '<meta charset="utf-8">',
            "<title>Multi-Directory HTTP Server with Range Support</title>",
            "</head>",
            "<body>",
            "<h1>Multi-Directory HTTP Server</h1>",
            f"<p>Server is running on: {html.escape(str(local_ip))}:{PORT}</p>",
            "<p>Available interfaces: "
            f"{html.escape(', '.join(get_local_ip_addresses()))}</p>",
            "<h2>🎵 Media Streaming Features</h2>",
            "<p>✅ HTTP Range Requests: Enabled - supports seeking in audio/video files</p>",
            "<p>✅ Media Types: MP4, MP3, AVI, MKV, WebM, OGG, WAV, FLAC, and more</p>",
            "<p>✅ Browser Compatibility: Works with HTML5 video/audio players</p>",
            "<h2>Directory Mappings</h2>",
            "<table>",
            "<tr><th>URL Path</th><th>File System Path</th><th>Links</th></tr>",
        ]
        for url_path, fs_path in DIRECTORIES.items():
            href = urllib.parse.quote(url_path, errors='surrogatepass')
            page.append(
                "<tr>"
                f"<td>{html.escape(url_path)}</td>"
                f"<td>{html.escape(str(fs_path))}</td>"
                f'<td><a href="{html.escape(href)}">Browse</a></td>'
                "</tr>"
            )
        if os.name == 'nt':
            page.append(
                "<tr><td>/C/Users/</td><td>C:\\Users\\</td>"
                '<td><a href="/C/Users/">Browse</a></td></tr>'
            )
        page.append("</table>")
        page.append("</body>")
        page.append("</html>")
        self._send_html("\n".join(page))

    def do_GET(self):
        """Handle GET requests, supporting Range requests for media files.

        ``/`` and ``/server-info`` show the server information page;
        directories get an HTML listing; files are streamed whole or, when a
        ``Range`` header is present, as partial content.
        """
        if self.path == "/" or self.path == "/server-info":
            self._send_server_info()
            return

        range_header = self.headers.get('Range')
        file_path = self.translate_path(self.path)

        if os.path.isdir(file_path):
            self.create_directory_listing(self.path, file_path)
            return

        if not os.path.isfile(file_path):
            self.send_error(404, "File not found")
            return

        if range_header:
            self.send_range_response(file_path, range_header)
            return

        # Inspect and open before sending headers so a failure can still be
        # reported as a clean 500.
        try:
            file_size = os.path.getsize(file_path)
            source = open(file_path, 'rb')
        except OSError as e:
            self.send_error(500, f"Error serving file: {str(e)}")
            return

        with source:
            content_type = mimetypes.guess_type(file_path)[0] or 'application/octet-stream'
            self.send_response(200)
            self.send_header("Content-Type", content_type)
            self.send_header("Content-Length", str(file_size))
            self.send_header("Accept-Ranges", "bytes")
            self.end_headers()
            self._stream_file(source)


def prompt_for_directories():
    """Interactively ask which directories to serve.

    Returns a dict mapping URL paths to directories.  When the user adds
    nothing, the current directory is mapped to ``/``.
    """
    directories = {}

    print("\nDirectory Mapping Setup")
    print("----------------------")
    print("You'll map URL paths to directories on your file system.")
    print("Example: /media -> C:\\Videos")

    default_dir = os.getcwd()
    print(f"\nCurrent directory: {default_dir}")
    add_default = input("Add current directory as root path '/'? (y/n): ").strip().lower()
    if add_default == 'y':
        directories['/'] = default_dir
        print(f"Added mapping: / -> {default_dir}")

    if os.name == 'nt':
        print("\nAvailable drives:")
        available_drives = []
        for code in range(ord('A'), ord('Z') + 1):
            drive = chr(code) + ":\\"
            if os.path.exists(drive):
                available_drives.append(drive)
                print(f"  {drive}")

        if available_drives:
            add_drives = input("\nAdd all available drives? (y/n): ").strip().lower()
            if add_drives == 'y':
                for drive in available_drives:
                    url_path = f"/{drive[0].upper()}"
                    directories[url_path] = drive
                    print(f"Added mapping: {url_path} -> {drive}")

    while True:
        print("\nAdd another directory mapping? (Enter 'q' to finish)")
        url_path = input("URL path (e.g., /media) or 'q' to finish: ").strip()
        if url_path.lower() == 'q' or not url_path:
            break
        if not url_path.startswith('/'):
            url_path = '/' + url_path

        fs_path = input("File system path: ").strip()
        if os.path.isdir(fs_path):
            resolved = os.path.abspath(fs_path)
            directories[url_path] = resolved
            # Show the path that was actually stored, not the raw input.
            print(f"Added mapping: {url_path} -> {resolved}")
        else:
            print(f"Error: '{fs_path}' is not a valid directory.")

    if not directories:
        print("\nNo directories specified. Using current directory as root.")
        directories['/'] = default_dir

    return directories


class ThreadingHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP Server that handles each request in a new thread"""

    # Worker threads must not keep the process alive after Ctrl+C.
    daemon_threads = True


def run_server(port, directories=None):
    """Run the HTTP server on ``port`` until interrupted.

    ``directories`` maps URL paths to directories; when ``None`` the user is
    prompted for the mappings.  Sets the module globals ``PORT`` and
    ``DIRECTORIES`` that the request handler reads.
    """
    global PORT, DIRECTORIES
    PORT = port

    if directories is None:
        DIRECTORIES = prompt_for_directories()
    else:
        DIRECTORIES = directories

    print("\nDirectory Mappings:")
    for url_path, fs_path in DIRECTORIES.items():
        print(f"  {url_path} -> {fs_path}")

    server_address = ('0.0.0.0', port)
    httpd = ThreadingHTTPServer(server_address, RangeHTTPHandler)

    print(f"\nServer started on port {port}, binding to all interfaces (0.0.0.0)")
    print("🎵 HTTP Range requests enabled - supports media seeking/streaming")
    print("Multi-threading enabled - each connection handled in its own thread")
    print("Available on the following addresses:")
    for ip in get_local_ip_addresses():
        print(f"  http://{ip}:{port}/")
    print("\nServer info available at: /server-info")
    print("Press Ctrl+C to stop the server.")

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        print("\nServer stopped.")
    finally:
        httpd.server_close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Multi-threaded HTTP Server with Range support for media streaming")
    parser.add_argument("-p", "--port", type=int, default=8000,
                        help="Port to run the server on (default: 8000)")
    args = parser.parse_args()

    try:
        run_server(args.port)
    except Exception as e:
        print(f"Error starting server: {e}")
        sys.exit(1)