"""Extract, clean, and aggregate Monika dialogue from Ren'Py (.rpy) scripts.

Pipeline: for every ``*.rpy`` file in the input folder, pull out Monika's
lines (and the player's menu choices), clean the text, tag each response
with an emotion inferred from the sprite code, and write per-file dialogue
files plus an optional combined dataset with summary statistics.
"""

import itertools
import os
import re
from collections import Counter
from pathlib import Path

# Sprite-code letters mapped to emotion tags.
# NOTE(review): the original listed eyes / eyebrows / mouth letters in one
# dict with duplicate keys ("f", "k", "t", "c", "u", "w"), so later entries
# silently overwrote earlier ones.  The map below spells out the effective
# (last-wins) result so the behavior is explicit; a positional parse of the
# sprite code would be the deeper fix but would change the produced dataset.
EMOTION_MAPPING = {
    "e": "[neutral]",
    "s": "[excited]",
    "h": "[happy]",
    "d": "[sad]",
    "n": "[playful]",    # wink
    "m": "[smug]",
    "g": "[smug]",
    "f": "[concerned]",  # furrowed eyebrows
    "k": "[worried]",    # knit eyebrows
    "t": "[playful]",    # triangle mouth
    "c": "[smug]",       # smirk
    "u": "[smug]",
    "w": "[excited]",    # wide-open mouth
    "a": "[happy]",      # smile
    "b": "[cheerful]",   # open smile
    "o": "[surprised]",  # gasp
    "x": "[tense]",      # grit teeth
    "p": "[pouty]",      # pout
}

# "m 1hub"-style sprite code: speaker tag, pose digit, expression letters.
_SPRITE_CODE_RE = re.compile(r'm\s+\d([a-zA-Z]+)')
# Monika dialogue line (used anchored at the start of a stripped line).
_MONIKA_LINE_RE = re.compile(r'm\s[^"]*"([^"]*)"')
_TOPIC_RE = re.compile(r'<\|topic\|>(.+)')


def _emotions_from_sprite_code(text):
    """Return a sorted, space-joined string of emotion tags parsed from a
    sprite code (e.g. ``m 1hub``) found anywhere in *text*, or '' if none.
    """
    match = _SPRITE_CODE_RE.search(text)
    if not match:
        return ''
    # A set de-duplicates tags when several letters map to the same emotion.
    tags = {EMOTION_MAPPING[ch] for ch in match.group(1) if ch in EMOTION_MAPPING}
    return ' '.join(sorted(tags))


