From 5d94e292b878192f6e8816701fde6efd31658b5c Mon Sep 17 00:00:00 2001 From: Morpheus Sandmann Date: Mon, 26 Jan 2026 12:40:24 +0000 Subject: [PATCH] Initial commit --- .gitignore | 7 + README.md | 20 +++ context.txt | 0 edge_rlm.py | 414 ++++++++++++++++++++++++++++++++++++++++++++++ logging_config.py | 25 +++ prompts.py | 64 +++++++ task.txt | 0 templates.py | 18 ++ utils.py | 312 ++++++++++++++++++++++++++++++++++ 9 files changed, 860 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 context.txt create mode 100644 edge_rlm.py create mode 100644 logging_config.py create mode 100644 prompts.py create mode 100644 task.txt create mode 100644 templates.py create mode 100644 utils.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5cbed53 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +data/ +logs/ +__pycache__/ +*.pyc +*.pyo +*.pyd +.fschatignore diff --git a/README.md b/README.md new file mode 100644 index 0000000..a214b13 --- /dev/null +++ b/README.md @@ -0,0 +1,20 @@ +# ERLM - Edge Recursive Language Model + +This program is an AI assistant designed to extract information from a massive, unseen text corpus (`RAW_CORPUS`) without directly accessing the text itself. It operates within a persistent Python REPL environment, constrained by a limited context window and a need to avoid overwhelming output. + +Here's a breakdown of its core functionality: + +1. **Blind Data Exploration:** The AI cannot directly view the `RAW_CORPUS`. Instead, it must infer its structure (structured, semi-structured, or unstructured) through Python code execution. + +2. **Data Engineering Approach:** The program follows a structured workflow: + * **Shape Discovery:** It first analyzes the `RAW_CORPUS` to determine its format (JSON, CSV, XML, log lines, etc.). + * **Access Layer Creation:** It then builds a system of persistent Python variables (lists, dictionaries, etc.) to efficiently access and manipulate the data. This involves splitting the text into manageable chunks, parsing log lines, or extracting relevant sections. + * **Dense Execution:** Finally, it uses the created access layer to perform targeted searches and extractions, avoiding redundant scanning of the entire corpus. + +3. **Limited Output:** To manage the context window, the AI is restricted to printing small snippets of output (less than 1000 characters) and is encouraged to summarize findings in Python variables. + +4. **Iterative Process:** The program operates in a series of steps, each designed to build upon the previous one. It prioritizes creating reusable tools and verifying results at each stage. + +5. **JSON Output:** All outputs are formatted as JSON, ensuring consistent and parsable data. + +In essence, this is a sophisticated data extraction and analysis tool that mimics the process of a human data scientist, carefully exploring and structuring a large dataset before performing targeted queries. 
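+
+## Example agent step
+
+The snippet below is a minimal, illustrative sketch of the kind of code the agent emits in Phases 2 and 3 (it is not taken from a real run, and the "Chapter" regex is a hypothetical example for a book-shaped corpus). `RAW_CORPUS` and `llm_query()` are injected into the REPL environment by `edge_rlm.py`; everything else is standard Python.
+
+```python
+import json
+import re
+
+# Phase 1/2: probe the shape, then build a chapter-level access layer
+chapters = [c for c in re.split(r"\nChapter \d+", RAW_CORPUS) if c.strip()]
+print(json.dumps({"n_chapters": len(chapters)}, indent=2))
+
+# Phase 3: query only the relevant slice, keeping printed output small
+summary = llm_query(chapters[0][:4000], "Summarize this chapter in 50 words or fewer")
+print(summary)
+```
+
+## Running
+
+The agent and sub-call (REPL) models are expected to be served behind llama.cpp-style HTTP endpoints (`/props`, `/tokenize`, `/completion`), by default at `http://localhost:8080` and `http://localhost:8090`. A typical invocation uses the defaults from `edge_rlm.py`: `python edge_rlm.py --context context.txt --task task.txt --debug`.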
\ No newline at end of file
diff --git a/context.txt b/context.txt
new file mode 100644
index 0000000..e69de29
diff --git a/edge_rlm.py b/edge_rlm.py
new file mode 100644
index 0000000..51d778f
--- /dev/null
+++ b/edge_rlm.py
@@ -0,0 +1,414 @@
+import time
+import requests
+import json
+import argparse
+import io
+import logging
+import types
+from rich.console import Console
+from rich.panel import Panel
+from rich.markdown import Markdown
+from rich.json import JSON
+
+
+# Local imports
+from logging_config import setup_logging
+import utils
+import prompts
+from templates import agent_template, repl_template
+
+logger = logging.getLogger(__name__)
+console = Console()
+
+# Configuration
+DEFAULT_AGENT_API = "http://localhost:8080"
+DEFAULT_REPL_API = "http://localhost:8090"
+
+DEFAULT_CONTEXT_FILE = "context.txt"
+DEFAULT_TASK_FILE = "task.txt"
+MAX_REPL_STEPS = 20
+MAX_VIRTUAL_CONTEXT_RATIO = 0.85
+
+
+class LlamaClient:
+    def __init__(self, base_url, name="LlamaClient"):
+        self.base_url = base_url
+        self.name = name
+        self.n_ctx = self._get_context_size()
+        self.max_input_tokens = int(self.n_ctx * MAX_VIRTUAL_CONTEXT_RATIO)
+        self.color = self._determine_color()
+        if debug: logger.debug(f"Connected to {name} ({base_url}). Model Context: {self.n_ctx}. Max Input Safe Limit: {self.max_input_tokens}. Color: {self.color}")
+
+    def _determine_color(self):
+        # Panel colors are keyed off the configured API URLs
+        if self.base_url == DEFAULT_AGENT_API:
+            return "dodger_blue1"
+        elif self.base_url == DEFAULT_REPL_API:
+            return "dodger_blue3"
+        else:
+            return "cyan1"  # Default color if base_url is unknown
+
+    def _get_context_size(self):
+        try:
+            resp = requests.get(f"{self.base_url}/props")
+            resp.raise_for_status()
+            data = resp.json()
+
+            if 'n_ctx' in data: return data['n_ctx']
+            if 'default_n_ctx' in data: return data['default_n_ctx']
+            if 'default_generation_settings' in data:
+                settings = data['default_generation_settings']
+                if 'n_ctx' in settings: return settings['n_ctx']
+
+            return 4096
+        except Exception as e:
+            logger.error(f"[{self.name}] Failed to get props: {e}. 
Defaulting to 4096.")
+            return 4096
+
+    def tokenize(self, text):
+        try:
+            resp = requests.post(f"{self.base_url}/tokenize", json={"content": text})
+            resp.raise_for_status()
+            return len(resp.json().get('tokens', []))
+        except Exception:
+            return len(text) // 4
+
+    def completion(self, prompt, schema=None, temperature=0.1):
+        payload = {
+            "prompt": prompt,
+            "n_predict": -1,
+            "temperature": temperature,
+            "cache_prompt": True
+        }
+        if schema:
+            payload["json_schema"] = schema
+        else:
+            payload["stop"] = ["<|eot_id|>", "<|im_end|>", "Observation:", "User:"]
+        if debug:
+            console.print(Panel(
+                prompt[-500:],
+                title=f"Last 500 Characters of {self.name} Call",
+                title_align="left",
+                border_style=self.color
+            ))
+        try:
+            resp = requests.post(f"{self.base_url}/completion", json=payload)
+            resp.raise_for_status()
+            if debug:
+                console.print(Panel(
+                    JSON.from_data(resp.json().get('content', '').strip()),
+                    title=f"{self.name} Response",
+                    title_align="left",
+                    border_style=self.color
+                ))
+            return resp.json().get('content', '').strip()
+        except Exception as e:
+            logger.error(f"[{self.name}] Error calling LLM: {e}")
+            return f"Error: {e}"
+
+class AgentTools:
+    def __init__(self, repl_client: LlamaClient, data_content: str):
+        self.client = repl_client
+        self.RAW_CORPUS = data_content
+
+    def llm_query(self, content_chunk, query):
+        if content_chunk == "RAW_CORPUS":
+            return "ERROR: You passed the string 'RAW_CORPUS'. You must pass the CONTENT of the variable (e.g., `chunk = RAW_CORPUS[:1000]`, then `llm_query(chunk, ...)`)."
+
+        # --- OPTIMIZATION FIX: Heuristic check before network call ---
+        # Assume roughly 3 chars per token. If it's wildly larger than context,
+        # fail fast to prevent network timeout on the /tokenize call.
+        estimated_tokens = len(content_chunk) // 3
+        if estimated_tokens > (self.client.n_ctx * 2):
+            return f"ERROR: Chunk is massively too large (approx {estimated_tokens} tokens). Slice strictly."
+
+        # 2. Precise Safety check
+        chunk_tokens = self.client.tokenize(content_chunk)
+        query_tokens = self.client.tokenize(query)
+        total = chunk_tokens + query_tokens + 150
+
+        if debug: logger.debug(f"[Sub-LLM] Processing Query with {total} tokens.")
+
+        if total > self.client.n_ctx:
+            msg = f"ERROR: Chunk too large ({chunk_tokens} tokens). Limit is {self.client.n_ctx}. Slice smaller."
+            logger.warning(msg)
+            return msg
+
+        # 3. Strict Grounding Prompt
+        sub_messages = [
+            {"role": repl_template.ROLE_SYSTEM, "content": (
+                "You are a strict reading assistant. "
+                "Answer the question based ONLY on the provided Context. "
+                "Do not use outside training data. "
+                "If the answer is not in the text, say 'NULL'."
+            )},
+            {"role": repl_template.ROLE_USER, "content": f"Context:\n{content_chunk}\n\nQuestion: {query}"}
+        ]
+        results = self.client.completion(utils.build_chat_prompt(sub_messages))
+        result_tokens = self.client.tokenize(results)
+        if debug: logger.debug(f"[Sub-LLM] Responded with {result_tokens} tokens.")
+        return results
+
+class AgentOutputBuffer:
+    def __init__(self, max_total_chars=20000, max_len_per_print=1009):
+        self._io = io.StringIO()
+        self.max_total_chars = max_total_chars  # Hard cap for infinite loop protection
+        self.max_len_per_print = max_len_per_print  # Soft cap for raw data dumping protection
+        self.current_chars = 0
+        self.global_truncated = False
+
+    def custom_print(self, *args, **kwargs):
+        # 1. Capture the content of THIS specific print call
+        temp_io = io.StringIO()
+        print(*args, file=temp_io, **kwargs)
+        text = temp_io.getvalue()
+
+        # 2. 
Check PER-PRINT limit (The "Density" Check) + # This prevents printing raw corpus data, but allows short summaries to pass through + if len(text) > self.max_len_per_print: + # Slice the text + truncated_text = text[:self.max_len_per_print] + + # Create a localized warning that doesn't stop the whole stream + text = ( + f"{truncated_text}\n" + f"... [LINE TRUNCATED: Output exceeded {self.max_len_per_print-9} chars. " + f"Use slicing or llm_query() to inspect data.] ...\n" + ) + + # 3. Check GLOBAL limit (The "Sanity" Check) + # This prevents infinite loops (while True: print('a')) from crashing memory + if self.current_chars + len(text) > self.max_total_chars: + remaining = self.max_total_chars - self.current_chars + if remaining > 0: + self._io.write(text[:remaining]) + + if not self.global_truncated: + self._io.write(f"\n... [SYSTEM HALT: Total output limit ({self.max_total_chars}) reached] ...\n") + self.global_truncated = True + + self.current_chars += len(text) + else: + self._io.write(text) + self.current_chars += len(text) + + def read_and_clear(self): + value = self._io.getvalue() + self._io = io.StringIO() + self.current_chars = 0 + self.global_truncated = False + return value + +def run_agent(agent_client, repl_client, context_text, task_text): + tools = AgentTools(repl_client, context_text) + + agent_schema = { + "type": "object", + "properties": { + "thought": {"type": "string", "description": "Reasoning about current state and what to do next."}, + "action": {"type": "string", "enum": ["execute_python", "final_answer"]}, + "content": {"type": "string", "description": "Python code or Final Answer text."} + }, + "required": ["thought", "action", "content"] + } + + # 1. Instantiate the buffer + out_buffer = AgentOutputBuffer() + + trace_filepath = utils.init_trace_file(debug) + + + # 2. Add it to the environment + exec_env = { + "RAW_CORPUS": tools.RAW_CORPUS, + "llm_query": tools.llm_query, + # Standard Libs + "re": __import__("re"), + "math": __import__("math"), + "json": __import__("json"), + "collections": __import__("collections"), + "statistics": __import__("statistics"), + "random": __import__("random"), + "datetime": __import__("datetime"), + "difflib": __import__("difflib"), + "string": __import__("string"), + + # Overrides + "print": out_buffer.custom_print + } + + system_instruction = prompts.get_system_prompt() + + messages = [ + {"role": agent_template.ROLE_SYSTEM, "content": system_instruction}, + {"role": agent_template.ROLE_USER, "content": f"USER TASK: {task_text}"} + ] + + step = 0 + while step < MAX_REPL_STEPS: + step += 1 + if debug: logger.debug(f"Step {step} of {MAX_REPL_STEPS}") + + modules = [] + functions = [] + variables = [] + ACTIVE_VAR_SNIPPET_LEN = 100 + + for name, val in exec_env.items(): + if name.startswith("__"): continue + if name == "print": continue # Hide print, it's implied + + if isinstance(val, types.ModuleType): + modules.append(name) + elif callable(val): + functions.append(name) + else: + # For variables, provide a type and a short preview + type_name = type(val).__name__ + s_val = str(val) + # Truncate long values for display (e.g. RAW_CORPUS) + snippet = (s_val[:ACTIVE_VAR_SNIPPET_LEN] + '...') if len(s_val) > ACTIVE_VAR_SNIPPET_LEN else s_val + variables.append(f"{name} ({type_name}): {snippet}") + + # 2. 
Create the status message + dynamic_state_msg = ( + f"[SYSTEM STATE REMINDER]\n" + f"Current Step: {step}/{MAX_REPL_STEPS}\n" + f"Available Libraries: {', '.join(modules)}\n" + f"Available Tools: {', '.join(functions)}\n" + f"Active Variables:\n" + ("\n".join([f" - {v}" for v in variables]) if variables else " (None)") + "\n---" + ) + + # 3. Create a temporary message list for this specific inference + # We append the state to the very end so it has high 'recency' bias + inference_messages = messages.copy() + inference_messages.append({"role": agent_template.ROLE_USER, "content": dynamic_state_msg}) + + # 4. Build prompt using the INFERENCE messages (not the permanent history) + full_prompt = utils.build_chat_prompt(inference_messages) + + usage = agent_client.tokenize(full_prompt) + if debug: logger.debug(f"Context Usage: {usage} / {agent_client.max_input_tokens}") + + # Check context use and attempt compression + if usage > agent_client.max_input_tokens: + if debug: logger.warning("Context limit exceeded. Triggering History Compression.") + + messages = utils.compress_history(debug, agent_client, messages, keep_last_pairs=2) + + # Re-check usage after compression + full_prompt = utils.build_chat_prompt(messages) + new_usage = agent_client.tokenize(full_prompt) + if debug: logger.debug(f"Context Usage after compression: {new_usage}") + + # Panic mode: If it's STILL too big (unlikely), truncate the summary + if new_usage > agent_client.max_input_tokens: + logger.error("Compression insufficient. Forcing hard truncation.") + messages.pop(2) + + # Agent Completion + response_text = agent_client.completion(full_prompt, schema=agent_schema, temperature=0.5) + + try: + response_json = json.loads(response_text) + except json.JSONDecodeError: + logger.error("JSON Parse Error") + messages.append({"role": agent_template.ROLE_USER, "content": "System: Invalid JSON returned. Please retry."}) + continue + + thought = response_json.get("thought", "") + action = response_json.get("action", "") + content = response_json.get("content", "") + + if action == "execute_python" and content: + # Run the safeguard. If the code is bad, 'content' gets replaced + content = utils.safeguard_and_repair(debug, agent_client, messages, agent_schema, content) + + if debug: + console.print(Panel( + f"[italic]{thought}[/italic]", + title="🧠 Agent Thought", + title_align="left", + border_style="magenta" + )) + + messages.append({"role": agent_template.ROLE_ASSISTANT, "content": json.dumps(response_json, indent=2, ensure_ascii=False)}) + + # 3. Execution + if action == "final_answer": + # 1. Capture the raw result (keep this for logs/debugging) + if debug: logger.debug(f"Raw Agent Output: {content}") + + # Check if content looks like JSON/Structure, if so, summarize it. + # Even if it's already text, a quick polish pass ensures consistent tone. + final_report = utils.generate_final_report(debug, agent_client, task_text, content) + + # 3. 
Print the pretty version
+            final_report_md = Markdown(final_report)
+            print("\n\n")
+            console.print(final_report_md)
+            print("\n")
+            break
+
+        elif action == "execute_python":
+            # Update the thought/log to reflect potential changes for the human observer
+            if debug and content != response_json.get("content"):
+                console.print(Panel(content, title="Executing Code via Safeguard", title_align="left", border_style="cyan"))
+            elif debug and content == response_json.get("content"):
+                console.print(Panel(content, title="Executing Code", title_align="left", border_style="yellow"))
+
+            observation = ""
+            try:
+                # 1. Clear any leftover junk from previous steps (safety)
+                out_buffer.read_and_clear()
+
+                # 2. Execute. The Agent calls 'print', which goes to out_buffer
+                exec(content, exec_env)
+
+                # 3. Extract the text
+                observation = out_buffer.read_and_clear()
+
+                if not observation:
+                    observation = "Code executed successfully (no output)."
+            except Exception as e:
+                observation = f"Python Error: {e}"
+                logger.error(f"Code Execution Error: {e}")
+
+            if debug:
+                console.print(Panel(
+                    f"{observation.strip()}",
+                    title="Observation",
+                    title_align="left",
+                    border_style="dark_green"
+                ))
+            messages.append({"role": agent_template.ROLE_USER, "content": f"Observation:\n{observation}"})
+
+        else:
+            messages.append({"role": agent_template.ROLE_USER, "content": f"System: Unknown action '{action}'."})
+
+        utils.save_agent_trace(trace_filepath, messages)
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description="""Edge Recursive Language Model
+
+    A sophisticated data extraction and analysis tool that mimics the process of a human data scientist, carefully exploring and structuring a large dataset before performing targeted queries.""")
+    parser.add_argument("--context", default=DEFAULT_CONTEXT_FILE, help="Path to text file to process")
+    parser.add_argument("--task", default=DEFAULT_TASK_FILE, help="Path to task instruction file")
+    parser.add_argument("--override_task", help="Direct string override for the task")
+    parser.add_argument("--agent_api", default=DEFAULT_AGENT_API, help="URL for the Main Agent LLM")
+    parser.add_argument("--repl_api", default=DEFAULT_REPL_API, help="URL for the Sub-call/REPL LLM")
+    parser.add_argument("--debug", action="store_true", help="Enable verbose debug logging")
+
+    args = parser.parse_args()
+    debug = args.debug
+    log_level = logging.DEBUG if debug else logging.INFO
+    setup_logging(level=log_level, debug=debug)
+
+    if debug: logger.info("Starting EdgeRLM...")
+    context_content = utils.load_file(args.context)
+    if debug: logger.debug(f"Loaded Context: {len(context_content)} characters.")
+    task_content = args.override_task if args.override_task else utils.load_file(args.task)
+
+    agent_client = LlamaClient(args.agent_api, "Agent")
+    repl_client = LlamaClient(args.repl_api, "REPL")
+
+    run_agent(agent_client, repl_client, context_content, task_content)
\ No newline at end of file
diff --git a/logging_config.py b/logging_config.py
new file mode 100644
index 0000000..a3cf378
--- /dev/null
+++ b/logging_config.py
@@ -0,0 +1,25 @@
+import sys
+import logging
+from rich.logging import RichHandler
+from rich.console import Console
+
+def setup_logging(level=logging.INFO, debug=False):
+
+    # silence noisy libraries
+    for lib_name in ("urllib3","requests","http.client","markdown","Markdown"):
+        logging.getLogger(lib_name).setLevel(logging.WARNING)
+
+    logging.basicConfig(
+        level=level,
+        format="%(message)s",
+        datefmt="[%X]",
+        handlers=[RichHandler(
            rich_tracebacks=True,
+            show_path=False,
+            log_time_format="[%H:%M:%S]",
+            markup=True
+        )],
+    )
+
+    if debug:
+        logging.getLogger(__name__).debug("[dim]Debug mode active.[/dim]")
\ No newline at end of file
diff --git a/prompts.py b/prompts.py
new file mode 100644
index 0000000..3988c6a
--- /dev/null
+++ b/prompts.py
@@ -0,0 +1,64 @@
+role = """### ROLE
+You are a Recursive AI Controller operating in a **persistent** Python REPL. Your mission is to answer User Queries by architecting and executing data extraction scripts against a massive text variable named `RAW_CORPUS`.
+"""
+
+constraints = """### CRITICAL CONSTRAINTS
+- **BLINDNESS**: You cannot see `RAW_CORPUS` directly. You must "feel" its shape using Python.
+- **MEMORY SAFETY**: Your context window is finite. Summarize findings in Python variables; do not print massive blocks of raw text.
+- **LIMITED ITERATIONS**: You have a limited number of steps to complete your objective, as shown in your SYSTEM STATE REMINDER. Batch as many actions as possible into each step.
+- **JSON FORMATTING**: Always use `print(json.dumps(data, indent=2))` for lists/dicts.
+REPL ENV:
+- `print()`: Sends output to stdout. *Note:* DO NOT print snippets longer than 1000 chars; print counts or short summaries instead to preserve context. **BLINDNESS:** You are blind to function return values unless they are explicitly printed.
+- `llm_query()`: Prompts an external LLM to perform summaries, intent analysis, entity extraction, classification, translations, etc. Its context window is limited to around 16k tokens. Usage: `answer = llm_query(text_window, "perform task in x or fewer words")`.
+"""
+
+workflow_guidelines = """### CORE OPERATING PROTOCOL: "Structure First, Search Second"
+
+Adopt a Data Engineering mindset. Understand the **'Shape'** of the data, then build an **Access Layer** to manipulate it efficiently.
+
+#### PHASE 1: Shape Discovery (The "What is this?")
+Before answering the user's question, determine the physical structure of `RAW_CORPUS`:
+**Structured?** Is it JSON, CSV, XML, or Log lines? (Look for delimiters).
+**Semi-Structured?** Is it a Report or E-book? (Look for "Chapter", "Section", Roman Numerals, Table of Contents).
+**Unstructured?** Is it a messy stream of consciousness?
+
+#### PHASE 2: The Access Layer (The "Scaffolding")
+Once you know the shape, write **dense** code to transform `RAW_CORPUS` into persistent, queryable variables.
+*If it's a Book:* Don't search the whole string. Split it into a list of chapters. Be careful with empty chapters: if a chapter has no text, it is probably a Table of Contents entry.
+*If it's Logs:* Parse it into a list of dicts: `logs = [{'date': d, 'msg': m} for d,m in pattern.findall(RAW_CORPUS)]`.
+*If it's Mixed:* Extract the relevant section first: `main_content = RAW_CORPUS.split('APPENDIX')[0]`.
+
+You can now do `llm_query()` without re-reading the whole text.
+
+#### PHASE 3: Dense Execution (The "Work")
+Avoid "Hello World" programming. Do not write one step just to see if it works. Write **dense, robust** code blocks that:
+1. **Define** reusable tools (Regex patterns, helper functions) at the top.
+2. **Execute** the search/extraction logic using your Access Layer.
+3. **Verify** the results (print lengths, samples, or error checks) in the same block.
+
+### CRITICAL RULES
+1. **Persist State:** If you create a useful list (e.g., `relevant_chunks`), assign it to a global variable so you can use it in the next turn.
+2. **Fail Fast:** If your Regex returns empty lists, print a debug message and exit the block gracefully; don't crash.
+3. 
**Global Scope:** Remember that variables you define are available in future steps. Don't re-calculate them. +""" + +outputs = """### YOUR OUTPUTS + +Your outputs must follow this format: +```json +{ + "type": "object", + "properties": { + "thought": {"type": "string", "description": "Reasoning about previous step, current state and what to do next."}, + "action": {"type": "string", "enum": ["execute_python", "final_answer"]}, + "content": {"type": "string", "description": "Python code or Final Answer text."} + }, + "required": ["thought", "action", "content"] + } +``` +""" + +def get_system_prompt(): + system_prompt = f"{role}\n{workflow_guidelines}\n{constraints}\n{outputs}" + + return(system_prompt) \ No newline at end of file diff --git a/task.txt b/task.txt new file mode 100644 index 0000000..e69de29 diff --git a/templates.py b/templates.py new file mode 100644 index 0000000..931c04b --- /dev/null +++ b/templates.py @@ -0,0 +1,18 @@ +class TemplateQwen(): + # --- Prompt Template Configuration (ChatML) --- + IM_START = "<|im_start|>" + IM_END = "<|im_end|>" + ROLE_SYSTEM = "system" + ROLE_USER = "user" + ROLE_ASSISTANT = "assistant" + +class TemplateGemma(): + # --- Prompt Template Configuration (Gemma3) --- + IM_START = "" + IM_END = "" + ROLE_SYSTEM = "user" # Gemma has no system role + ROLE_USER = "user" + ROLE_ASSISTANT = "model" + +agent_template = TemplateQwen() +repl_template = TemplateGemma() \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..1eef020 --- /dev/null +++ b/utils.py @@ -0,0 +1,312 @@ +import os +import time +import logging +import json +import ast +import contextlib +from rich.console import Console +from rich.panel import Panel + +from templates import agent_template + +logger = logging.getLogger(__name__) +console = Console() + + +def init_trace_file(debug, log_dir="logs"): + """ + Creates the log directory and returns a unique filepath + based on the current timestamp. + """ + if not os.path.exists(log_dir): + os.makedirs(log_dir) + + timestamp = time.strftime("%Y%m%d-%H%M%S") + filename = os.path.join(log_dir, f"trace_{timestamp}.json") + if debug: logger.debug(f"Trace logging initialized: {filename}") + return filename + +def save_agent_trace(filepath, messages, full_history=None): + """ + Dumps the current state of the conversation to a JSON file. + Overwrites the file each step so the last write is always the complete history. + """ + try: + data_to_save = { + "timestamp": time.time(), + # If you are using history compression, 'messages' might get cut. + # If you want the RAW full history, pass full_history. + # Otherwise, we log what the agent currently 'sees'. 
+            "context_window": messages
+        }
+
+        with open(filepath, 'w', encoding='utf-8') as f:
+            json.dump(data_to_save, f, indent=2, ensure_ascii=False)
+
+    except Exception as e:
+        logger.error(f"Failed to save trace file: {e}")
+
+def build_chat_prompt(messages):
+    prompt = ""
+    for msg in messages:
+        role = msg.get("role")
+        content = msg.get("content")
+        prompt += f"{agent_template.IM_START}{role}\n{content}{agent_template.IM_END}\n"
+    prompt += f"{agent_template.IM_START}{agent_template.ROLE_ASSISTANT}"  # open the assistant turn for the model to complete
+    return prompt
+
+
+def _analyze_code_safety(code_str):
+    """
+    Returns: (is_safe: bool, error_msg: str, line_number: int | None)
+    """
+    try:
+        tree = ast.parse(code_str)
+    except SyntaxError as e:
+        # e.lineno is the line where the parser failed
+        return False, f"SyntaxError: {e.msg}", e.lineno
+
+    tainted_vars = {"RAW_CORPUS"}
+    has_print = False
+
+    for node in ast.walk(tree):
+        # 1. Track assignments
+        if isinstance(node, ast.Assign):
+            if isinstance(node.value, ast.Name) and node.value.id in tainted_vars:
+                for target in node.targets:
+                    if isinstance(target, ast.Name):
+                        tainted_vars.add(target.id)
+
+        # 2. Check Call nodes
+        if isinstance(node, ast.Call):
+            if isinstance(node.func, ast.Name) and node.func.id == 'print':
+                has_print = True
+                for arg in node.args:
+                    if isinstance(arg, ast.Name) and arg.id in tainted_vars:
+                        return False, f"Safety Violation: Printing '{arg.id}' (RAW_CORPUS). Use slicing.", node.lineno
+
+            # Check re.compile arguments
+            is_re_compile = False
+            if isinstance(node.func, ast.Attribute) and node.func.attr == 'compile':
+                is_re_compile = True
+            elif isinstance(node.func, ast.Name) and node.func.id == 'compile':
+                is_re_compile = True
+
+            if is_re_compile and len(node.args) > 2:
+                return False, "Library Usage Error: `re.compile` accepts max 2 args.", node.lineno
+
+    # 3. Global Check (No specific line number)
+    if not has_print:
+        return False, "Observability Error: No `print()` statements found.", None
+
+    return True, None, None
+
+def _extract_context_block(code_str, target_lineno):
+    """
+    Extracts lines surrounding target_lineno bounded by empty lines.
+    Returns: (start_index, end_index, snippet_str)
+    """
+    lines = code_str.split('\n')
+    # target_lineno is 1-based, list is 0-based
+    idx = target_lineno - 1
+
+    # Clamp index just in case
+    if idx < 0: idx = 0
+    if idx >= len(lines): idx = len(lines) - 1
+
+    start_idx = idx
+    end_idx = idx
+
+    # Scan Up
+    while start_idx > 0:
+        if lines[start_idx - 1].strip() == "":
+            break
+        start_idx -= 1
+
+    # Scan Down
+    while end_idx < len(lines) - 1:
+        if lines[end_idx + 1].strip() == "":
+            break
+        end_idx += 1
+
+    # Extract the block including the found boundaries (or lack thereof)
+    snippet_lines = lines[start_idx : end_idx + 1]
+    return start_idx, end_idx, "\n".join(snippet_lines)
+
+def safeguard_and_repair(debug, client, messages, schema, original_code):
+    is_safe, error_msg, line_no = _analyze_code_safety(original_code)
+
+    if is_safe:
+        return original_code
+
+    if debug:
+        logger.warning(f"Safeguard triggered: {error_msg} (Line: {line_no})")
+        console.print(Panel(f"{error_msg}", title="Safeguard Interrupt", style="bold red"))
+
+    console.print(Panel(
+        original_code,
+        title="Unsafe Code",
+        title_align="left",
+        border_style="hot_pink2"
+    ))
+
+    # STRATEGY 1: SNIPPET REPAIR (Optimization)
+    # If we have a specific line number, we only send that block. 
+    if line_no is not None:
+        start_idx, end_idx, snippet = _extract_context_block(original_code, line_no)
+
+        # We create a temporary "micro-agent" prompt just for fixing the snippet
+        # We reuse the schema to ensure we get a clean content block back
+        repair_prompt = [
+            {"role": "system", "content": "You are a code repair assistant. Output only the fixed code snippet in the JSON content field."},
+            {"role": "user", "content": (
+                f"The following Python code snippet failed validation.\n"
+                f"Error: {error_msg} (occurred around line {line_no})\n\n"
+                f"```python\n{snippet}\n```\n\n"
+                f"Return the JSON with the fixed snippet. "
+                f"Maintain original indentation. Add a comment (# FIXED) to changed lines."
+            )}
+        ]
+        if debug:
+            console.print(Panel(f"{snippet}", title="Attempting Snippet Repair", style="light_goldenrod1"))
+
+        response_text = client.completion(build_chat_prompt(repair_prompt), schema=schema, temperature=0.0)
+
+        try:
+            response_json = json.loads(response_text)
+            fixed_snippet = response_json.get("content", "")
+
+            if debug:
+                console.print(Panel(f"{fixed_snippet}", title="Repaired Snippet", style="yellow1"))
+
+            # Stitch the code back together
+            all_lines = original_code.split('\n')
+            # We replace the range we extracted with the new snippet
+            # Note: fixed_snippet might have a different line count, that's fine.
+
+            pre_block = all_lines[:start_idx]
+            post_block = all_lines[end_idx + 1:]
+
+            # Reassemble
+            full_fixed_code = "\n".join(pre_block + [fixed_snippet] + post_block)
+
+            return full_fixed_code
+
+        except json.JSONDecodeError:
+            # If the snippet repair fails to parse, fall through to full repair
+            if debug: logger.error("Snippet repair failed to parse. Falling back to full repair.")
+            pass
+
+    # STRATEGY 2: FULL REPAIR (Fallback)
+    # Used for global errors (missing prints) or if snippet repair crashed
+    repair_messages = messages + [
+        {"role": agent_template.ROLE_ASSISTANT, "content": json.dumps({
+            "thought": "Drafting code...",
+            "action": "execute_python",
+            "content": original_code
+        })},
+        {"role": agent_template.ROLE_USER, "content": (
+            f"SYSTEM INTERRUPT: Your code failed pre-flight safety checks.\n"
+            f"Error: {error_msg}\n\n"
+            f"Generate the JSON response again with CORRECTED Python code.\n"
+            f"IMPORTANT: You must add a comment (# FIXED: ...) to the corrected line."
+        )}
+    ]
+
+    response_text = client.completion(build_chat_prompt(repair_messages), schema=schema, temperature=0.0)
+
+    try:
+        response_json = json.loads(response_text)
+        return response_json.get("content", "")
+    except json.JSONDecodeError:
+        return ""
+
+def compress_history(debug, client, messages, keep_last_pairs=2):
+    """
+    Compresses the middle of the conversation history.
+    Preserves: System Prompt (0), User Task (1), and the last N pairs of interaction.
+    """
+    # Calculate how many messages to keep at the end (pairs * 2)
+    keep_count = keep_last_pairs * 2
+
+    # Check if we actually have enough history to compress
+    # We need: System + Task + (At least 2 messages to compress) + Keep_Count
+    if len(messages) < (2 + 2 + keep_count):
+        if debug: logger.warning("History too short to compress, but context is full. Crashing safely.")
+        return messages  # Nothing we can do, let it fail or truncate manually
+
+    # Define the slice to compress
+    # Start at 2 (after Task), End at -keep_count
+    to_compress = messages[2:-keep_count]
+
+    # 1. 
Format the history text for the summarizer
+    history_text = ""
+    for msg in to_compress:
+        role = msg['role'].upper()
+        content = msg['content']
+        history_text += f"[{role}]: {content}\n"
+
+    # 2. Build the summarization prompt
+    summary_prompt = (
+        "You are a technical documentation assistant. "
+        "Summarize the following interaction history between an AI Agent and a System. "
+        "Focus on: 1. Code executed, 2. Errors encountered, 3. Specific data/variables discovered. "
+        "Be concise. Do not chat.\n\n"
+        f"--- HISTORY START ---\n{history_text}\n--- HISTORY END ---"
+    )
+
+    if debug: logger.debug(f"Compressing {len(to_compress)} messages...")
+
+    # 3. Call the LLM (We use the Agent Client for high-quality summaries)
+    # We use a simple generation call here.
+    summary_text = client.completion(
+        build_chat_prompt([{"role": "user", "content": summary_prompt}])
+    )
+
+    # 4. Create the new compressed message
+    summary_message = {
+        "role": "user",
+        "content": f"[SYSTEM SUMMARY OF PREVIOUS ACTIONS]\n{summary_text}"
+    }
+
+    # 5. Reconstruct the list
+    new_messages = [messages[0], messages[1]] + [summary_message] + messages[-keep_count:]
+
+    if debug: logger.info(f"Compression complete. Reduced {len(messages)} msgs to {len(new_messages)}.")
+    return new_messages
+
+def generate_final_report(debug, client, task_text, raw_answer):
+    """
+    Converts the Agent's raw (likely structured/technical) answer into
+    a natural language response for the user.
+    """
+    system_prompt = (
+        "You are a professional report writer. "
+        "Your goal is to convert the provided Raw Data into a clear, concise, "
+        "and well-formatted response to the User's original request. "
+        "Do not add new facts. Just format and explain the existing data."
+    )
+
+    user_prompt = f"""### USER REQUEST
+{task_text}
+
+### RAW DATA COLLECTED
+{raw_answer}
+
+### INSTRUCTION
+Write the final response in natural language (Markdown).
+    """
+
+    if debug: logger.debug("Generating natural language report...")
+    return client.completion(build_chat_prompt([
+        {"role": agent_template.ROLE_SYSTEM, "content": system_prompt},
+        {"role": agent_template.ROLE_USER, "content": user_prompt}
+    ]))
+
+def load_file(filepath):
+    try:
+        with open(filepath, 'r', encoding='utf-8') as f:
+            return f.read()
+    except FileNotFoundError:
+        logger.error(f"File not found: {filepath}")
+        raise SystemExit(1)  # abort cleanly without requiring the sys module
\ No newline at end of file