import gradio as gr
import base64
import os
import json
import urllib.request
import urllib.error
import time

# ---------------- CONFIGURATION ---------------- #
# Updated for Koboldcpp on port 5001.
# We append /v1 because the script uses OpenAI-compatible endpoints (/v1/chat/completions).
API_URL = "http://127.0.0.1:5001/v1"
API_KEY = ""  # No API key needed for local Koboldcpp

# Global variable to hold the detected model ID
CURRENT_MODEL_ID = "koboldcpp/Qwen3-VL-30B-A3B-Thinking-UD-Q5_K_XL"  # Default/fallback

# ---------------- CSS ---------------- #
CSS = """
/* NUCLEAR OPTION: HIDE DEFAULT GRADIO LOADERS */
.clean-component .loading,
.clean-component .loading-label,
.clean-component .pending,
.clean-component .generating,
.clean-component .wrap .status,
.clean-component .status-tracker {
    display: none !important;
    opacity: 0 !important;
    visibility: hidden !important;
    transition: none !important;
    height: 0 !important;
    padding: 0 !important;
    margin: 0 !important;
    border: none !important;
    content: "" !important;
}

/* MONITOR STYLING */
.monitor-box {
    display: block !important;
    background-color: #1e1e2e;
    color: #cdd6f4;
    padding: 15px 20px;
    border-radius: 8px;
    border: 1px solid #45475a;
    font-family: 'Consolas', 'Monaco', monospace;
    margin-bottom: 10px;
    box-shadow: 0 4px 6px rgba(0,0,0,0.3);
}
.monitor-header {
    display: flex;
    justify-content: space-between;
    align-items: center;
    border-bottom: 1px solid #45475a;
    padding-bottom: 10px;
    margin-bottom: 10px;
    font-weight: bold;
    text-transform: uppercase;
    letter-spacing: 1px;
}
.monitor-title { color: #cd632a; /* MATCHING ORANGE ACCENT */ }
.status-indicator { font-weight: bold; font-size: 0.9em; }
.status-online { color: #a6e3a1; }
.status-offline { color: #f38ba8; }
.pipeline-mode {
    font-size: 1.1em;
    font-weight: bold;
    color: #89b4fa;
    margin-bottom: 5px;
}
.pairing-info {
    font-size: 0.9em;
    color: #a6adc8;
    display: flex;
    justify-content: space-between;
}
.progress-text { color: #a6e3a1; font-weight: bold; }

/* CUSTOM STEALTH PROGRESS BAR */
.monitor-progress-track {
    background-color: #313244;
    height: 8px;
    border-radius: 4px;
    margin-top: 10px;
    overflow: hidden;
}
.monitor-progress-bar {
    background-color: #a6e3a1;
    height: 100%;
    width: 0%;
    transition: width 0.3s ease;
}

/* LIVE FEED DASHBOARD (3-Column Grid) */
.live-grid {
    display: grid;
    grid-template-columns: 1fr 1fr 1fr;
    gap: 10px;
    height: 300px;
    background-color: #181825;
    padding: 10px;
    border-radius: 8px;
    border: 1px solid #313244;
}
.live-col {
    background-color: #11111b;
    border: 1px dashed #585b70;
    border-radius: 6px;
    padding: 10px;
    display: flex;
    flex-direction: column;
    overflow: hidden;
    height: 100%;
    min-height: 0;
}
.live-header {
    font-family: 'Consolas', monospace;
    font-weight: bold;
    color: #89b4fa;
    border-bottom: 1px solid #45475a;
    margin-bottom: 8px;
    padding-bottom: 4px;
    text-transform: uppercase;
    font-size: 0.85em;
    display: flex;
    justify-content: space-between;
    flex-shrink: 0;
}
.live-content {
    font-family: 'Consolas', monospace;
    font-size: 0.75em;
    color: #a6e3a1;
    overflow-y: auto;
    flex-grow: 1;
    min-height: 0;
    white-space: pre-wrap;
    scrollbar-width: thin;
    scrollbar-color: #45475a #11111b;
    line-height: 1.4;
    padding-right: 5px;
}
.stat-tag {
    color: #cd632a; /* MATCHING ORANGE STATS */
    font-size: 0.9em;
}
"""

# ---------------- JSON SCHEMA ---------------- #
EYE_SCHEMA = {
    "type": "json_schema",
    "json_schema": {
        "name": "visual_analysis_report",
        "strict": True,
        "schema": {
            "type": "object",
            "properties": {
                "1_rendering_physics": {
                    "type": "object",
                    "properties": {
                        "medium_and_toolset": {"type": "string"},
                        "lighting_calculation": {"type": "string"},
                        "color_grading_palette": {"type": "string"},
                        "edge_and_line_quality": {"type": "string"},
                        "surface_artifacts": {"type": "string"}
                    },
                    "required": ["medium_and_toolset", "lighting_calculation", "color_grading_palette", "edge_and_line_quality", "surface_artifacts"],
                    "additionalProperties": False
                },
                "2_scene_structure": {
                    "type": "object",
                    "properties": {
                        "camera_angle": {"type": "string"},
                        "shot_type": {"type": "string"},
                        "compositional_balance": {"type": "string"},
                        "depth_and_focus": {"type": "string"}
                    },
                    "required": ["camera_angle", "shot_type", "compositional_balance", "depth_and_focus"],
                    "additionalProperties": False
                },
                "3_semantic_content": {
                    "type": "object",
                    "properties": {
                        "subject_identity": {"type": "string"},
                        "pose_and_action": {"type": "string"},
                        "expression_and_emotion": {"type": "string"},
                        "environment_setting": {"type": "string"},
                        "key_props": {"type": "string"}
                    },
                    "required": ["subject_identity", "pose_and_action", "expression_and_emotion", "environment_setting", "key_props"],
                    "additionalProperties": False
                },
                "4_artistic_genre_label": {"type": "string"}
            },
            "required": ["1_rendering_physics", "2_scene_structure", "3_semantic_content", "4_artistic_genre_label"],
            "additionalProperties": False
        }
    }
}

# ---------------- PROMPTS ---------------- #
PROMPT_STYLE_DISTILLER = """
**TASK:** You are a Style Distillation Engine.
**INPUT:** A full scene description.
**GOAL:** Extract strictly the **Visual DNA** (Style, Medium, Lighting, Palette).
**RULES:**
1. **STRIP** all Subjects, Actions, and Specific Objects (e.g., remove "man walking", "dog", "spaceship").
2. **KEEP** only the aesthetic adjectives and technical rendering terms.
3. **OUTPUT:** A concise, comma-separated block of style tags.
"""

PROMPT_DREAMER_TRANSFER = """
You are The Dreamer, an Advanced Visual Synthesis Engine.
**INPUTS:**
1. **VISUAL WIREFRAME:** (Subject/Action/Object).
2. **STYLE SHELL:** (Aesthetics/Physics).
**TASK:** Render the **WIREFRAME** using exclusively the physics of the **STYLE SHELL**.
**LOGIC:**
* **Subject Integrity:** Keep the Subject/Action/Position/Pose from the Wireframe.
* **Style Application:** Apply the lighting/texture of the Shell.
* **Conflict:** Style colors override Subject colors.
* **Positive Framing:** Describe what IS visible.
"""

PROMPT_DREAMER_STANDARD = """
**CONTEXT:** The input text is a "TL;DR" summary of a complex, high-fidelity image. It contains only the key aspects (the "Skeleton").
**YOUR GOAL:** Reverse-engineer the full-size, detailed description. Use the input as a seed; reason to deduce and visualize the full picture, and only then describe it.
**OUTPUT:** A vivid, detailed description of the resulting scene. Write as if you are seeing this image in 4K UHD HDR.
"""

PROMPT_REFINER = """
You are The Refiner.
**TASK:** Synthesize the inputs into a **single, concise, high-density, fluid paragraph** of evocative visual prose.
**RULES:** Descriptive, Cinematic, Vivid. No labels.
**NO FLUFF:** Remove "you feel," "you see," metaphors, emotional narration, and any metadata such as "— **End Scene** —". Describe ONLY what is visually present.
**CONSTRAINT:** Limit output to approx. 200 words (300 tokens).
"""
""" # ---------------- LOGIC ---------------- # def check_connection_and_model(): """Checks API connectivity and auto-updates the CURRENT_MODEL_ID""" global CURRENT_MODEL_ID try: # Query the models endpoint to check connection and get the active model name with urllib.request.urlopen(f"{API_URL}/models", timeout=0.5) as response: if response.getcode() == 200: try: data = json.loads(response.read().decode('utf-8')) # If the API returns a list of models, pick the first one if 'data' in data and len(data['data']) > 0: CURRENT_MODEL_ID = data['data'][0]['id'] except: pass # Keep previous or default if JSON fails but 200 OK return True except: pass return False def count_file_items(file_list): if not file_list: return 0, 0, "" total_items = 0 details = [] for f in file_list: try: filename = os.path.basename(f.name) ext = os.path.splitext(filename)[1].lower() if ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp']: total_items += 1 elif ext == '.txt': with open(f.name, "r", encoding="utf-8") as txt: lines = [l for l in txt if l.strip()] count = len(lines) total_items += count details.append(f"{filename} ({count} lines)") except: pass detail_str = ", ".join(details) if details else f"{len(file_list)} image(s)" return len(file_list), total_items, detail_str def update_ui_state(style_files, content_files, radio_mode, progress=0.0): # This function handles both the Monitor HTML update AND the locking of the Mode buttons # 1. Logic for disabling Mode buttons if Content exists has_content = (content_files is not None) and (len(content_files) > 0) mode_interactive = not has_content # Disable if content exists # 2. Monitor Logic is_online = check_connection_and_model() # Display the detected model name in the status line model_name_display = CURRENT_MODEL_ID.split('/')[-1] if CURRENT_MODEL_ID else "Unknown" if len(model_name_display) > 30: model_name_display = model_name_display[:27] + "..." conn_text = f"[ONLINE: {model_name_display}]" if is_online else "[OFFLINE]" conn_class = "status-online" if is_online else "status-offline" conn_html = f'{conn_text}' s_files, s_ops, s_det = count_file_items(style_files) c_files, c_ops, c_det = count_file_items(content_files) mode_text = "[WAITING] WAITING FOR INPUTS..." pair_text = "" if s_files > 0 and c_files == 0: if radio_mode == "Style DNA Only": mode_text = "[ANALYSIS] Extracting Style DNA" else: mode_text = "[ANALYSIS] Extracting Full Prompts" pair_text = f"Source: {s_det}. Total Operations: {s_ops}" elif s_files == 0 and c_files > 0: mode_text = "[CREATION] De-Summarization" pair_text = f"Remastering from: {c_det}. Total Operations: {c_ops}" elif s_files > 0 and c_files > 0: mode_text = "[CREATION] Semantic Style Transfer (Hybrid)" if c_ops == 1 and s_ops == 1: pair_text = "[SINGLE PAIR] 1 Content -> 1 Style" elif c_ops == 1 and s_ops > 1: pair_text = f"[STUDY MODE] 1 Content x {s_ops} Styles (Variations)" elif c_ops > 1 and s_ops == 1: pair_text = f"[BATCH MODE] {c_ops} Contents -> 1 Style (Unity)" elif c_ops == s_ops: pair_text = f"[ZIPPER MODE] {c_ops} pairs (1-to-1 sequential)" else: ops = max(c_ops, s_ops) pair_text = f"[LOOP MODE] Processing {ops} operations (cycling inputs)" # Generate Progress Bar HTML pct = int(progress * 100) prog_html = "" pct_text = "" if pct > 0: pct_text = f'[PROGRESS: {pct}%]' prog_html = f"""
""" monitor_html = f"""
Semantic Image Disassembler {conn_html}
{mode_text}
{pair_text} {pct_text}
{prog_html}
""" return monitor_html, gr.update(interactive=mode_interactive) def encode_image(image_path): with open(image_path, "rb") as image_file: return base64.b64encode(image_file.read()).decode('utf-8') # --- BARE METAL API ENGINE --- def send_api_request(messages, max_tokens=1000, temperature=0.7, response_format=None): global CURRENT_MODEL_ID url = f"{API_URL}/chat/completions" payload = { "model": CURRENT_MODEL_ID, # Uses the auto-detected model ID "messages": messages, "max_tokens": max_tokens, "temperature": temperature, "stream": False } if response_format: payload["response_format"] = response_format data = json.dumps(payload).encode('utf-8') req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}) with urllib.request.urlopen(req) as response: result = json.loads(response.read().decode('utf-8')) content = result['choices'][0]['message']['content'] # Extract usage for TPS calculation usage = result.get('usage', {}) total_tokens = usage.get('total_tokens', 0) return content, total_tokens def analyze_with_schema(file_path): base64_img = encode_image(file_path) messages = [ {"role": "system", "content": "You are The Architect."}, {"role": "user", "content": [{"type": "text", "text": "Analyze."}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}]} ] content, tokens = send_api_request(messages, max_tokens=2000, temperature=0.1, response_format=EYE_SCHEMA) return json.loads(content), tokens def send_text_request(messages, max_tokens=1000): return send_api_request(messages, max_tokens=max_tokens, temperature=0.7) def render_live_dashboard(eye_txt, dreamer_txt, refiner_txt, eye_stats="", dreamer_stats="", refiner_stats=""): def clean(text): if not text: return "..." return text.replace("<", "<").replace(">", ">").replace("\n", "
") return f"""
I. THE EYE (Analysis) {eye_stats}
{clean(eye_txt)}
II. THE DREAMER (Synthesis) {dreamer_stats}
{clean(dreamer_txt)}
III. THE REFINER (Polish) {refiner_stats}
{clean(refiner_txt)}
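# run_pipeline() below is a generator: each yield is a 6-tuple matched, in
# order, to the Gradio outputs wired up at the bottom of the file
# (output_log, download_file, active_preview, monitor, live_status, extract_mode).
# Yielding repeatedly is what produces the "streaming" UI updates.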
""" def run_pipeline(input_files, style_ref_files, extract_mode, p_dreamer_trans, p_dreamer_std, p_refiner): # Dummy call to ensure mode button state is maintained properly in output monitor_init, mode_update = update_ui_state(style_ref_files, input_files, extract_mode) if not check_connection_and_model(): yield "ERROR: Connection Lost.", None, None, monitor_init, "**OFFLINE**", mode_update return results_text = "" # Store list of generated file paths generated_files = [] dash_state = {"eye": "Waiting...", "dreamer": "Waiting...", "refiner": "Waiting..."} stats_state = {"eye": "", "dreamer": "", "refiner": ""} current_prog_val = 0.0 current_img_display = None def calc_stats(start_t, tokens): elapsed = time.time() - start_t if elapsed < 0.1: elapsed = 0.1 tps = tokens / elapsed return f"Avg. TPS {tps:.1f} | {elapsed:.1f} s" def yield_update(log_append=None, prog_val=None): nonlocal results_text, current_prog_val if log_append: results_text += log_append if prog_val is not None: current_prog_val = prog_val mon_html, m_upd = update_ui_state(style_ref_files, input_files, extract_mode, progress=current_prog_val) return ( results_text, generated_files, # Return list of files instead of single filename current_img_display, mon_html, render_live_dashboard( dash_state["eye"], dash_state["dreamer"], dash_state["refiner"], stats_state["eye"], stats_state["dreamer"], stats_state["refiner"] ), m_upd ) # ---------------- MODE 1: ANALYSIS ONLY ---------------- if style_ref_files and not input_files: s_files, total_ops, s_det = count_file_items(style_ref_files) completed_ops = 0 for file_obj in style_ref_files: filename = os.path.basename(file_obj.name) ext = os.path.splitext(filename)[1].lower() # Determine output text path based on image name txt_output_path = os.path.splitext(file_obj.name)[0] + ".txt" try: # --- IMAGE HANDLING --- if ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp']: loop_prog = completed_ops / total_ops dash_state["eye"] = f"Scanning {filename} physics..." stats_state["eye"] = "" # Reset stats current_img_display = file_obj.name yield yield_update(prog_val=loop_prog) t_start = time.time() data, tokens = analyze_with_schema(file_obj.name) stats_state["eye"] = calc_stats(t_start, tokens) if extract_mode == "Style DNA Only": physics = data.get("1_rendering_physics", {}) block = f"[STYLE DNA: {filename}]\nMEDIUM: {physics.get('medium_and_toolset')}\nLIGHTING: {physics.get('lighting_calculation')}\nCOLORS: {physics.get('color_grading_palette')}\n" dash_state["eye"] = f"Extraction Complete:\n{block}" # Write individual file with open(txt_output_path, "w", encoding="utf-8") as f: f.write(block) generated_files.append(txt_output_path) yield yield_update(f"{block}\n\n", prog_val=loop_prog + (0.9/total_ops)) else: json_name = f"{os.path.splitext(filename)[0]}_schema.json" with open(json_name, "w", encoding="utf-8") as jf: json.dump(data, jf, indent=4) generated_files.append(json_name) # Also add JSON to downloadable files manifest = ( f"GENRE: {data.get('4_artistic_genre_label')}\n" f"PHYSICS: {data.get('1_rendering_physics')}\n" f"STRUCTURE: {data.get('2_scene_structure')}\n" f"CONTENT: {data.get('3_semantic_content')}" ) dash_state["eye"] = f"Analysis Complete. JSON saved.\n{str(data.get('1_rendering_physics'))[:100]}..." dash_state["refiner"] = "Translating JSON to Prompt..." 
stats_state["refiner"] = "" yield yield_update(prog_val=loop_prog + (0.5/total_ops)) t_start_ref = time.time() refine_msg = [{"role": "system", "content": p_refiner}, {"role": "user", "content": f"TASK: Narratize this visual analysis data into a high-fidelity image prompt.\nDATA:\n{manifest}"}] full_prompt, r_tokens = send_text_request(refine_msg) full_prompt = full_prompt.replace("\n", " ").strip() stats_state["refiner"] = calc_stats(t_start_ref, r_tokens) dash_state["refiner"] = f"Done.\n{full_prompt[:50]}..." # Write individual file with open(txt_output_path, "w", encoding="utf-8") as f: f.write(full_prompt) generated_files.append(txt_output_path) yield yield_update(f"[FULL PROMPT: {filename}]\n{full_prompt}\n\n", prog_val=loop_prog + (0.99/total_ops)) completed_ops += 1 # --- TEXT HANDLING --- else: dash_state["eye"] = f"Reading text file {filename}..." current_img_display = None yield yield_update(prog_val=completed_ops / total_ops) with open(file_obj.name, "r", encoding="utf-8") as f: lines = [l.strip() for l in f if l.strip()] for line_idx, line_content in enumerate(lines): loop_prog = completed_ops / total_ops safe_content = line_content[:4000].strip() if not safe_content: continue # Unique path for line-based output line_txt_path = f"{os.path.splitext(file_obj.name)[0]}_L{line_idx+1}.txt" if extract_mode == "Style DNA Only": dash_state["eye"] = f"Distilling Style DNA from {filename} (Line {line_idx+1}/{len(lines)})..." stats_state["eye"] = "" yield yield_update(prog_val=loop_prog) t_start = time.time() msg = [{"role": "system", "content": PROMPT_STYLE_DISTILLER}, {"role": "user", "content": safe_content}] style_tags, tokens = send_text_request(msg) stats_state["eye"] = calc_stats(t_start, tokens) with open(line_txt_path, "w", encoding="utf-8") as f: f.write(style_tags) generated_files.append(line_txt_path) yield yield_update(f"[STYLE DNA: {filename} L{line_idx+1}]\n{style_tags}\n\n", prog_val=loop_prog + (0.9/total_ops)) else: dash_state["refiner"] = f"Refining {filename} (Line {line_idx+1}/{len(lines)})..." stats_state["refiner"] = "" yield yield_update(prog_val=loop_prog) t_start = time.time() msg = [{"role": "system", "content": p_refiner}, {"role": "user", "content": f"TASK: Refine these notes into a high-fidelity image prompt.\nINPUT:\n{safe_content}"}] full_prompt, tokens = send_text_request(msg) full_prompt = full_prompt.replace("\n", " ").strip() stats_state["refiner"] = calc_stats(t_start, tokens) with open(line_txt_path, "w", encoding="utf-8") as f: f.write(full_prompt) generated_files.append(line_txt_path) yield yield_update(f"[FULL PROMPT: {filename} L{line_idx+1}]\n{full_prompt}\n\n", prog_val=loop_prog + (0.99/total_ops)) completed_ops += 1 except Exception as e: dash_state["eye"] = f"ERROR: {str(e)}" yield yield_update(f"[Error processing {filename}: {str(e)}]\n\n") if ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp']: completed_ops += 1 yield yield_update(prog_val=1.0) return # ---------------- MODE 2: CREATION (Hybrid) ---------------- if not input_files and not style_ref_files: mon, m_upd = update_ui_state(None, None, extract_mode) yield "Waiting...", None, None, mon, render_live_dashboard("", "Waiting for inputs...", ""), m_upd return # 1. PREPARE STYLES all_styles = [] if style_ref_files: total_style_files = len(style_ref_files) for idx, f in enumerate(style_ref_files): filename = os.path.basename(f.name) dash_state["eye"] = f"Extracting Style DNA from {filename}..." 
stats_state["eye"] = "" current_style_prog = (idx / total_style_files) * 0.1 ext = os.path.splitext(filename)[1].lower() if ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp']: current_img_display = f.name else: current_img_display = None yield yield_update(prog_val=current_style_prog) if ext == ".txt": try: with open(f.name, "r", encoding="utf-8") as txt: all_styles.extend([l.strip() for l in txt if l.strip()]) except: pass elif ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp']: try: t_start = time.time() data, tokens = analyze_with_schema(f.name) stats_state["eye"] = calc_stats(t_start, tokens) physics = data.get("1_rendering_physics", {}) dna_block = f"MEDIUM: {physics.get('medium_and_toolset')}, LIGHTING: {physics.get('lighting_calculation')}, PALETTE: {physics.get('color_grading_palette')}, TEXTURE: {physics.get('surface_artifacts')}" all_styles.append(dna_block) except Exception as e: yield yield_update(f"[Error extracting style from {filename}: {e}]\n") if not all_styles: all_styles = [None] # 2. PREPARE CONTENT all_content = [] if input_files: for f in input_files: if f.name.endswith(".txt"): try: with open(f.name, "r") as txt: all_content.extend([(l.strip(), f.name) for l in txt if l.strip()]) except: pass else: all_content.append((f.name, f.name)) if not all_content: return # 3. PAIRING LOGIC total_ops = max(len(all_content), len(all_styles)) if len(all_content) == len(all_styles) else (len(all_styles) if len(all_content) == 1 else len(all_content)) run_mode = "batch" if len(all_content) == 1 and len(all_styles) > 1: run_mode = "study" elif len(all_content) > 1 and len(all_styles) == 1: run_mode = "batch" elif len(all_content) == len(all_styles): run_mode = "zipper" # 4. EXECUTION LOOP for i in range(total_ops): if run_mode == "study": cur_content = all_content[0] cur_style = all_styles[i] elif run_mode == "batch": cur_content = all_content[i] cur_style = all_styles[0] else: cur_content = all_content[i % len(all_content)] cur_style = all_styles[i % len(all_styles)] content_data, content_source = cur_content base_progress = 0.1 + ((i / total_ops) * 0.9) step_slice = (1 / total_ops) * 0.9 try: # PHASE 1: THE EYE manifest_text = "" ext = os.path.splitext(content_source)[1].lower() if ext in ['.jpg', '.jpeg', '.png', '.webp', '.bmp'] and content_data == content_source: dash_state["eye"] = f"Analyzing geometry of {os.path.basename(content_source)}..." stats_state["eye"] = "" current_img_display = content_source yield yield_update(prog_val=base_progress + (step_slice * 0.2)) t_start = time.time() data, tokens = analyze_with_schema(content_source) stats_state["eye"] = calc_stats(t_start, tokens) struct = data.get("2_scene_structure", {}) content = data.get("3_semantic_content", {}) manifest_text = f"SCENE STRUCTURE:\n- Camera: {struct.get('camera_angle')}\n- Composition: {struct.get('compositional_balance')}\n\nSEMANTIC CONTENT:\n- Subject: {content.get('subject_identity')}\n- Action: {content.get('pose_and_action')}\n- Setting: {content.get('environment_setting')}" dash_state["eye"] = manifest_text else: manifest_text = content_data current_img_display = None dash_state["eye"] = f"Text Input:\n{content_data[:100]}..." # PHASE 2: THE DREAMER dash_state["dreamer"] = "Synthesizing concept draft..." 
stats_state["dreamer"] = "" yield yield_update(prog_val=base_progress + (step_slice * 0.5)) if cur_style: msgs = [{"role": "system", "content": PROMPT_DREAMER_TRANSFER}, {"role": "user", "content": f"VISUAL WIREFRAME:\n{manifest_text}\n\nSTYLE SHELL:\n{cur_style}"}] else: msgs = [{"role": "system", "content": PROMPT_DREAMER_STANDARD}, {"role": "user", "content": f"Here is the compressed manifest:\n{manifest_text}"}] t_start = time.time() dream_stream, tokens = send_text_request(msgs, max_tokens=1500) stats_state["dreamer"] = calc_stats(t_start, tokens) dash_state["dreamer"] = dream_stream yield yield_update(prog_val=base_progress + (step_slice * 0.8)) # PHASE 3: THE REFINER dash_state["refiner"] = "Polishing final prose..." stats_state["refiner"] = "" yield yield_update(prog_val=base_progress + (step_slice * 0.9)) t_start = time.time() refine_msg = [{"role": "system", "content": PROMPT_REFINER}, {"role": "user", "content": f"DRAFT:\n{dream_stream}"}] final_prompt, tokens = send_text_request(refine_msg) final_prompt = final_prompt.replace("\n", " ").strip() stats_state["refiner"] = calc_stats(t_start, tokens) dash_state["refiner"] = final_prompt # Save unique file for Creation mode # Use base filename and index to prevent collision in loops creation_txt_path = f"{os.path.splitext(content_source)[0]}_v{i}.txt" with open(creation_txt_path, "w", encoding="utf-8") as f: f.write(final_prompt) generated_files.append(creation_txt_path) # End of item yield yield_update(f"{final_prompt}\n\n", prog_val=0.1 + ((i + 1) / total_ops) * 0.9) except Exception as e: dash_state["refiner"] = f"ERROR: {str(e)}" yield yield_update(f"Error: {str(e)}\n") # Final 100% yield yield_update(prog_val=1.0) # ---------------- UI ---------------- # with gr.Blocks(title="Semantic Image Disassembler (Koboldcpp)") as demo: # Initialize UI Components monitor_html, _ = update_ui_state(None, None, "Style DNA Only") monitor = gr.HTML(value=monitor_html) live_status = gr.HTML(value=render_live_dashboard("", "Waiting for input...", ""), elem_classes="clean-component live-feed-box") with gr.Row(): with gr.Column(scale=1): style_ref_files = gr.File(file_count="multiple", label="Styles") extract_mode = gr.Radio(["Style DNA Only", "Full Prompt Extraction"], label="Mode", value="Style DNA Only", elem_classes="clean-component") # CHANGED: Removed fixed height to support vertical aspect ratios active_preview = gr.Image(label="Active Monitor", interactive=False, elem_classes="clean-component") with gr.Column(scale=2): file_input = gr.File(file_count="multiple", label="Content") submit_btn = gr.Button("START PROCESSING", variant="primary", elem_classes="clean-component") output_log = gr.Textbox(label="Prompts", lines=10, autoscroll=True, elem_classes="clean-component") download_file = gr.File(label="Download", elem_classes="clean-component", height=50, interactive=False) with gr.Accordion("System Prompts", open=False): p_dreamer_trans = gr.Textbox(value=PROMPT_DREAMER_TRANSFER, label="Dreamer Transfer") p_dreamer_std = gr.Textbox(value=PROMPT_DREAMER_STANDARD, label="Dreamer Standard") p_refiner = gr.Textbox(value=PROMPT_REFINER, label="Refiner") # LISTENERS def wrap_update(s, c, m): return update_ui_state(s, c, m) for comp in [style_ref_files, file_input, extract_mode]: comp.change(fn=wrap_update, inputs=[style_ref_files, file_input, extract_mode], outputs=[monitor, extract_mode]) # EVENT LISTENER submit_btn.click( fn=run_pipeline, inputs=[file_input, style_ref_files, extract_mode, p_dreamer_trans, p_dreamer_std, p_refiner], 
        outputs=[output_log, download_file, active_preview, monitor, live_status, extract_mode],
        show_progress="hidden"
    )

if __name__ == "__main__":
    demo.launch(inbrowser=True, server_port=7861)
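# Quick start (assumptions: a local Koboldcpp instance serving its
# OpenAI-compatible API on 127.0.0.1:5001, with a vision-capable model loaded;
# image analysis requires multimodal support, text-only flows do not):
#   pip install gradio
#   python semantic_image_disassembler.py   # filename is illustrative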