import sys sys.path.append('../core') import os import shutil from datetime import datetime import json import re # Assuming these imports are correctly set up in your project structure from llm.llm import LLM # from prompt.constants import modeling_methods # This was unused in the original code from input.problem import problem_input from agent.problem_analysis import ProblemAnalysis # from agent.method_ranking import MethodRanking from agent.problem_modeling import ProblemModeling from agent.task_decompse import TaskDecompose from agent.task import Task from agent.create_charts import Chart from agent.coordinator import Coordinator from utils.utils import read_json_file, write_json_file, write_text_file, json_to_markdown from prompt.template import TASK_ANALYSIS_APPEND_PROMPT, TASK_FORMULAS_APPEND_PROMPT, TASK_MODELING_APPEND_PROMPT from utils.generate_paper import generate_paper_from_json # from utils.convert_format import markdown_to_latex # Uncomment if needed from prompt.constants import modeling_methods def mkdir_output(path): """Creates the necessary output directories.""" os.makedirs(path, exist_ok=True) os.makedirs(os.path.join(path, 'json'), exist_ok=True) os.makedirs(os.path.join(path, 'markdown'), exist_ok=True) os.makedirs(os.path.join(path, 'latex'), exist_ok=True) # Assuming latex might be used later os.makedirs(os.path.join(path, 'code'), exist_ok=True) os.makedirs(os.path.join(path, 'usage'), exist_ok=True) os.makedirs(os.path.join(path, 'intermediate'), exist_ok=True) # For intermediate coordinator state if needed class ModelingAgentSystem: """ Manages the step-by-step generation of a mathematical modeling report. Allows for granular control over section generation and tracks progress. """ def __init__(self, problem_path: str, config: dict, dataset_path: str, output_path: str, name: str): """ Initializes the Modeling Agent System. Args: problem_path: Path to the problem description file (e.g., JSON). config: Dictionary containing configuration parameters (model_name, rounds, etc.). dataset_path: Path to the dataset directory associated with the problem. output_path: Path where generated outputs (json, md, code, etc.) will be saved. name: A unique name for this run/problem (used in filenames). """ self.problem_path = problem_path self.config = config self.dataset_path = dataset_path self.output_path = output_path self.name = name # --- Essential State --- self.paper = {'tasks': []} # Holds the final generated content self.completed_steps = set() # Tracks completed step names self.planned_steps = [] # Dynamically updated list of step names self.dependencies = self._define_dependencies() # Map of step -> prerequisites # --- LLM & Agents --- self.llm = LLM(config['model_name']) self.pa = ProblemAnalysis(self.llm) # self.mr = MethodRanking(self.llm) self.pm = ProblemModeling(self.llm) self.td = TaskDecompose(self.llm) self.task = Task(self.llm) self.chart = Chart(self.llm) self.coordinator = Coordinator(self.llm) # Manages task dependencies and intermediate results # --- Intermediate Data (Populated during generation) --- self.problem_str: str | None = None self.problem: dict | None = None self.problem_type: str | None = None self.problem_year: str | None = None self.problem_analysis: str | None = None self.modeling_solution: str | None = None self.task_descriptions: list[str] | None = None self.order: list[int] | None = None # Execution order of tasks self.with_code: bool = False # --- Setup --- mkdir_output(self.output_path) self._initialize_problem_and_steps() print(f"Initialization complete. Starting steps: {self.planned_steps}") print(f"Already completed: {self.completed_steps}") def _define_dependencies(self): """Defines the prerequisite steps for each generation step.""" # Basic structure, will be expanded after task decomposition deps = { 'Problem Background': [], 'Problem Requirement': [], 'Problem Analysis': ['Problem Background', 'Problem Requirement'], 'High-Level Modeling': ['Problem Analysis'], 'Task Decomposition': ['High-Level Modeling'], 'Dependency Analysis': ['Task Decomposition'], # Added explicit dependency analysis step # Task dependencies will be added dynamically } return deps def _update_dependencies_after_decomp(self): """Updates dependencies for task-specific steps after decomposition and dependency analysis.""" if not self.order: print("Warning: Task order not determined. Cannot update task dependencies.") return num_tasks = len(self.task_descriptions) for i in range(1, num_tasks + 1): task_id = str(i) task_prereqs = [f'Task {dep_id} Subtask Outcome Analysis' for dep_id in self.coordinator.DAG.get(task_id, [])] # Add 'Dependency Analysis' as a prerequisite for the *first* step of *any* task base_task_prereqs = ['Dependency Analysis'] + task_prereqs self.dependencies[f'Task {i} Description'] = ['Task Decomposition'] # Description comes directly from decomp self.dependencies[f'Task {i} Analysis'] = [f'Task {i} Description'] + base_task_prereqs self.dependencies[f'Task {i} Preliminary Formulas'] = [f'Task {i} Analysis'] self.dependencies[f'Task {i} Mathematical Modeling Process'] = [f'Task {i} Preliminary Formulas'] if self.with_code: self.dependencies[f'Task {i} Code'] = [f'Task {i} Mathematical Modeling Process'] self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Code'] else: # If no code, interpretation depends directly on modeling self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Mathematical Modeling Process'] self.dependencies[f'Task {i} Subtask Outcome Analysis'] = [f'Task {i} Solution Interpretation'] self.dependencies[f'Task {i} Charts'] = [f'Task {i} Subtask Outcome Analysis'] def _initialize_problem_and_steps(self): """Loads the problem input and sets up the initial state.""" print("Loading problem input...") self.problem_str, self.problem = problem_input(self.problem_path, self.llm) filename = os.path.splitext(os.path.basename(self.problem_path))[0] if '_' in filename: self.problem_year, self.problem_type = filename.split('_')[:2] else: self.problem_type = 'X' self.problem_year = 'XXXX' self.paper['problem_background'] = self.problem['background'] self.paper['problem_requirement'] = self.problem['problem_requirement'] self.completed_steps.add('Problem Background') self.completed_steps.add('Problem Requirement') self.with_code = len(self.problem.get('dataset_path', '')) > 0 or len(self.dataset_path) > 0 # Check both problem spec and explicit path if self.with_code and os.path.exists(self.dataset_path): print(f"Copying dataset from {self.dataset_path} to {os.path.join(self.output_path, 'code')}") shutil.copytree(self.dataset_path, os.path.join(self.output_path, 'code'), dirs_exist_ok=True) elif self.with_code: print(f"Warning: Code execution expected, but dataset path '{self.dataset_path}' not found.") # Initial plan before task decomposition self.planned_steps = [ 'Problem Background', 'Problem Requirement', 'Problem Analysis', 'High-Level Modeling', 'Task Decomposition', 'Dependency Analysis' # Added explicit step ] def _check_dependencies(self, step_name: str) -> bool: """Checks if all prerequisites for a given step are completed.""" if step_name not in self.dependencies: print(f"Warning: No dependency information defined for step '{step_name}'. Assuming runnable.") return True # Or False, depending on desired strictness prerequisites = self.dependencies.get(step_name, []) for prereq in prerequisites: if prereq not in self.completed_steps: print(f"Dependency Error: Step '{step_name}' requires '{prereq}', which is not completed.") return False return True def _update_planned_steps_after_decomp(self): """Adds all task-specific steps to the planned steps list.""" if not self.task_descriptions or self.order is None: print("Error: Cannot update planned steps. Task decomposition or dependency analysis incomplete.") return task_step_templates = [ 'Description', 'Analysis', 'Preliminary Formulas', 'Mathematical Modeling Process', 'Code' if self.with_code else None, # Add code step only if needed 'Solution Interpretation', 'Subtask Outcome Analysis', 'Charts', ] # Filter out None template (for no-code case) task_step_templates = [t for t in task_step_templates if t] new_task_steps = [] # Add steps in the determined execution order for task_id_int in self.order: for template in task_step_templates: new_task_steps.append(f'Task {task_id_int} {template}') # Append new task steps after the 'Dependency Analysis' step dep_analysis_index = self.planned_steps.index('Dependency Analysis') self.planned_steps = self.planned_steps[:dep_analysis_index+1] + new_task_steps # Initialize paper['tasks'] structure self.paper['tasks'] = [{} for _ in range(len(self.task_descriptions))] # --- Getters --- def get_completed_steps(self) -> set: """Returns the set of names of completed steps.""" return self.completed_steps def get_planned_steps(self) -> list: """Returns the list of names of planned steps (including completed).""" return self.planned_steps def get_paper(self) -> dict: """Returns the current state of the generated paper dictionary.""" # Ensure tasks are ordered correctly in the final output if needed, # although appending them in self.order sequence should handle this. return self.paper def save_paper(self, intermediate=False): """Saves the current paper state to files.""" filename = f"{self.name}_intermediate_{datetime.now().strftime('%Y%m%d%H%M%S')}" if intermediate else self.name json_path = os.path.join(self.output_path, 'json', f"{filename}.json") md_path = os.path.join(self.output_path, 'markdown', f"{filename}.md") # latex_path = os.path.join(self.output_path, 'latex', f"{filename}.tex") # Uncomment if needed write_json_file(json_path, self.paper) markdown_str = json_to_markdown(self.paper) write_text_file(md_path, markdown_str) # write_text_file(latex_path, markdown_to_latex(markdown_str)) # Uncomment if needed print(f"Saved paper snapshot to {json_path} and {md_path}") def save_usage(self): """Saves the LLM usage statistics.""" usage_path = os.path.join(self.output_path, 'usage', f"{self.name}.json") write_json_file(usage_path, self.llm.get_total_usage()) print(f"Saved LLM usage to {usage_path}") print(f"Total Usage: {self.llm.get_total_usage()}") # --- Step Generation Methods --- def _generate_problem_analysis(self, user_prompt: str = '', round: int = 0): print("Generating: Problem Analysis") self.problem_analysis = self.pa.analysis( self.problem_str, round=round if round > 0 else self.config.get('problem_analysis_round', 0), user_prompt=user_prompt ) self.paper['problem_analysis'] = self.problem_analysis print("Completed: Problem Analysis") def _generate_high_level_modeling(self, user_prompt: str = '', round: int = 0): print("Generating: High-Level Modeling") # modeling_methods = "" # Load from constants if needed, currently unused self.modeling_solution = self.pm.modeling( self.problem_str, self.problem_analysis, "", # modeling_methods placeholder round=round if round > 0 else self.config.get('problem_modeling_round', 0), user_prompt=user_prompt ) self.paper['high_level_modeling'] = self.modeling_solution # Use a consistent key print("Completed: High-Level Modeling") def _generate_task_decomposition(self, user_prompt: str = ''): print("Generating: Task Decomposition") self.task_descriptions = self.td.decompose_and_refine( self.problem_str, self.problem_analysis, self.modeling_solution, self.problem_type, self.config.get('tasknum', 4), # Default to 4 tasks if not specified user_prompt=user_prompt ) self.paper['task_decomposition_summary'] = "\n".join([f"Task {i+1}: {desc}" for i, desc in enumerate(self.task_descriptions)]) # Add summary to paper print(f"Completed: Task Decomposition ({len(self.task_descriptions)} tasks)") # Now that we know the tasks, update the planned steps # self._update_planned_steps_after_decomp() # This will be called after dependency analysis def _generate_dependency_analysis(self): print("Generating: Dependency Analysis") self.order = self.coordinator.analyze_dependencies( self.problem_str, self.problem_analysis, self.modeling_solution, self.task_descriptions, self.with_code ) self.order = [int(i) for i in self.order] # Ensure integer IDs self.paper['task_execution_order'] = self.order # Store the order self.paper['task_dependency_analysis'] = self.coordinator.task_dependency_analysis # Store rationale print(f"Completed: Dependency Analysis. Execution order: {self.order}") # Update planned steps and dependencies now that order and DAG are known self._update_planned_steps_after_decomp() self._update_dependencies_after_decomp() print(f"Updated planned steps: {self.planned_steps}") def _generate_task_step(self, task_id: int, step_type: str, user_prompt: str = '', round: int = 0): """Handles generation for a specific step within a specific task.""" print(f"Generating: Task {task_id} {step_type}") task_index = task_id - 1 # 0-based index # Ensure the task dictionary exists if task_index >= len(self.paper['tasks']): print(f"Error: Task index {task_index} out of bounds for self.paper['tasks'].") return False # Indicate failure # --- Prepare common inputs for task steps --- task_description = self.task_descriptions[task_index] # Retrieve previously generated parts for this task, if they exist current_task_dict = self.paper['tasks'][task_index] task_analysis = current_task_dict.get('task_analysis') task_formulas = current_task_dict.get('preliminary_formulas') task_modeling = current_task_dict.get('mathematical_modeling_process') task_code = current_task_dict.get('task_code') execution_result = current_task_dict.get('execution_result') task_result = current_task_dict.get('solution_interpretation') # --- Construct Dependency Prompt --- task_dependency_ids = [int(i) for i in self.coordinator.DAG.get(str(task_id), [])] dependency_prompt = "" dependent_file_prompt = "" # Specifically for coding step if len(task_dependency_ids) > 0: # Fetch dependency analysis rationale for the current task rationale = "" if self.coordinator.task_dependency_analysis and task_index < len(self.coordinator.task_dependency_analysis): rationale = self.coordinator.task_dependency_analysis[task_index] else: print(f"Warning: Could not find dependency rationale for Task {task_id}") dependency_prompt = f"This task is Task {task_id}, which depends on the following tasks: {task_dependency_ids}. The dependencies for this task are analyzed as follows: {rationale}\n" for dep_id in task_dependency_ids: dep_task_index = dep_id - 1 if dep_task_index < 0 or dep_task_index >= len(self.paper['tasks']): print(f"Warning: Cannot build dependency prompt. Dependent Task {dep_id} data not found.") continue dep_task_dict = self.paper['tasks'][dep_task_index] # Also try fetching from coordinator memory as a fallback if paper is not updated yet (shouldn't happen with dependency checks) dep_mem_dict = self.coordinator.memory.get(str(dep_id), {}) dep_code_mem_dict = self.coordinator.code_memory.get(str(dep_id), {}) dependency_prompt += f"---\n# The Description of Task {dep_id}:\n{dep_task_dict.get('task_description', dep_mem_dict.get('task_description', 'N/A'))}\n" dependency_prompt += f"# The modeling method for Task {dep_id}:\n{dep_task_dict.get('mathematical_modeling_process', dep_mem_dict.get('mathematical_modeling_process', 'N/A'))}\n" if self.with_code: # Try getting code structure from paper first, then coordinator memory code_structure_str = json.dumps(dep_task_dict.get('code_structure', dep_code_mem_dict), indent=2) if dep_task_dict.get('code_structure', dep_code_mem_dict) else "{}" # Default to empty json object string dependency_prompt += f"# The structure of code for Task {dep_id}:\n{code_structure_str}\n" dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n" dependent_file_prompt += f"# The files generated by code for Task {dep_id}:\n{code_structure_str}\n" # Use the same structure info else: dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n" # Append general instructions based on the step type task_analysis_prompt = dependency_prompt + TASK_ANALYSIS_APPEND_PROMPT if step_type == 'Analysis' else dependency_prompt task_formulas_prompt = dependency_prompt + TASK_FORMULAS_APPEND_PROMPT if step_type == 'Preliminary Formulas' else dependency_prompt task_modeling_prompt = dependency_prompt + TASK_MODELING_APPEND_PROMPT if step_type == 'Mathematical Modeling Process' else dependency_prompt # --- Execute Specific Step Logic --- success = True try: if step_type == 'Description': # Description is directly from task_descriptions, just assign it self.paper['tasks'][task_index]['task_description'] = task_description # Store in coordinator memory as well for prompt building if needed later if str(task_id) not in self.coordinator.memory: self.coordinator.memory[str(task_id)] = {} self.coordinator.memory[str(task_id)]['task_description'] = task_description elif step_type == 'Analysis': task_analysis = self.task.analysis( task_analysis_prompt, # Includes dependency info task_description, user_prompt=user_prompt ) self.paper['tasks'][task_index]['task_analysis'] = task_analysis self.coordinator.memory[str(task_id)]['task_analysis'] = task_analysis elif step_type == 'Preliminary Formulas': if not task_analysis: raise ValueError(f"Task {task_id} Analysis is missing.") description_and_analysis = f'## Task Description\n{task_description}\n\n## Task Analysis\n{task_analysis}' top_modeling_methods = modeling_methods # self.mr.top_methods(description_and_analysis, top_k=self.config.get('top_method_num', 6)) task_formulas = self.task.formulas( task_formulas_prompt, # Includes dependency info self.problem.get('data_description', ''), task_description, task_analysis, top_modeling_methods, round=round if round > 0 else self.config.get('task_formulas_round', 0), user_prompt=user_prompt ) self.paper['tasks'][task_index]['preliminary_formulas'] = task_formulas self.coordinator.memory[str(task_id)]['preliminary_formulas'] = task_formulas elif step_type == 'Mathematical Modeling Process': if not task_analysis or not task_formulas: raise ValueError(f"Task {task_id} Analysis or Formulas missing.") task_modeling = self.task.modeling( task_modeling_prompt, # Includes dependency info self.problem.get('data_description', ''), task_description, task_analysis, task_formulas, round=round if round > 0 else self.config.get('task_modeling_round', 0), user_prompt=user_prompt ) self.paper['tasks'][task_index]['mathematical_modeling_process'] = task_modeling self.coordinator.memory[str(task_id)]['mathematical_modeling_process'] = task_modeling elif step_type == 'Code' and self.with_code: if not task_analysis or not task_formulas or not task_modeling: raise ValueError(f"Task {task_id} Analysis, Formulas, or Modeling missing for coding.") code_template_path = os.path.join('../data/actor_data/input/code_template', f'main{task_id}.py') code_template = "" if os.path.exists(code_template_path): with open(code_template_path, 'r') as f: code_template = f.read() else: print(f"Warning: Code template not found at {code_template_path}. Using empty template.") save_path = os.path.join(self.output_path, 'code', f'main{task_id}.py') work_dir = os.path.join(self.output_path, 'code') script_name = f'main{task_id}.py' dataset_input_path = self.problem.get('dataset_path') or self.dataset_path # Prefer path from problem spec task_code, is_pass, execution_result = self.task.coding( dataset_input_path, # Use actual dataset path self.problem.get('data_description', ''), self.problem.get('variable_description', ''), task_description, task_analysis, task_formulas, task_modeling, dependent_file_prompt, # Pass file dependency info code_template, script_name, work_dir, try_num=5, round=round if round > 0 else 1, user_prompt=user_prompt ) code_structure = self.task.extract_code_structure(task_id, task_code, save_path) # Uses save_path now # self.paper['tasks'][task_index]['task_code'] = task_code self.paper['tasks'][task_index]['task_code'] = '```Python\n' + task_code + '\n```' self.paper['tasks'][task_index]['is_pass'] = is_pass self.paper['tasks'][task_index]['execution_result'] = execution_result self.paper['tasks'][task_index]['code_structure'] = code_structure # Store structure in paper # Update coordinator's code memory as well self.coordinator.code_memory[str(task_id)] = code_structure elif step_type == 'Solution Interpretation': if not task_modeling: raise ValueError(f"Task {task_id} Modeling is missing.") if self.with_code and execution_result is None: raise ValueError(f"Task {task_id} Code execution result is missing.") task_result = self.task.result( task_description, task_analysis, task_formulas, task_modeling, user_prompt=user_prompt, execution_result=execution_result if self.with_code else '' ) self.paper['tasks'][task_index]['solution_interpretation'] = task_result self.coordinator.memory[str(task_id)]['solution_interpretation'] = task_result elif step_type == 'Subtask Outcome Analysis': if not task_result: raise ValueError(f"Task {task_id} Solution Interpretation is missing.") task_answer = self.task.answer( task_description, task_analysis, task_formulas, task_modeling, task_result, user_prompt=user_prompt ) self.paper['tasks'][task_index]['subtask_outcome_analysis'] = task_answer self.coordinator.memory[str(task_id)]['subtask_outcome_analysis'] = task_answer elif step_type == 'Charts': # Charts depend on the full task dictionary being available full_task_dict_str = json.dumps(self.paper['tasks'][task_index], indent=2) charts = self.chart.create_charts( full_task_dict_str, self.config.get('chart_num', 0), user_prompt=user_prompt ) self.paper['tasks'][task_index]['charts'] = charts self.coordinator.memory[str(task_id)]['charts'] = charts # Also save to coordinator memory if needed elsewhere else: print(f"Warning: Unknown step type '{step_type}' for Task {task_id}.") success = False except Exception as e: print(f"Error generating Task {task_id} {step_type}: {e}") import traceback traceback.print_exc() success = False # Mark step as failed if success: print(f"Completed: Task {task_id} {step_type}") return success # --- Main Generation Control --- def generate_step(self, step_name: str, user_prompt: str = '', round: int = 0, force_regenerate: bool = True) -> bool: """ Generates the content for a specific step, checking dependencies first. Args: step_name: The name of the step to generate (e.g., 'Problem Analysis', 'Task 1 Preliminary Formulas'). user_prompt: Optional user guidance to influence the generation. round: Number of improvement rounds to apply (where applicable). force_regenerate: If True, regenerate the step even if it's already completed. Returns: True if the step was generated successfully (or was already complete), False otherwise. """ if step_name in self.completed_steps and not force_regenerate: print(f"Skipping already completed step: '{step_name}'") return True if step_name in self.completed_steps and force_regenerate: print(f"Regenerating step: '{step_name}'") # Remove the step from completed_steps to allow regeneration self.completed_steps.remove(step_name) if not self._check_dependencies(step_name): print(f"Cannot generate step '{step_name}' due to unmet dependencies.") return False # Dispatch to the appropriate generation method success = False try: if step_name == 'Problem Analysis': self._generate_problem_analysis(user_prompt, round) success = True elif step_name == 'High-Level Modeling': self._generate_high_level_modeling(user_prompt, round) success = True elif step_name == 'Task Decomposition': self._generate_task_decomposition(user_prompt) success = True # Decomp itself is done, planning/deps updated later elif step_name == 'Dependency Analysis': self._generate_dependency_analysis() success = True # Analysis itself is done elif step_name.startswith('Task '): # Parse task ID and step type match = re.match(r"Task (\d+) (.*)", step_name) if match: task_id = int(match.group(1)) step_type = match.group(2) # Ensure task steps are only generated if their task ID is valid if self.order and task_id in self.order: success = self._generate_task_step(task_id, step_type, user_prompt, round) elif not self.order: print(f"Error: Cannot generate task step '{step_name}'. Task order not determined yet.") success = False else: print(f"Error: Cannot generate task step '{step_name}'. Task ID {task_id} not found in execution order {self.order}.") success = False else: print(f"Error: Could not parse task step name: '{step_name}'") success = False else: # Handle Problem Background and Requirement (already done in init) if step_name in ['Problem Background', 'Problem Requirement']: print(f"Step '{step_name}' completed during initialization.") success = True # Mark as successful completion else: print(f"Error: Unknown step name: '{step_name}'") success = False if success: self.completed_steps.add(step_name) # Optional: Save intermediate state after each successful step # self.save_paper(intermediate=True) except Exception as e: print(f"Critical error during generation of step '{step_name}': {e}") import traceback traceback.print_exc() success = False return success def run_sequential(self, force_regenerate_all: bool = False): """ Runs the entire generation process sequentially, step by step. Args: force_regenerate_all: If True, regenerate all steps even if already completed. """ print("Starting sequential generation...") current_step_index = 0 # Clear completed steps if regenerating all if force_regenerate_all: print("Force regenerating all steps...") self.completed_steps.clear() while current_step_index < len(self.planned_steps): # Check if planned_steps was modified (e.g., by task decomp/dependency analysis) if current_step_index >= len(self.planned_steps): print("Reached end of planned steps.") break # Avoid index error if list shrinks unexpectedly step_name = self.planned_steps[current_step_index] print(f"\n--- Attempting Step: {step_name} ({current_step_index + 1}/{len(self.planned_steps)}) ---") if step_name in self.completed_steps: print(f"Skipping already completed step: '{step_name}'") current_step_index += 1 continue # Record length before generation in case planned_steps changes length_before = len(self.planned_steps) success = self.generate_step(step_name, force_regenerate_all) length_after = len(self.planned_steps) if success: print(f"--- Successfully completed step: '{step_name}' ---") # If the number of planned steps increased, it means task steps were added. # The loop condition `current_step_index < len(self.planned_steps)` # will naturally handle iterating through the newly added steps. # We just need to increment the index to move to the *next* step # in the potentially updated list. current_step_index += 1 else: print(f"--- Failed to complete step: '{step_name}'. Stopping generation. ---") break # Stop processing if a step fails print("\nSequential generation process finished.") self.save_paper() # Save final result self.save_usage() print(f"Final paper saved for run '{self.name}' in '{self.output_path}'.") print(f"Completed steps: {self.completed_steps}") if current_step_index < len(self.planned_steps): print(f"Next planned step was: {self.planned_steps[current_step_index]}") def generate_paper(self, project_dir: str): # Example usage metadata = { "team": "Agent", "year": self.problem_year, "problem_type": self.problem_type } json_file_path = f"{project_dir}/json/{self.problem_year}_{self.problem_type}.json" with open(json_file_path, 'w+') as f: json.dump(self.paper, f, indent=2) code_dir = f'{project_dir}/code' metadata['figures'] = [os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] in ['png', 'jpg', 'jpeg']] metadata['codes'] = sorted([os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] in ['py']]) generate_paper_from_json(self.llm, self.paper, metadata, os.path.join(project_dir, 'latex'), 'solution') # --- Example Usage --- def create_generator(name): """Helper function to set up configuration and create the agent system.""" config = { 'top_method_num': 6, 'problem_analysis_round': 0, 'problem_modeling_round': 0, 'task_formulas_round': 0, 'tasknum': 4, # Default task number if not inferred 'chart_num': 0, # Set to > 0 to generate charts 'model_name': 'gpt-4o-mini', # Or your preferred model "method_name": "MM-Agent-Refactored" # Name for the experiment/output folder } # Adjust paths relative to the script location or use absolute paths base_data_path = '../data/actor_data' # Adjust if necessary problem_file = os.path.join(base_data_path, 'input', 'problem', f'{name}.json') dataset_input_path = os.path.join(base_data_path, 'input', 'dataset', name) # Path to check for dataset files output_dir = os.path.join(base_data_path, 'exps', config["method_name"]) # Create a unique output path for this specific run run_output_path = os.path.join(output_dir, f"{name}_{datetime.now().strftime('%Y%m%d-%H%M%S')}") if not os.path.exists(problem_file): print(f"Error: Problem file not found at {problem_file}") return None # Output path is created inside the class constructor now # if not os.path.exists(output_dir): # os.makedirs(output_dir) multi_agent = ModelingAgentSystem( problem_path=problem_file, config=config, dataset_path=dataset_input_path, # Pass the specific dataset path output_path=run_output_path, name=name ) return multi_agent if __name__ == "__main__": problem_name = "2024_C" # Example problem name agent_system = create_generator(problem_name) if agent_system: # --- Option 1: Run the whole process sequentially --- agent_system.run_sequential() # --- Option 2: Generate specific steps manually (Example) --- # print("\n--- Manual Step Generation Example ---") # # Assuming initialization is done in create_generator # agent_system.generate_step('Problem Analysis') # agent_system.generate_step('High-Level Modeling') # agent_system.generate_step('Task Decomposition') # agent_system.generate_step('Dependency Analysis') # Needed before task steps # # Now planned_steps and dependencies should be updated # print("Planned steps after decomp/dep analysis:", agent_system.get_planned_steps()) # print("Dependencies:", agent_system.dependencies) # View the updated dependencies # # Try generating the first step of the first task in the order # if agent_system.order: # first_task_id = agent_system.order[0] # agent_system.generate_step(f'Task {first_task_id} Description') # agent_system.generate_step(f'Task {first_task_id} Analysis') # # ... and so on # else: # print("Cannot run manual task steps, order not determined.") # print("\n--- Final State after Manual Steps ---") # print("Completed Steps:", agent_system.get_completed_steps()) # final_paper = agent_system.get_paper() # print("Generated Paper Content (summary):") # print(json.dumps(final_paper, indent=2, default=str)[:1000] + "\n...") # Print partial paper # agent_system.save_paper() # Save the result # agent_system.save_usage() # --- Option 3: Iterate using the provided loop structure --- # print("\n--- Iterative Generation Example ---") # current_step_index = 0 # while current_step_index < len(agent_system.planned_steps): # # Check if planned_steps changed during iteration # if current_step_index >= len(agent_system.planned_steps): # print("Reached end due to plan changes.") # break # step_name = agent_system.planned_steps[current_step_index] # print(f"\nAttempting step ({current_step_index+1}/{len(agent_system.planned_steps)}): {step_name}") # if step_name in agent_system.completed_steps: # print(f"Skipping already completed step: '{step_name}'") # current_step_index += 1 # continue # success = agent_system.generate_step(step_name) # if not success: # print(f"Failed on step: {step_name}. Stopping.") # break # # Increment index regardless of whether plan changed, # # the loop condition handles the updated length # current_step_index += 1 # print("\n--- Final State after Iterative Loop ---") # print("Completed Steps:", agent_system.get_completed_steps()) # final_paper = agent_system.get_paper() # # print("Generated Paper Content (full):") # # print(json.dumps(final_paper, indent=2, default=str)) # agent_system.save_paper() # Save the result # agent_system.save_usage()