import sys
sys.path.append('../core')
import os
import shutil
from datetime import datetime
import json
import re
# Assuming these imports are correctly set up in your project structure
from llm.llm import LLM
# from prompt.constants import modeling_methods # This was unused in the original code
from input.problem import problem_input
from agent.problem_analysis import ProblemAnalysis
# from agent.method_ranking import MethodRanking
from agent.problem_modeling import ProblemModeling
from agent.task_decompse import TaskDecompose
from agent.task import Task
from agent.create_charts import Chart
from agent.coordinator import Coordinator
from utils.utils import read_json_file, write_json_file, write_text_file, json_to_markdown
from prompt.template import TASK_ANALYSIS_APPEND_PROMPT, TASK_FORMULAS_APPEND_PROMPT, TASK_MODELING_APPEND_PROMPT
from utils.generate_paper import generate_paper_from_json
# from utils.convert_format import markdown_to_latex # Uncomment if needed
from prompt.constants import modeling_methods
def mkdir_output(path):
"""Creates the necessary output directories."""
os.makedirs(path, exist_ok=True)
os.makedirs(os.path.join(path, 'json'), exist_ok=True)
os.makedirs(os.path.join(path, 'markdown'), exist_ok=True)
os.makedirs(os.path.join(path, 'latex'), exist_ok=True) # Assuming latex might be used later
os.makedirs(os.path.join(path, 'code'), exist_ok=True)
os.makedirs(os.path.join(path, 'usage'), exist_ok=True)
os.makedirs(os.path.join(path, 'intermediate'), exist_ok=True) # For intermediate coordinator state if needed
class ModelingAgentSystem:
"""
Manages the step-by-step generation of a mathematical modeling report.
Allows for granular control over section generation and tracks progress.
"""
def __init__(self, problem_path: str, config: dict, dataset_path: str, output_path: str, name: str):
"""
Initializes the Modeling Agent System.
Args:
problem_path: Path to the problem description file (e.g., JSON).
config: Dictionary containing configuration parameters (model_name, rounds, etc.).
dataset_path: Path to the dataset directory associated with the problem.
output_path: Path where generated outputs (json, md, code, etc.) will be saved.
name: A unique name for this run/problem (used in filenames).
"""
self.problem_path = problem_path
self.config = config
self.dataset_path = dataset_path
self.output_path = output_path
self.name = name
# --- Essential State ---
self.paper = {'tasks': []} # Holds the final generated content
self.completed_steps = set() # Tracks completed step names
self.planned_steps = [] # Dynamically updated list of step names
self.dependencies = self._define_dependencies() # Map of step -> prerequisites
# --- LLM & Agents ---
self.llm = LLM(config['model_name'])
self.pa = ProblemAnalysis(self.llm)
# self.mr = MethodRanking(self.llm)
self.pm = ProblemModeling(self.llm)
self.td = TaskDecompose(self.llm)
self.task = Task(self.llm)
self.chart = Chart(self.llm)
self.coordinator = Coordinator(self.llm) # Manages task dependencies and intermediate results
# --- Intermediate Data (Populated during generation) ---
self.problem_str: str | None = None
self.problem: dict | None = None
self.problem_type: str | None = None
self.problem_year: str | None = None
self.problem_analysis: str | None = None
self.modeling_solution: str | None = None
self.task_descriptions: list[str] | None = None
self.order: list[int] | None = None # Execution order of tasks
self.with_code: bool = False
# --- Setup ---
mkdir_output(self.output_path)
self._initialize_problem_and_steps()
print(f"Initialization complete. Starting steps: {self.planned_steps}")
print(f"Already completed: {self.completed_steps}")
def _define_dependencies(self):
"""Defines the prerequisite steps for each generation step."""
# Basic structure, will be expanded after task decomposition
deps = {
'Problem Background': [],
'Problem Requirement': [],
'Problem Analysis': ['Problem Background', 'Problem Requirement'],
'High-Level Modeling': ['Problem Analysis'],
'Task Decomposition': ['High-Level Modeling'],
'Dependency Analysis': ['Task Decomposition'], # Added explicit dependency analysis step
# Task dependencies will be added dynamically
}
return deps
def _update_dependencies_after_decomp(self):
"""Updates dependencies for task-specific steps after decomposition and dependency analysis."""
if not self.order:
print("Warning: Task order not determined. Cannot update task dependencies.")
return
num_tasks = len(self.task_descriptions)
for i in range(1, num_tasks + 1):
task_id = str(i)
task_prereqs = [f'Task {dep_id} Subtask Outcome Analysis' for dep_id in self.coordinator.DAG.get(task_id, [])]
# Add 'Dependency Analysis' as a prerequisite for the *first* step of *any* task
base_task_prereqs = ['Dependency Analysis'] + task_prereqs
self.dependencies[f'Task {i} Description'] = ['Task Decomposition'] # Description comes directly from decomp
self.dependencies[f'Task {i} Analysis'] = [f'Task {i} Description'] + base_task_prereqs
self.dependencies[f'Task {i} Preliminary Formulas'] = [f'Task {i} Analysis']
self.dependencies[f'Task {i} Mathematical Modeling Process'] = [f'Task {i} Preliminary Formulas']
if self.with_code:
self.dependencies[f'Task {i} Code'] = [f'Task {i} Mathematical Modeling Process']
self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Code']
else:
# If no code, interpretation depends directly on modeling
self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Mathematical Modeling Process']
self.dependencies[f'Task {i} Subtask Outcome Analysis'] = [f'Task {i} Solution Interpretation']
self.dependencies[f'Task {i} Charts'] = [f'Task {i} Subtask Outcome Analysis']
def _initialize_problem_and_steps(self):
"""Loads the problem input and sets up the initial state."""
print("Loading problem input...")
self.problem_str, self.problem = problem_input(self.problem_path, self.llm)
filename = os.path.splitext(os.path.basename(self.problem_path))[0]
if '_' in filename:
self.problem_year, self.problem_type = filename.split('_')[:2]
else:
self.problem_type = 'X'
self.problem_year = 'XXXX'
self.paper['problem_background'] = self.problem['background']
self.paper['problem_requirement'] = self.problem['problem_requirement']
self.completed_steps.add('Problem Background')
self.completed_steps.add('Problem Requirement')
self.with_code = len(self.problem.get('dataset_path', '')) > 0 or len(self.dataset_path) > 0 # Check both problem spec and explicit path
if self.with_code and os.path.exists(self.dataset_path):
print(f"Copying dataset from {self.dataset_path} to {os.path.join(self.output_path, 'code')}")
shutil.copytree(self.dataset_path, os.path.join(self.output_path, 'code'), dirs_exist_ok=True)
elif self.with_code:
print(f"Warning: Code execution expected, but dataset path '{self.dataset_path}' not found.")
# Initial plan before task decomposition
self.planned_steps = [
'Problem Background',
'Problem Requirement',
'Problem Analysis',
'High-Level Modeling',
'Task Decomposition',
'Dependency Analysis' # Added explicit step
]
def _check_dependencies(self, step_name: str) -> bool:
"""Checks if all prerequisites for a given step are completed."""
if step_name not in self.dependencies:
print(f"Warning: No dependency information defined for step '{step_name}'. Assuming runnable.")
return True # Or False, depending on desired strictness
prerequisites = self.dependencies.get(step_name, [])
for prereq in prerequisites:
if prereq not in self.completed_steps:
print(f"Dependency Error: Step '{step_name}' requires '{prereq}', which is not completed.")
return False
return True
def _update_planned_steps_after_decomp(self):
"""Adds all task-specific steps to the planned steps list."""
if not self.task_descriptions or self.order is None:
print("Error: Cannot update planned steps. Task decomposition or dependency analysis incomplete.")
return
task_step_templates = [
'Description',
'Analysis',
'Preliminary Formulas',
'Mathematical Modeling Process',
'Code' if self.with_code else None, # Add code step only if needed
'Solution Interpretation',
'Subtask Outcome Analysis',
'Charts',
]
# Filter out None template (for no-code case)
task_step_templates = [t for t in task_step_templates if t]
new_task_steps = []
# Add steps in the determined execution order
for task_id_int in self.order:
for template in task_step_templates:
new_task_steps.append(f'Task {task_id_int} {template}')
# Append new task steps after the 'Dependency Analysis' step
dep_analysis_index = self.planned_steps.index('Dependency Analysis')
self.planned_steps = self.planned_steps[:dep_analysis_index+1] + new_task_steps
# Initialize paper['tasks'] structure
self.paper['tasks'] = [{} for _ in range(len(self.task_descriptions))]
# --- Getters ---
def get_completed_steps(self) -> set:
"""Returns the set of names of completed steps."""
return self.completed_steps
def get_planned_steps(self) -> list:
"""Returns the list of names of planned steps (including completed)."""
return self.planned_steps
def get_paper(self) -> dict:
"""Returns the current state of the generated paper dictionary."""
# Ensure tasks are ordered correctly in the final output if needed,
# although appending them in self.order sequence should handle this.
return self.paper
def save_paper(self, intermediate=False):
"""Saves the current paper state to files."""
filename = f"{self.name}_intermediate_{datetime.now().strftime('%Y%m%d%H%M%S')}" if intermediate else self.name
json_path = os.path.join(self.output_path, 'json', f"{filename}.json")
md_path = os.path.join(self.output_path, 'markdown', f"{filename}.md")
# latex_path = os.path.join(self.output_path, 'latex', f"{filename}.tex") # Uncomment if needed
write_json_file(json_path, self.paper)
markdown_str = json_to_markdown(self.paper)
write_text_file(md_path, markdown_str)
# write_text_file(latex_path, markdown_to_latex(markdown_str)) # Uncomment if needed
print(f"Saved paper snapshot to {json_path} and {md_path}")
def save_usage(self):
"""Saves the LLM usage statistics."""
usage_path = os.path.join(self.output_path, 'usage', f"{self.name}.json")
write_json_file(usage_path, self.llm.get_total_usage())
print(f"Saved LLM usage to {usage_path}")
print(f"Total Usage: {self.llm.get_total_usage()}")
# --- Step Generation Methods ---
def _generate_problem_analysis(self, user_prompt: str = '', round: int = 0):
print("Generating: Problem Analysis")
self.problem_analysis = self.pa.analysis(
self.problem_str,
round=round if round > 0 else self.config.get('problem_analysis_round', 0),
user_prompt=user_prompt
)
self.paper['problem_analysis'] = self.problem_analysis
print("Completed: Problem Analysis")
def _generate_high_level_modeling(self, user_prompt: str = '', round: int = 0):
print("Generating: High-Level Modeling")
# modeling_methods = "" # Load from constants if needed, currently unused
self.modeling_solution = self.pm.modeling(
self.problem_str,
self.problem_analysis,
"", # modeling_methods placeholder
round=round if round > 0 else self.config.get('problem_modeling_round', 0),
user_prompt=user_prompt
)
self.paper['high_level_modeling'] = self.modeling_solution # Use a consistent key
print("Completed: High-Level Modeling")
def _generate_task_decomposition(self, user_prompt: str = ''):
print("Generating: Task Decomposition")
self.task_descriptions = self.td.decompose_and_refine(
self.problem_str,
self.problem_analysis,
self.modeling_solution,
self.problem_type,
self.config.get('tasknum', 4), # Default to 4 tasks if not specified
user_prompt=user_prompt
)
self.paper['task_decomposition_summary'] = "\n".join([f"Task {i+1}: {desc}" for i, desc in enumerate(self.task_descriptions)]) # Add summary to paper
print(f"Completed: Task Decomposition ({len(self.task_descriptions)} tasks)")
# Now that we know the tasks, update the planned steps
# self._update_planned_steps_after_decomp() # This will be called after dependency analysis
def _generate_dependency_analysis(self):
print("Generating: Dependency Analysis")
self.order = self.coordinator.analyze_dependencies(
self.problem_str,
self.problem_analysis,
self.modeling_solution,
self.task_descriptions,
self.with_code
)
self.order = [int(i) for i in self.order] # Ensure integer IDs
self.paper['task_execution_order'] = self.order # Store the order
self.paper['task_dependency_analysis'] = self.coordinator.task_dependency_analysis # Store rationale
print(f"Completed: Dependency Analysis. Execution order: {self.order}")
# Update planned steps and dependencies now that order and DAG are known
self._update_planned_steps_after_decomp()
self._update_dependencies_after_decomp()
print(f"Updated planned steps: {self.planned_steps}")
def _generate_task_step(self, task_id: int, step_type: str, user_prompt: str = '', round: int = 0):
"""Handles generation for a specific step within a specific task."""
print(f"Generating: Task {task_id} {step_type}")
task_index = task_id - 1 # 0-based index
# Ensure the task dictionary exists
if task_index >= len(self.paper['tasks']):
print(f"Error: Task index {task_index} out of bounds for self.paper['tasks'].")
return False # Indicate failure
# --- Prepare common inputs for task steps ---
task_description = self.task_descriptions[task_index]
# Retrieve previously generated parts for this task, if they exist
current_task_dict = self.paper['tasks'][task_index]
task_analysis = current_task_dict.get('task_analysis')
task_formulas = current_task_dict.get('preliminary_formulas')
task_modeling = current_task_dict.get('mathematical_modeling_process')
task_code = current_task_dict.get('task_code')
execution_result = current_task_dict.get('execution_result')
task_result = current_task_dict.get('solution_interpretation')
# --- Construct Dependency Prompt ---
task_dependency_ids = [int(i) for i in self.coordinator.DAG.get(str(task_id), [])]
dependency_prompt = ""
dependent_file_prompt = "" # Specifically for coding step
if len(task_dependency_ids) > 0:
# Fetch dependency analysis rationale for the current task
rationale = ""
if self.coordinator.task_dependency_analysis and task_index < len(self.coordinator.task_dependency_analysis):
rationale = self.coordinator.task_dependency_analysis[task_index]
else:
print(f"Warning: Could not find dependency rationale for Task {task_id}")
dependency_prompt = f"This task is Task {task_id}, which depends on the following tasks: {task_dependency_ids}. The dependencies for this task are analyzed as follows: {rationale}\n"
for dep_id in task_dependency_ids:
dep_task_index = dep_id - 1
if dep_task_index < 0 or dep_task_index >= len(self.paper['tasks']):
print(f"Warning: Cannot build dependency prompt. Dependent Task {dep_id} data not found.")
continue
dep_task_dict = self.paper['tasks'][dep_task_index]
# Also try fetching from coordinator memory as a fallback if paper is not updated yet (shouldn't happen with dependency checks)
dep_mem_dict = self.coordinator.memory.get(str(dep_id), {})
dep_code_mem_dict = self.coordinator.code_memory.get(str(dep_id), {})
dependency_prompt += f"---\n# The Description of Task {dep_id}:\n{dep_task_dict.get('task_description', dep_mem_dict.get('task_description', 'N/A'))}\n"
dependency_prompt += f"# The modeling method for Task {dep_id}:\n{dep_task_dict.get('mathematical_modeling_process', dep_mem_dict.get('mathematical_modeling_process', 'N/A'))}\n"
if self.with_code:
# Try getting code structure from paper first, then coordinator memory
code_structure_str = json.dumps(dep_task_dict.get('code_structure', dep_code_mem_dict), indent=2) if dep_task_dict.get('code_structure', dep_code_mem_dict) else "{}" # Default to empty json object string
dependency_prompt += f"# The structure of code for Task {dep_id}:\n{code_structure_str}\n"
dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n"
dependent_file_prompt += f"# The files generated by code for Task {dep_id}:\n{code_structure_str}\n" # Use the same structure info
else:
dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n"
# Append general instructions based on the step type
task_analysis_prompt = dependency_prompt + TASK_ANALYSIS_APPEND_PROMPT if step_type == 'Analysis' else dependency_prompt
task_formulas_prompt = dependency_prompt + TASK_FORMULAS_APPEND_PROMPT if step_type == 'Preliminary Formulas' else dependency_prompt
task_modeling_prompt = dependency_prompt + TASK_MODELING_APPEND_PROMPT if step_type == 'Mathematical Modeling Process' else dependency_prompt
# --- Execute Specific Step Logic ---
success = True
try:
if step_type == 'Description':
# Description is directly from task_descriptions, just assign it
self.paper['tasks'][task_index]['task_description'] = task_description
# Store in coordinator memory as well for prompt building if needed later
if str(task_id) not in self.coordinator.memory: self.coordinator.memory[str(task_id)] = {}
self.coordinator.memory[str(task_id)]['task_description'] = task_description
elif step_type == 'Analysis':
task_analysis = self.task.analysis(
task_analysis_prompt, # Includes dependency info
task_description,
user_prompt=user_prompt
)
self.paper['tasks'][task_index]['task_analysis'] = task_analysis
self.coordinator.memory[str(task_id)]['task_analysis'] = task_analysis
elif step_type == 'Preliminary Formulas':
if not task_analysis: raise ValueError(f"Task {task_id} Analysis is missing.")
description_and_analysis = f'## Task Description\n{task_description}\n\n## Task Analysis\n{task_analysis}'
top_modeling_methods = modeling_methods # self.mr.top_methods(description_and_analysis, top_k=self.config.get('top_method_num', 6))
task_formulas = self.task.formulas(
task_formulas_prompt, # Includes dependency info
self.problem.get('data_description', ''),
task_description,
task_analysis,
top_modeling_methods,
round=round if round > 0 else self.config.get('task_formulas_round', 0),
user_prompt=user_prompt
)
self.paper['tasks'][task_index]['preliminary_formulas'] = task_formulas
self.coordinator.memory[str(task_id)]['preliminary_formulas'] = task_formulas
elif step_type == 'Mathematical Modeling Process':
if not task_analysis or not task_formulas: raise ValueError(f"Task {task_id} Analysis or Formulas missing.")
task_modeling = self.task.modeling(
task_modeling_prompt, # Includes dependency info
self.problem.get('data_description', ''),
task_description,
task_analysis,
task_formulas,
round=round if round > 0 else self.config.get('task_modeling_round', 0),
user_prompt=user_prompt
)
self.paper['tasks'][task_index]['mathematical_modeling_process'] = task_modeling
self.coordinator.memory[str(task_id)]['mathematical_modeling_process'] = task_modeling
elif step_type == 'Code' and self.with_code:
if not task_analysis or not task_formulas or not task_modeling:
raise ValueError(f"Task {task_id} Analysis, Formulas, or Modeling missing for coding.")
code_template_path = os.path.join('../data/actor_data/input/code_template', f'main{task_id}.py')
code_template = ""
if os.path.exists(code_template_path):
with open(code_template_path, 'r') as f:
code_template = f.read()
else:
print(f"Warning: Code template not found at {code_template_path}. Using empty template.")
save_path = os.path.join(self.output_path, 'code', f'main{task_id}.py')
work_dir = os.path.join(self.output_path, 'code')
script_name = f'main{task_id}.py'
dataset_input_path = self.problem.get('dataset_path') or self.dataset_path # Prefer path from problem spec
task_code, is_pass, execution_result = self.task.coding(
dataset_input_path, # Use actual dataset path
self.problem.get('data_description', ''),
self.problem.get('variable_description', ''),
task_description,
task_analysis,
task_formulas,
task_modeling,
dependent_file_prompt, # Pass file dependency info
code_template,
script_name,
work_dir,
try_num=5,
round=round if round > 0 else 1,
user_prompt=user_prompt
)
code_structure = self.task.extract_code_structure(task_id, task_code, save_path) # Uses save_path now
# self.paper['tasks'][task_index]['task_code'] = task_code
self.paper['tasks'][task_index]['task_code'] = '```Python\n' + task_code + '\n```'
self.paper['tasks'][task_index]['is_pass'] = is_pass
self.paper['tasks'][task_index]['execution_result'] = execution_result
self.paper['tasks'][task_index]['code_structure'] = code_structure # Store structure in paper
# Update coordinator's code memory as well
self.coordinator.code_memory[str(task_id)] = code_structure
elif step_type == 'Solution Interpretation':
if not task_modeling: raise ValueError(f"Task {task_id} Modeling is missing.")
if self.with_code and execution_result is None: raise ValueError(f"Task {task_id} Code execution result is missing.")
task_result = self.task.result(
task_description,
task_analysis,
task_formulas,
task_modeling,
user_prompt=user_prompt,
execution_result=execution_result if self.with_code else ''
)
self.paper['tasks'][task_index]['solution_interpretation'] = task_result
self.coordinator.memory[str(task_id)]['solution_interpretation'] = task_result
elif step_type == 'Subtask Outcome Analysis':
if not task_result: raise ValueError(f"Task {task_id} Solution Interpretation is missing.")
task_answer = self.task.answer(
task_description,
task_analysis,
task_formulas,
task_modeling,
task_result,
user_prompt=user_prompt
)
self.paper['tasks'][task_index]['subtask_outcome_analysis'] = task_answer
self.coordinator.memory[str(task_id)]['subtask_outcome_analysis'] = task_answer
elif step_type == 'Charts':
# Charts depend on the full task dictionary being available
full_task_dict_str = json.dumps(self.paper['tasks'][task_index], indent=2)
charts = self.chart.create_charts(
full_task_dict_str,
self.config.get('chart_num', 0),
user_prompt=user_prompt
)
self.paper['tasks'][task_index]['charts'] = charts
self.coordinator.memory[str(task_id)]['charts'] = charts # Also save to coordinator memory if needed elsewhere
else:
print(f"Warning: Unknown step type '{step_type}' for Task {task_id}.")
success = False
except Exception as e:
print(f"Error generating Task {task_id} {step_type}: {e}")
import traceback
traceback.print_exc()
success = False # Mark step as failed
if success:
print(f"Completed: Task {task_id} {step_type}")
return success
# --- Main Generation Control ---
def generate_step(self, step_name: str, user_prompt: str = '', round: int = 0, force_regenerate: bool = True) -> bool:
"""
Generates the content for a specific step, checking dependencies first.
Args:
step_name: The name of the step to generate (e.g., 'Problem Analysis', 'Task 1 Preliminary Formulas').
user_prompt: Optional user guidance to influence the generation.
round: Number of improvement rounds to apply (where applicable).
force_regenerate: If True, regenerate the step even if it's already completed.
Returns:
True if the step was generated successfully (or was already complete), False otherwise.
"""
if step_name in self.completed_steps and not force_regenerate:
print(f"Skipping already completed step: '{step_name}'")
return True
if step_name in self.completed_steps and force_regenerate:
print(f"Regenerating step: '{step_name}'")
# Remove the step from completed_steps to allow regeneration
self.completed_steps.remove(step_name)
if not self._check_dependencies(step_name):
print(f"Cannot generate step '{step_name}' due to unmet dependencies.")
return False
# Dispatch to the appropriate generation method
success = False
try:
if step_name == 'Problem Analysis':
self._generate_problem_analysis(user_prompt, round)
success = True
elif step_name == 'High-Level Modeling':
self._generate_high_level_modeling(user_prompt, round)
success = True
elif step_name == 'Task Decomposition':
self._generate_task_decomposition(user_prompt)
success = True # Decomp itself is done, planning/deps updated later
elif step_name == 'Dependency Analysis':
self._generate_dependency_analysis()
success = True # Analysis itself is done
elif step_name.startswith('Task '):
# Parse task ID and step type
match = re.match(r"Task (\d+) (.*)", step_name)
if match:
task_id = int(match.group(1))
step_type = match.group(2)
# Ensure task steps are only generated if their task ID is valid
if self.order and task_id in self.order:
success = self._generate_task_step(task_id, step_type, user_prompt, round)
elif not self.order:
print(f"Error: Cannot generate task step '{step_name}'. Task order not determined yet.")
success = False
else:
print(f"Error: Cannot generate task step '{step_name}'. Task ID {task_id} not found in execution order {self.order}.")
success = False
else:
print(f"Error: Could not parse task step name: '{step_name}'")
success = False
else:
# Handle Problem Background and Requirement (already done in init)
if step_name in ['Problem Background', 'Problem Requirement']:
print(f"Step '{step_name}' completed during initialization.")
success = True # Mark as successful completion
else:
print(f"Error: Unknown step name: '{step_name}'")
success = False
if success:
self.completed_steps.add(step_name)
# Optional: Save intermediate state after each successful step
# self.save_paper(intermediate=True)
except Exception as e:
print(f"Critical error during generation of step '{step_name}': {e}")
import traceback
traceback.print_exc()
success = False
return success
def run_sequential(self, force_regenerate_all: bool = False):
"""
Runs the entire generation process sequentially, step by step.
Args:
force_regenerate_all: If True, regenerate all steps even if already completed.
"""
print("Starting sequential generation...")
current_step_index = 0
# Clear completed steps if regenerating all
if force_regenerate_all:
print("Force regenerating all steps...")
self.completed_steps.clear()
while current_step_index < len(self.planned_steps):
# Check if planned_steps was modified (e.g., by task decomp/dependency analysis)
if current_step_index >= len(self.planned_steps):
print("Reached end of planned steps.")
break # Avoid index error if list shrinks unexpectedly
step_name = self.planned_steps[current_step_index]
print(f"\n--- Attempting Step: {step_name} ({current_step_index + 1}/{len(self.planned_steps)}) ---")
if step_name in self.completed_steps:
print(f"Skipping already completed step: '{step_name}'")
current_step_index += 1
continue
# Record length before generation in case planned_steps changes
length_before = len(self.planned_steps)
success = self.generate_step(step_name, force_regenerate_all)
length_after = len(self.planned_steps)
if success:
print(f"--- Successfully completed step: '{step_name}' ---")
# If the number of planned steps increased, it means task steps were added.
# The loop condition `current_step_index < len(self.planned_steps)`
# will naturally handle iterating through the newly added steps.
# We just need to increment the index to move to the *next* step
# in the potentially updated list.
current_step_index += 1
else:
print(f"--- Failed to complete step: '{step_name}'. Stopping generation. ---")
break # Stop processing if a step fails
print("\nSequential generation process finished.")
self.save_paper() # Save final result
self.save_usage()
print(f"Final paper saved for run '{self.name}' in '{self.output_path}'.")
print(f"Completed steps: {self.completed_steps}")
if current_step_index < len(self.planned_steps):
print(f"Next planned step was: {self.planned_steps[current_step_index]}")
def generate_paper(self, project_dir: str):
# Example usage
metadata = {
"team": "Agent",
"year": self.problem_year,
"problem_type": self.problem_type
}
json_file_path = f"{project_dir}/json/{self.problem_year}_{self.problem_type}.json"
with open(json_file_path, 'w+') as f:
json.dump(self.paper, f, indent=2)
code_dir = f'{project_dir}/code'
metadata['figures'] = [os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] in ['png', 'jpg', 'jpeg']]
metadata['codes'] = sorted([os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] in ['py']])
generate_paper_from_json(self.llm, self.paper, metadata, os.path.join(project_dir, 'latex'), 'solution')
# --- Example Usage ---
def create_generator(name):
"""Helper function to set up configuration and create the agent system."""
config = {
'top_method_num': 6,
'problem_analysis_round': 0,
'problem_modeling_round': 0,
'task_formulas_round': 0,
'tasknum': 4, # Default task number if not inferred
'chart_num': 0, # Set to > 0 to generate charts
'model_name': 'gpt-4o-mini', # Or your preferred model
"method_name": "MM-Agent-Refactored" # Name for the experiment/output folder
}
# Adjust paths relative to the script location or use absolute paths
base_data_path = '../data/actor_data' # Adjust if necessary
problem_file = os.path.join(base_data_path, 'input', 'problem', f'{name}.json')
dataset_input_path = os.path.join(base_data_path, 'input', 'dataset', name) # Path to check for dataset files
output_dir = os.path.join(base_data_path, 'exps', config["method_name"])
# Create a unique output path for this specific run
run_output_path = os.path.join(output_dir, f"{name}_{datetime.now().strftime('%Y%m%d-%H%M%S')}")
if not os.path.exists(problem_file):
print(f"Error: Problem file not found at {problem_file}")
return None
# Output path is created inside the class constructor now
# if not os.path.exists(output_dir):
# os.makedirs(output_dir)
multi_agent = ModelingAgentSystem(
problem_path=problem_file,
config=config,
dataset_path=dataset_input_path, # Pass the specific dataset path
output_path=run_output_path,
name=name
)
return multi_agent
if __name__ == "__main__":
problem_name = "2024_C" # Example problem name
agent_system = create_generator(problem_name)
if agent_system:
# --- Option 1: Run the whole process sequentially ---
agent_system.run_sequential()
# --- Option 2: Generate specific steps manually (Example) ---
# print("\n--- Manual Step Generation Example ---")
# # Assuming initialization is done in create_generator
# agent_system.generate_step('Problem Analysis')
# agent_system.generate_step('High-Level Modeling')
# agent_system.generate_step('Task Decomposition')
# agent_system.generate_step('Dependency Analysis') # Needed before task steps
# # Now planned_steps and dependencies should be updated
# print("Planned steps after decomp/dep analysis:", agent_system.get_planned_steps())
# print("Dependencies:", agent_system.dependencies) # View the updated dependencies
# # Try generating the first step of the first task in the order
# if agent_system.order:
# first_task_id = agent_system.order[0]
# agent_system.generate_step(f'Task {first_task_id} Description')
# agent_system.generate_step(f'Task {first_task_id} Analysis')
# # ... and so on
# else:
# print("Cannot run manual task steps, order not determined.")
# print("\n--- Final State after Manual Steps ---")
# print("Completed Steps:", agent_system.get_completed_steps())
# final_paper = agent_system.get_paper()
# print("Generated Paper Content (summary):")
# print(json.dumps(final_paper, indent=2, default=str)[:1000] + "\n...") # Print partial paper
# agent_system.save_paper() # Save the result
# agent_system.save_usage()
# --- Option 3: Iterate using the provided loop structure ---
# print("\n--- Iterative Generation Example ---")
# current_step_index = 0
# while current_step_index < len(agent_system.planned_steps):
# # Check if planned_steps changed during iteration
# if current_step_index >= len(agent_system.planned_steps):
# print("Reached end due to plan changes.")
# break
# step_name = agent_system.planned_steps[current_step_index]
# print(f"\nAttempting step ({current_step_index+1}/{len(agent_system.planned_steps)}): {step_name}")
# if step_name in agent_system.completed_steps:
# print(f"Skipping already completed step: '{step_name}'")
# current_step_index += 1
# continue
# success = agent_system.generate_step(step_name)
# if not success:
# print(f"Failed on step: {step_name}. Stopping.")
# break
# # Increment index regardless of whether plan changed,
# # the loop condition handles the updated length
# current_step_index += 1
# print("\n--- Final State after Iterative Loop ---")
# print("Completed Steps:", agent_system.get_completed_steps())
# final_paper = agent_system.get_paper()
# # print("Generated Paper Content (full):")
# # print(json.dumps(final_paper, indent=2, default=str))
# agent_system.save_paper() # Save the result
# agent_system.save_usage()