def clean_dialogue(text):
    """Apply all cleaning steps to a single piece of dialogue.

    Returns a ``(cleaned_text, emotion_indicator)`` tuple; the indicator is
    '' when *text* carries no recognizable sprite code.
    """
    # Drop name placeholders case-insensitively.
    text = re.sub(r'\[player\]', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\[m_name\]', '', text, flags=re.IGNORECASE)

    emotion_indicator = _emotions_from_sprite_code(text)

    # Strip bracketed/braced/parenthesized annotations and junk tokens.
    text = re.sub(r"\[.*?\]", "", text)
    text = re.sub(r"\{.*?\}", "", text)
    text = re.sub(r"\(.*?\)", "", text)
    text = re.sub(r"\w{50,}", "", text)        # drop absurdly long "words"
    text = re.sub(r"([?.!,])", r" \1 ", text)  # space out punctuation
    text = re.sub(r"~", "", text)

    # BUGFIX: the original ran re.sub(r"'", "'", ...) twice -- a no-op that
    # replaced the apostrophe with itself.  The evident intent was to
    # normalize curly apostrophes so the contraction expansions below match.
    text = text.replace("\u2019", "'").replace("\u2018", "'")
    text = re.sub(r"'s", " is", text)   # also hits possessives, as before
    text = re.sub(r"'m", " am", text)
    text = re.sub(r"'re", " are", text)
    text = re.sub(r"'ll", " will", text)
    text = re.sub(r"'ve", " have", text)
    text = re.sub(r"'d", " would", text)
    text = re.sub(r"\.\s*\.\s*\.", "...", text)  # re-join spaced ellipses
    text = re.sub(r"\s{2,}", " ", text)

    return (text.strip(), emotion_indicator)


def extract_dialogue_from_rpy(rpy_content):
    """Split a .rpy script into conversation chunks.

    Each chunk is a newline-joined string of an optional ``<|topic|>`` tag,
    optional ``<|context|>`` emotion tags, ``Human:`` menu choices, and
    ``Assistant:`` Monika lines.  A chunk closes when a Monika line ends
    with sentence-final punctuation and the chunk has at least two turns.
    """
    lines = [line.strip() for line in rpy_content.split('\n')]

    conversation_chunks = []
    current_chunk = []
    current_topic = None
    in_menu = False
    pending_choices = []

    for line in lines:
        # Track the current topic from label statements.
        if line.startswith('label '):
            current_topic = line.split(':')[0].replace('label ', '').strip()

        # Enter menu mode; choices accumulate until the menu ends.
        if line == "menu:":
            in_menu = True
            continue

        if in_menu:
            if line.startswith('"') and line.endswith('":'):
                choice_text, _ = clean_dialogue(line[1:-2])
                if choice_text:
                    pending_choices.append(f"Human: {choice_text}")
            elif line and not line.startswith('"'):
                # First non-choice, non-empty line ends the menu.  (Lines are
                # stripped, so the original's startswith(' ') test was dead.)
                in_menu = False

        # Monika's dialogue.  BUGFIX: anchored at the line start so prose
        # merely containing 'm ' is no longer mistaken for a speaker tag.
        if line.startswith('m ') and '"' in line:
            match = _MONIKA_LINE_RE.match(line)
            if match:
                dialogue, emotion = clean_dialogue(match.group(1))
                if dialogue and len(dialogue.split()) > 2:
                    # BUGFIX: the sprite code sits outside the quoted text,
                    # so cleaning the quote alone never yielded an emotion;
                    # fall back to parsing the whole line.
                    if not emotion:
                        emotion = _emotions_from_sprite_code(line)

                    # Pending choices go in before Monika's response.
                    if pending_choices:
                        current_chunk.extend(pending_choices)
                        pending_choices = []

                    if emotion:
                        current_chunk.append(f"<|context|>{emotion}")
                    current_chunk.append(f"Assistant: {dialogue}")

                    # Sentence-final punctuation closes a chunk.
                    if dialogue.endswith(('.', '!', '?')) and len(current_chunk) >= 2:
                        if current_topic:
                            current_chunk.insert(0, f"<|topic|>{current_topic}")
                        conversation_chunks.append('\n'.join(current_chunk))
                        current_chunk = []

    # Flush any unterminated conversation.
    if current_chunk:
        if current_topic:
            current_chunk.insert(0, f"<|topic|>{current_topic}")
        conversation_chunks.append('\n'.join(current_chunk))

    return conversation_chunks


def process_folder(input_folder, output_folder, combine=True):
    """Process every .rpy file in *input_folder*.

    Writes one ``*_dialogue.txt`` per input file into *output_folder* and,
    when *combine* is true, a combined ``monika_dataset.txt`` plus summary
    statistics on stdout.  Failures are reported per file and skipped.
    """
    Path(output_folder).mkdir(parents=True, exist_ok=True)

    rpy_files = [f for f in os.listdir(input_folder) if f.endswith('.rpy')]
    print(f"Found {len(rpy_files)} .rpy files to process")

    all_conversations = []
    for rpy_file in rpy_files:
        input_path = os.path.join(input_folder, rpy_file)
        output_path = os.path.join(
            output_folder, rpy_file.replace('.rpy', '_dialogue.txt'))
        print(f"Processing {rpy_file}...")
        try:
            with open(input_path, 'r', encoding='utf-8') as f:
                rpy_content = f.read()
            conversation_chunks = extract_dialogue_from_rpy(rpy_content)
            all_conversations.extend(conversation_chunks)
            with open(output_path, 'w', encoding='utf-8') as f:
                for chunk in conversation_chunks:
                    f.write(chunk + '\n\n')
            print(f"Successfully processed {rpy_file}")
        except Exception as e:
            # Deliberately best-effort: report the failure and keep going.
            print(f"Error processing {rpy_file}: {str(e)}")

    if combine:
        combined_path = os.path.join(output_folder, "monika_dataset.txt")
        with open(combined_path, 'w', encoding='utf-8') as f:
            for chunk in all_conversations:
                f.write(chunk + '\n\n')

        print("\nDataset Statistics:")
        print(f"Total conversation chunks: {len(all_conversations)}")

        # Count turns and analyze topics.  The walrus avoids the original's
        # double regex search per chunk.
        total_turns = sum(chunk.count('\n') + 1 for chunk in all_conversations)
        topics = [m.group(1) for chunk in all_conversations
                  if (m := _TOPIC_RE.search(chunk))]
        print(f"Total dialogue turns: {total_turns}")
        print(f"Unique topics: {len(set(topics))}")

        # Word frequency over Monika's responses; Counter replaces the
        # hand-rolled dict accumulation and manual sort.
        word_count = Counter(
            word
            for chunk in all_conversations
            for line in chunk.split('\n')
            if line.startswith('Assistant: ')
            for word in line[11:].lower().split()
        )
        print("\nTop 20 most common words in Monika's responses:")
        for word, count in word_count.most_common(20):
            print(f"{word}: {count}")

    print("\nProcessing complete!")


if __name__ == "__main__":
    input_folder = "To-convert"
    output_folder = "Converted"
    process_folder(input_folder, output_folder, combine=True)