import sys

sys.path.append('../core')

import os
import shutil
import json
import re
from datetime import datetime

from llm.llm import LLM
from input.problem import problem_input
from agent.problem_analysis import ProblemAnalysis
from agent.problem_modeling import ProblemModeling
from agent.task_decompse import TaskDecompose
from agent.task import Task
from agent.create_charts import Chart
from agent.coordinator import Coordinator
from utils.utils import read_json_file, write_json_file, write_text_file, json_to_markdown
from prompt.template import TASK_ANALYSIS_APPEND_PROMPT, TASK_FORMULAS_APPEND_PROMPT, TASK_MODELING_APPEND_PROMPT
from utils.generate_paper import generate_paper_from_json
from prompt.constants import modeling_methods


def mkdir_output(path):
    """Creates the output directory and its standard subdirectories."""
    for sub in ('', 'json', 'markdown', 'latex', 'code', 'usage', 'intermediate'):
        os.makedirs(os.path.join(path, sub), exist_ok=True)
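
# Resulting layout for a run at <output_path> (created by mkdir_output above):
#   json/ and markdown/ hold paper snapshots, latex/ holds the rendered paper,
#   code/ holds generated scripts plus any copied dataset, usage/ holds LLM
#   token accounting, and intermediate/ is reserved for scratch artifacts.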


class ModelingAgentSystem:
    """
    Manages the step-by-step generation of a mathematical modeling report.
    Allows for granular control over section generation and tracks progress.
    """

    def __init__(self, problem_path: str, config: dict, dataset_path: str, output_path: str, name: str):
        """
        Initializes the Modeling Agent System.

        Args:
            problem_path: Path to the problem description file (e.g., JSON).
            config: Dictionary containing configuration parameters (model_name, rounds, etc.).
            dataset_path: Path to the dataset directory associated with the problem.
            output_path: Path where generated outputs (json, md, code, etc.) will be saved.
            name: A unique name for this run/problem (used in filenames).
        """
        self.problem_path = problem_path
        self.config = config
        self.dataset_path = dataset_path
        self.output_path = output_path
        self.name = name

        self.paper = {'tasks': []}
        self.completed_steps = set()
        self.planned_steps = []
        self.dependencies = self._define_dependencies()

        self.llm = LLM(config['model_name'])
        self.pa = ProblemAnalysis(self.llm)
        self.pm = ProblemModeling(self.llm)
        self.td = TaskDecompose(self.llm)
        self.task = Task(self.llm)
        self.chart = Chart(self.llm)
        self.coordinator = Coordinator(self.llm)

        self.problem_str: str | None = None
        self.problem: dict | None = None
        self.problem_type: str | None = None
        self.problem_year: str | None = None
        self.problem_analysis: str | None = None
        self.modeling_solution: str | None = None
        self.task_descriptions: list[str] | None = None
        self.order: list[int] | None = None
        self.with_code: bool = False

        mkdir_output(self.output_path)
        self._initialize_problem_and_steps()
        print(f"Initialization complete. Starting steps: {self.planned_steps}")
        print(f"Already completed: {self.completed_steps}")

    def _define_dependencies(self):
        """Defines the prerequisite steps for each generation step."""
        deps = {
            'Problem Background': [],
            'Problem Requirement': [],
            'Problem Analysis': ['Problem Background', 'Problem Requirement'],
            'High-Level Modeling': ['Problem Analysis'],
            'Task Decomposition': ['High-Level Modeling'],
            'Dependency Analysis': ['Task Decomposition'],
        }
        return deps

    def _update_dependencies_after_decomp(self):
        """Updates dependencies for task-specific steps after decomposition and dependency analysis."""
        if not self.order:
            print("Warning: Task order not determined. Cannot update task dependencies.")
            return

        num_tasks = len(self.task_descriptions)
        for i in range(1, num_tasks + 1):
            task_id = str(i)
            task_prereqs = [f'Task {dep_id} Subtask Outcome Analysis' for dep_id in self.coordinator.DAG.get(task_id, [])]
            base_task_prereqs = ['Dependency Analysis'] + task_prereqs

            self.dependencies[f'Task {i} Description'] = ['Task Decomposition']
            self.dependencies[f'Task {i} Analysis'] = [f'Task {i} Description'] + base_task_prereqs
            self.dependencies[f'Task {i} Preliminary Formulas'] = [f'Task {i} Analysis']
            self.dependencies[f'Task {i} Mathematical Modeling Process'] = [f'Task {i} Preliminary Formulas']
            if self.with_code:
                self.dependencies[f'Task {i} Code'] = [f'Task {i} Mathematical Modeling Process']
                self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Code']
            else:
                self.dependencies[f'Task {i} Solution Interpretation'] = [f'Task {i} Mathematical Modeling Process']
            self.dependencies[f'Task {i} Subtask Outcome Analysis'] = [f'Task {i} Solution Interpretation']
            self.dependencies[f'Task {i} Charts'] = [f'Task {i} Subtask Outcome Analysis']
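
    # Illustration (hypothetical): for a two-task run where Task 2 depends on
    # Task 1 and with_code is True, the loop above adds entries such as
    #   'Task 2 Analysis': ['Task 2 Description', 'Dependency Analysis',
    #                       'Task 1 Subtask Outcome Analysis']
    #   'Task 2 Code': ['Task 2 Mathematical Modeling Process']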

    def _initialize_problem_and_steps(self):
        """Loads the problem input and sets up the initial state."""
        print("Loading problem input...")
        self.problem_str, self.problem = problem_input(self.problem_path, self.llm)
        filename = os.path.splitext(os.path.basename(self.problem_path))[0]
        if '_' in filename:
            self.problem_year, self.problem_type = filename.split('_')[:2]
        else:
            self.problem_type = 'X'
            self.problem_year = 'XXXX'

        self.paper['problem_background'] = self.problem['background']
        self.paper['problem_requirement'] = self.problem['problem_requirement']
        self.completed_steps.add('Problem Background')
        self.completed_steps.add('Problem Requirement')

        self.with_code = len(self.problem.get('dataset_path', '')) > 0 or len(self.dataset_path) > 0

        if self.with_code and os.path.exists(self.dataset_path):
            print(f"Copying dataset from {self.dataset_path} to {os.path.join(self.output_path, 'code')}")
            shutil.copytree(self.dataset_path, os.path.join(self.output_path, 'code'), dirs_exist_ok=True)
        elif self.with_code:
            print(f"Warning: Code execution expected, but dataset path '{self.dataset_path}' not found.")

        self.planned_steps = [
            'Problem Background',
            'Problem Requirement',
            'Problem Analysis',
            'High-Level Modeling',
            'Task Decomposition',
            'Dependency Analysis',
        ]

    def _check_dependencies(self, step_name: str) -> bool:
        """Checks whether all prerequisites for a given step are completed."""
        if step_name not in self.dependencies:
            print(f"Warning: No dependency information defined for step '{step_name}'. Assuming runnable.")
            return True

        prerequisites = self.dependencies.get(step_name, [])
        for prereq in prerequisites:
            if prereq not in self.completed_steps:
                print(f"Dependency Error: Step '{step_name}' requires '{prereq}', which is not completed.")
                return False
        return True
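
    # Note: before 'Dependency Analysis' has run, task-specific step names have
    # no entry in self.dependencies, so _check_dependencies('Task 1 Analysis')
    # warns and returns True; after _update_dependencies_after_decomp it
    # returns False until every prerequisite is in self.completed_steps.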

    def _update_planned_steps_after_decomp(self):
        """Adds all task-specific steps to the planned steps list."""
        if not self.task_descriptions or self.order is None:
            print("Error: Cannot update planned steps. Task decomposition or dependency analysis incomplete.")
            return

        task_step_templates = [
            'Description',
            'Analysis',
            'Preliminary Formulas',
            'Mathematical Modeling Process',
            'Code' if self.with_code else None,
            'Solution Interpretation',
            'Subtask Outcome Analysis',
            'Charts',
        ]
        task_step_templates = [t for t in task_step_templates if t]

        new_task_steps = []
        for task_id_int in self.order:
            for template in task_step_templates:
                new_task_steps.append(f'Task {task_id_int} {template}')

        dep_analysis_index = self.planned_steps.index('Dependency Analysis')
        self.planned_steps = self.planned_steps[:dep_analysis_index + 1] + new_task_steps

        self.paper['tasks'] = [{} for _ in range(len(self.task_descriptions))]

    def get_completed_steps(self) -> set:
        """Returns the set of names of completed steps."""
        return self.completed_steps

    def get_planned_steps(self) -> list:
        """Returns the list of names of planned steps (including completed ones)."""
        return self.planned_steps

    def get_paper(self) -> dict:
        """Returns the current state of the generated paper dictionary."""
        return self.paper

    def save_paper(self, intermediate=False):
        """Saves the current paper state to JSON and Markdown files."""
        filename = f"{self.name}_intermediate_{datetime.now().strftime('%Y%m%d%H%M%S')}" if intermediate else self.name
        json_path = os.path.join(self.output_path, 'json', f"{filename}.json")
        md_path = os.path.join(self.output_path, 'markdown', f"{filename}.md")

        write_json_file(json_path, self.paper)
        markdown_str = json_to_markdown(self.paper)
        write_text_file(md_path, markdown_str)

        print(f"Saved paper snapshot to {json_path} and {md_path}")

    def save_usage(self):
        """Saves the LLM usage statistics."""
        usage_path = os.path.join(self.output_path, 'usage', f"{self.name}.json")
        write_json_file(usage_path, self.llm.get_total_usage())
        print(f"Saved LLM usage to {usage_path}")
        print(f"Total Usage: {self.llm.get_total_usage()}")

    def _generate_problem_analysis(self, user_prompt: str = '', round: int = 0):
        print("Generating: Problem Analysis")
        self.problem_analysis = self.pa.analysis(
            self.problem_str,
            round=round if round > 0 else self.config.get('problem_analysis_round', 0),
            user_prompt=user_prompt
        )
        self.paper['problem_analysis'] = self.problem_analysis
        print("Completed: Problem Analysis")

    def _generate_high_level_modeling(self, user_prompt: str = '', round: int = 0):
        print("Generating: High-Level Modeling")
        self.modeling_solution = self.pm.modeling(
            self.problem_str,
            self.problem_analysis,
            "",
            round=round if round > 0 else self.config.get('problem_modeling_round', 0),
            user_prompt=user_prompt
        )
        self.paper['high_level_modeling'] = self.modeling_solution
        print("Completed: High-Level Modeling")

    def _generate_task_decomposition(self, user_prompt: str = ''):
        print("Generating: Task Decomposition")
        self.task_descriptions = self.td.decompose_and_refine(
            self.problem_str,
            self.problem_analysis,
            self.modeling_solution,
            self.problem_type,
            self.config.get('tasknum', 4),
            user_prompt=user_prompt
        )
        self.paper['task_decomposition_summary'] = "\n".join(
            [f"Task {i + 1}: {desc}" for i, desc in enumerate(self.task_descriptions)]
        )
        print(f"Completed: Task Decomposition ({len(self.task_descriptions)} tasks)")

    def _generate_dependency_analysis(self):
        print("Generating: Dependency Analysis")
        self.order = self.coordinator.analyze_dependencies(
            self.problem_str,
            self.problem_analysis,
            self.modeling_solution,
            self.task_descriptions,
            self.with_code
        )
        self.order = [int(i) for i in self.order]
        self.paper['task_execution_order'] = self.order
        self.paper['task_dependency_analysis'] = self.coordinator.task_dependency_analysis
        print(f"Completed: Dependency Analysis. Execution order: {self.order}")

        self._update_planned_steps_after_decomp()
        self._update_dependencies_after_decomp()
        print(f"Updated planned steps: {self.planned_steps}")
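
    # Illustration (hypothetical): if the coordinator returns
    # DAG = {'1': [], '2': ['1'], '3': ['1'], '4': ['2', '3']}, a valid
    # execution order is [1, 2, 3, 4], and _update_dependencies_after_decomp
    # will make 'Task 4 Analysis' wait on the Subtask Outcome Analysis steps
    # of Tasks 2 and 3.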

    def _generate_task_step(self, task_id: int, step_type: str, user_prompt: str = '', round: int = 0):
        """Handles generation for a specific step within a specific task."""
        print(f"Generating: Task {task_id} {step_type}")
        task_index = task_id - 1

        if task_index >= len(self.paper['tasks']):
            print(f"Error: Task index {task_index} out of bounds for self.paper['tasks'].")
            return False

        task_description = self.task_descriptions[task_index]

        # Pull any previously generated artifacts for this task.
        current_task_dict = self.paper['tasks'][task_index]
        task_analysis = current_task_dict.get('task_analysis')
        task_formulas = current_task_dict.get('preliminary_formulas')
        task_modeling = current_task_dict.get('mathematical_modeling_process')
        task_code = current_task_dict.get('task_code')
        execution_result = current_task_dict.get('execution_result')
        task_result = current_task_dict.get('solution_interpretation')

        # Assemble context from the tasks this one depends on.
        task_dependency_ids = [int(i) for i in self.coordinator.DAG.get(str(task_id), [])]
        dependency_prompt = ""
        dependent_file_prompt = ""

        if len(task_dependency_ids) > 0:
            rationale = ""
            if self.coordinator.task_dependency_analysis and task_index < len(self.coordinator.task_dependency_analysis):
                rationale = self.coordinator.task_dependency_analysis[task_index]
            else:
                print(f"Warning: Could not find dependency rationale for Task {task_id}")

            dependency_prompt = (
                f"This task is Task {task_id}, which depends on the following tasks: {task_dependency_ids}. "
                f"The dependencies for this task are analyzed as follows: {rationale}\n"
            )

            for dep_id in task_dependency_ids:
                dep_task_index = dep_id - 1
                if dep_task_index < 0 or dep_task_index >= len(self.paper['tasks']):
                    print(f"Warning: Cannot build dependency prompt. Dependent Task {dep_id} data not found.")
                    continue

                dep_task_dict = self.paper['tasks'][dep_task_index]
                # Fall back to the coordinator's memory if the paper dict lacks an entry.
                dep_mem_dict = self.coordinator.memory.get(str(dep_id), {})
                dep_code_mem_dict = self.coordinator.code_memory.get(str(dep_id), {})

                dependency_prompt += f"---\n# The Description of Task {dep_id}:\n{dep_task_dict.get('task_description', dep_mem_dict.get('task_description', 'N/A'))}\n"
                dependency_prompt += f"# The modeling method for Task {dep_id}:\n{dep_task_dict.get('mathematical_modeling_process', dep_mem_dict.get('mathematical_modeling_process', 'N/A'))}\n"

                if self.with_code:
                    dep_code_structure = dep_task_dict.get('code_structure', dep_code_mem_dict)
                    code_structure_str = json.dumps(dep_code_structure, indent=2) if dep_code_structure else "{}"
                    dependency_prompt += f"# The structure of code for Task {dep_id}:\n{code_structure_str}\n"
                    dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n"
                    dependent_file_prompt += f"# The files generated by code for Task {dep_id}:\n{code_structure_str}\n"
                else:
                    dependency_prompt += f"# The result for Task {dep_id}:\n{dep_task_dict.get('solution_interpretation', dep_mem_dict.get('solution_interpretation', 'N/A'))}\n---\n"

        # Append the step-specific instructions only for the step being generated.
        task_analysis_prompt = dependency_prompt + TASK_ANALYSIS_APPEND_PROMPT if step_type == 'Analysis' else dependency_prompt
        task_formulas_prompt = dependency_prompt + TASK_FORMULAS_APPEND_PROMPT if step_type == 'Preliminary Formulas' else dependency_prompt
        task_modeling_prompt = dependency_prompt + TASK_MODELING_APPEND_PROMPT if step_type == 'Mathematical Modeling Process' else dependency_prompt

        success = True
        try:
            if step_type == 'Description':
                self.paper['tasks'][task_index]['task_description'] = task_description
                self.coordinator.memory.setdefault(str(task_id), {})['task_description'] = task_description

            elif step_type == 'Analysis':
                task_analysis = self.task.analysis(
                    task_analysis_prompt,
                    task_description,
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['task_analysis'] = task_analysis
                self.coordinator.memory[str(task_id)]['task_analysis'] = task_analysis

            elif step_type == 'Preliminary Formulas':
                if not task_analysis:
                    raise ValueError(f"Task {task_id} Analysis is missing.")
                description_and_analysis = f'## Task Description\n{task_description}\n\n## Task Analysis\n{task_analysis}'
                top_modeling_methods = modeling_methods
                task_formulas = self.task.formulas(
                    task_formulas_prompt,
                    self.problem.get('data_description', ''),
                    task_description,
                    task_analysis,
                    top_modeling_methods,
                    round=round if round > 0 else self.config.get('task_formulas_round', 0),
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['preliminary_formulas'] = task_formulas
                self.coordinator.memory[str(task_id)]['preliminary_formulas'] = task_formulas

            elif step_type == 'Mathematical Modeling Process':
                if not task_analysis or not task_formulas:
                    raise ValueError(f"Task {task_id} Analysis or Formulas missing.")
                task_modeling = self.task.modeling(
                    task_modeling_prompt,
                    self.problem.get('data_description', ''),
                    task_description,
                    task_analysis,
                    task_formulas,
                    round=round if round > 0 else self.config.get('task_modeling_round', 0),
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['mathematical_modeling_process'] = task_modeling
                self.coordinator.memory[str(task_id)]['mathematical_modeling_process'] = task_modeling

            elif step_type == 'Code' and self.with_code:
                if not task_analysis or not task_formulas or not task_modeling:
                    raise ValueError(f"Task {task_id} Analysis, Formulas, or Modeling missing for coding.")

                code_template_path = os.path.join('../data/actor_data/input/code_template', f'main{task_id}.py')
                code_template = ""
                if os.path.exists(code_template_path):
                    with open(code_template_path, 'r') as f:
                        code_template = f.read()
                else:
                    print(f"Warning: Code template not found at {code_template_path}. Using empty template.")

                save_path = os.path.join(self.output_path, 'code', f'main{task_id}.py')
                work_dir = os.path.join(self.output_path, 'code')
                script_name = f'main{task_id}.py'
                dataset_input_path = self.problem.get('dataset_path') or self.dataset_path

                task_code, is_pass, execution_result = self.task.coding(
                    dataset_input_path,
                    self.problem.get('data_description', ''),
                    self.problem.get('variable_description', ''),
                    task_description,
                    task_analysis,
                    task_formulas,
                    task_modeling,
                    dependent_file_prompt,
                    code_template,
                    script_name,
                    work_dir,
                    try_num=5,
                    round=round if round > 0 else 1,
                    user_prompt=user_prompt
                )
                code_structure = self.task.extract_code_structure(task_id, task_code, save_path)

                self.paper['tasks'][task_index]['task_code'] = '```Python\n' + task_code + '\n```'
                self.paper['tasks'][task_index]['is_pass'] = is_pass
                self.paper['tasks'][task_index]['execution_result'] = execution_result
                self.paper['tasks'][task_index]['code_structure'] = code_structure

                self.coordinator.code_memory[str(task_id)] = code_structure

            elif step_type == 'Solution Interpretation':
                if not task_modeling:
                    raise ValueError(f"Task {task_id} Modeling is missing.")
                if self.with_code and execution_result is None:
                    raise ValueError(f"Task {task_id} Code execution result is missing.")

                task_result = self.task.result(
                    task_description,
                    task_analysis,
                    task_formulas,
                    task_modeling,
                    user_prompt=user_prompt,
                    execution_result=execution_result if self.with_code else ''
                )
                self.paper['tasks'][task_index]['solution_interpretation'] = task_result
                self.coordinator.memory[str(task_id)]['solution_interpretation'] = task_result

            elif step_type == 'Subtask Outcome Analysis':
                if not task_result:
                    raise ValueError(f"Task {task_id} Solution Interpretation is missing.")
                task_answer = self.task.answer(
                    task_description,
                    task_analysis,
                    task_formulas,
                    task_modeling,
                    task_result,
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['subtask_outcome_analysis'] = task_answer
                self.coordinator.memory[str(task_id)]['subtask_outcome_analysis'] = task_answer

            elif step_type == 'Charts':
                full_task_dict_str = json.dumps(self.paper['tasks'][task_index], indent=2)
                charts = self.chart.create_charts(
                    full_task_dict_str,
                    self.config.get('chart_num', 0),
                    user_prompt=user_prompt
                )
                self.paper['tasks'][task_index]['charts'] = charts
                self.coordinator.memory[str(task_id)]['charts'] = charts

            else:
                print(f"Warning: Unknown step type '{step_type}' for Task {task_id}.")
                success = False

        except Exception as e:
            print(f"Error generating Task {task_id} {step_type}: {e}")
            import traceback
            traceback.print_exc()
            success = False

        if success:
            print(f"Completed: Task {task_id} {step_type}")
        return success

    def generate_step(self, step_name: str, user_prompt: str = '', round: int = 0, force_regenerate: bool = True) -> bool:
        """
        Generates the content for a specific step, checking dependencies first.

        Args:
            step_name: The name of the step to generate (e.g., 'Problem Analysis', 'Task 1 Preliminary Formulas').
            user_prompt: Optional user guidance to influence the generation.
            round: Number of improvement rounds to apply (where applicable).
            force_regenerate: If True, regenerate the step even if it is already completed.

        Returns:
            True if the step was generated successfully (or was already complete), False otherwise.
        """
        if step_name in self.completed_steps and not force_regenerate:
            print(f"Skipping already completed step: '{step_name}'")
            return True

        if step_name in self.completed_steps and force_regenerate:
            print(f"Regenerating step: '{step_name}'")
            self.completed_steps.remove(step_name)

        if not self._check_dependencies(step_name):
            print(f"Cannot generate step '{step_name}' due to unmet dependencies.")
            return False

        success = False
        try:
            if step_name == 'Problem Analysis':
                self._generate_problem_analysis(user_prompt, round)
                success = True
            elif step_name == 'High-Level Modeling':
                self._generate_high_level_modeling(user_prompt, round)
                success = True
            elif step_name == 'Task Decomposition':
                self._generate_task_decomposition(user_prompt)
                success = True
            elif step_name == 'Dependency Analysis':
                self._generate_dependency_analysis()
                success = True
            elif step_name.startswith('Task '):
                match = re.match(r"Task (\d+) (.*)", step_name)
                if match:
                    task_id = int(match.group(1))
                    step_type = match.group(2)

                    if self.order and task_id in self.order:
                        success = self._generate_task_step(task_id, step_type, user_prompt, round)
                    elif not self.order:
                        print(f"Error: Cannot generate task step '{step_name}'. Task order not determined yet.")
                        success = False
                    else:
                        print(f"Error: Cannot generate task step '{step_name}'. Task ID {task_id} not found in execution order {self.order}.")
                        success = False
                else:
                    print(f"Error: Could not parse task step name: '{step_name}'")
                    success = False
            else:
                if step_name in ['Problem Background', 'Problem Requirement']:
                    print(f"Step '{step_name}' completed during initialization.")
                    success = True
                else:
                    print(f"Error: Unknown step name: '{step_name}'")
                    success = False

            if success:
                self.completed_steps.add(step_name)

        except Exception as e:
            print(f"Critical error during generation of step '{step_name}': {e}")
            import traceback
            traceback.print_exc()
            success = False

        return success
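
    # Example (hypothetical usage; assumes `system` is a constructed
    # ModelingAgentSystem whose plan already includes the task steps):
    #   system.generate_step('Problem Analysis', user_prompt='Stress data limits', round=2)
    #   system.generate_step('Task 1 Analysis', force_regenerate=True)
    #   system.generate_step('Task 1 Preliminary Formulas')  # fails if its prerequisites are incomplete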

    def run_sequential(self, force_regenerate_all: bool = False):
        """
        Runs the entire generation process sequentially, step by step.

        Args:
            force_regenerate_all: If True, regenerate all steps even if already completed.
        """
        print("Starting sequential generation...")
        current_step_index = 0

        if force_regenerate_all:
            print("Force regenerating all steps...")
            self.completed_steps.clear()

        # len(self.planned_steps) is re-evaluated each iteration because
        # 'Dependency Analysis' appends the task-specific steps to the plan.
        while current_step_index < len(self.planned_steps):
            step_name = self.planned_steps[current_step_index]

            print(f"\n--- Attempting Step: {step_name} ({current_step_index + 1}/{len(self.planned_steps)}) ---")

            if step_name in self.completed_steps:
                print(f"Skipping already completed step: '{step_name}'")
                current_step_index += 1
                continue

            success = self.generate_step(step_name, force_regenerate=force_regenerate_all)

            if success:
                print(f"--- Successfully completed step: '{step_name}' ---")
                current_step_index += 1
            else:
                print(f"--- Failed to complete step: '{step_name}'. Stopping generation. ---")
                break

        print("\nSequential generation process finished.")
        self.save_paper()
        self.save_usage()
        print(f"Final paper saved for run '{self.name}' in '{self.output_path}'.")
        print(f"Completed steps: {self.completed_steps}")
        if current_step_index < len(self.planned_steps):
            print(f"Next planned step was: {self.planned_steps[current_step_index]}")

    def generate_paper(self, project_dir: str):
        """Dumps the paper JSON and renders the final LaTeX paper for the run."""
        metadata = {
            "team": "Agent",
            "year": self.problem_year,
            "problem_type": self.problem_type
        }
        json_file_path = f"{project_dir}/json/{self.problem_year}_{self.problem_type}.json"
        with open(json_file_path, 'w') as f:
            json.dump(self.paper, f, indent=2)
        code_dir = f'{project_dir}/code'
        metadata['figures'] = [os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] in ['png', 'jpg', 'jpeg']]
        metadata['codes'] = sorted([os.path.join(code_dir, f) for f in os.listdir(code_dir) if f.lower().split('.')[-1] == 'py'])
        generate_paper_from_json(self.llm, self.paper, metadata, os.path.join(project_dir, 'latex'), 'solution')


def create_generator(name):
    """Helper function to set up configuration and create the agent system."""
    config = {
        'top_method_num': 6,
        'problem_analysis_round': 0,
        'problem_modeling_round': 0,
        'task_formulas_round': 0,
        'tasknum': 4,
        'chart_num': 0,
        'model_name': 'gpt-4o-mini',
        'method_name': 'MM-Agent-Refactored'
    }

    base_data_path = '../data/actor_data'
    problem_file = os.path.join(base_data_path, 'input', 'problem', f'{name}.json')
    dataset_input_path = os.path.join(base_data_path, 'input', 'dataset', name)
    output_dir = os.path.join(base_data_path, 'exps', config['method_name'])

    run_output_path = os.path.join(output_dir, f"{name}_{datetime.now().strftime('%Y%m%d-%H%M%S')}")

    if not os.path.exists(problem_file):
        print(f"Error: Problem file not found at {problem_file}")
        return None

    multi_agent = ModelingAgentSystem(
        problem_path=problem_file,
        config=config,
        dataset_path=dataset_input_path,
        output_path=run_output_path,
        name=name
    )
    return multi_agent


if __name__ == "__main__":
    problem_name = "2024_C"
    agent_system = create_generator(problem_name)

    if agent_system:
        agent_system.run_sequential()
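
        # Alternative, step-by-step driving of the same pipeline (hypothetical
        # sketch; step names must match agent_system.planned_steps):
        #   agent_system.generate_step('Problem Analysis')
        #   agent_system.generate_step('High-Level Modeling')
        #   agent_system.generate_step('Task Decomposition')
        #   agent_system.generate_step('Dependency Analysis')  # expands the plan with task steps
        #   for step in agent_system.get_planned_steps():
        #       agent_system.generate_step(step, force_regenerate=False)
        #   agent_system.generate_paper(agent_system.output_path)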