from llm.llm import LLM
from prompt.constants import modeling_methods
from input.problem import problem_input
# from input.test_middle_result import problem_str, problem_analysis, selected_models, modeling_solution, modeling_solution, task_descriptions
from agent.problem_analysis import ProblemAnalysis
from agent.method_ranking import MethodRanking
from agent.problem_modeling import ProblemModeling
from agent.task_decompse import TaskDecompose
from agent.task import Task
from agent.create_charts import Chart
from agent.coordinator import Coordinator
from utils.utils import read_json_file, write_json_file, write_text_file, json_to_markdown
from prompt.template import TASK_ANALYSIS_APPEND_PROMPT, TASK_FORMULAS_APPEND_PROMPT, TASK_MODELING_APPEND_PROMPT
# from utils.convert_format import markdown_to_latex
import os
from datetime import datetime
import shutil
import time
def run_batch(problem_path, config, name, dataset_path, output_path):
# Initialize LLM
llm = LLM(config['model_name'])
# Get problem input
problem_str, problem = problem_input(problem_path, llm)
problem_type = os.path.splitext(os.path.basename(problem_path))[0].split('_')[-1]
# Initialize paper dictionary
paper = {'tasks': []}
paper['problem_background'] = problem['background']
paper['problem_requirement'] = problem['problem_requirement']
# Problem analysis
pa = ProblemAnalysis(llm)
problem_analysis = pa.analysis(problem_str, round=config['problem_analysis_round'])
paper['problem_analysis'] = problem_analysis
modeling_methods = ""
# High level probelm understanding modeling
pm = ProblemModeling(llm)
modeling_solution = pm.modeling(problem_str, problem_analysis, modeling_methods, round=config['problem_modeling_round'])
# Task decomposition
td = TaskDecompose(llm)
task_descriptions = td.decompose_and_refine(problem_str, problem_analysis, modeling_solution, problem_type, config['tasknum'])
# Analyze dependency
with_code = len(problem['dataset_path']) > 0
coordinator = Coordinator(llm)
order = coordinator.analyze_dependencies(problem_str, problem_analysis, modeling_solution, task_descriptions, with_code)
order = [int(i) for i in order]
if with_code:
shutil.copytree(dataset_path, os.path.join(output_path,'code'), dirs_exist_ok=True)
# Process tasks
task = Task(llm)
mr = MethodRanking(llm)
chart = Chart(llm)
for id in order:
task_dependency = [int(i) for i in coordinator.DAG[str(id)]]
dependent_file_prompt = ""
if len(task_dependency) > 0:
dependency_prompt = f"""\
This task is Task {id}, which depends on the following tasks: {task_dependency}. The dependencies for this task are analyzed as follows: {coordinator.task_dependency_analysis[id - 1]}
"""
for task_id in task_dependency:
dependency_prompt += f"""\
---
# The Description of Task {task_id}:
{coordinator.memory[str(task_id)]['task_description']}
# The modeling method for Task {task_id}:
{coordinator.memory[str(task_id)]['mathematical_modeling_process']}
"""
if with_code:
dependency_prompt += f"""\
# The structure of code for Task {task_id}:
{coordinator.code_memory[str(task_id)]}
# The result for Task {task_id}:
{coordinator.memory[str(task_id)]['solution_interpretation']}
---
"""
dependent_file_prompt += f"""\
# The files generated by code for Task {task_id}:
{coordinator.code_memory[str(task_id)]}
"""
coordinator.code_memory[str(task_id)]['file_outputs']
else:
dependency_prompt += f"""\
# The result for Task {task_id}:
{coordinator.memory[str(task_id)]['solution_interpretation']}
---
"""
task_analysis_prompt = dependency_prompt + TASK_ANALYSIS_APPEND_PROMPT
task_formulas_prompt = dependency_prompt + TASK_FORMULAS_APPEND_PROMPT
task_modeling_prompt = dependency_prompt + TASK_MODELING_APPEND_PROMPT
else:
task_analysis_prompt = ""
task_formulas_prompt = ""
task_modeling_prompt = ""
code_template = open(os.path.join('data/actor_data/input/code_template','main{}.py'.format(id))).read()
save_path = os.path.join(output_path,'code/main{}.py'.format(id))
work_dir = os.path.join(output_path,'code')
script_name = 'main{}.py'.format(id)
task_description = task_descriptions[id - 1]
task_analysis = task.analysis(task_analysis_prompt, task_description)
description_and_analysis = f'## Task Description\n{task_description}\n\n## Task Analysis\n{task_analysis}'
top_modeling_methods = mr.top_methods(description_and_analysis, top_k=config['top_method_num'])
task_formulas = task.formulas(task_formulas_prompt, problem['data_description'], task_description, task_analysis, top_modeling_methods, round=config['task_formulas_round'])
task_modeling = task.modeling(task_modeling_prompt, problem['data_description'], task_description, task_analysis, task_formulas)
if with_code:
task_code, is_pass, execution_result = task.coding(problem['dataset_path'], problem['data_description'], problem['variable_description'], task_description, task_analysis, task_formulas, task_modeling, dependent_file_prompt, code_template, script_name, work_dir)
code_structure = task.extract_code_structure(id, task_code, save_path)
task_result = task.result(task_description, task_analysis, task_formulas, task_modeling, execution_result)
task_answer = task.answer(task_description, task_analysis, task_formulas, task_modeling, task_result)
task_dict = {
'task_description': task_description,
'task_analysis': task_analysis,
'preliminary_formulas': task_formulas,
'mathematical_modeling_process': task_modeling,
'task_code': task_code,
'is_pass': is_pass,
'execution_result': execution_result,
'solution_interpretation': task_result,
'subtask_outcome_analysis': task_answer
}
coordinator.code_memory[str(id)] = code_structure
else:
task_result = task.result(task_description, task_analysis, task_formulas, task_modeling)
task_answer = task.answer(task_description, task_analysis, task_formulas, task_modeling, task_result)
task_dict = {
'task_description': task_description,
'task_analysis': task_analysis,
'preliminary_formulas': task_formulas,
'mathematical_modeling_process': task_modeling,
'solution_interpretation': task_result,
'subtask_outcome_analysis': task_answer
}
coordinator.memory[str(id)] = task_dict
charts = chart.create_charts(str(task_dict), config['chart_num'])
task_dict['charts'] = charts
paper['tasks'].append(task_dict)
save_paper(paper, name, output_path)
print(paper)
print('Usage:', llm.get_total_usage())
write_json_file(f'{output_path}/usage/{name}.json', llm.get_total_usage())
return paper
def save_paper(paper, name, path):
write_json_file(f'{path}/json/{name}.json', paper)
markdown_str = json_to_markdown(paper)
write_text_file(f'{path}/markdown/{name}.md', markdown_str)
# write_text_file(f'data/actor_data/output/latex/{name}.tex', markdown_to_latex(markdown_str))
def mkdir(path):
os.mkdir(path)
os.mkdir(path + '/json')
os.mkdir(path + '/markdown')
os.mkdir(path + '/latex')
os.mkdir(path + '/code')
os.mkdir(path + '/usage')
if __name__ == "__main__":
import glob
file_name_list = []
for year in range(2025, 2026):
if year == 2025:
letters = "CDEF"
else:
letters = "ABCDEF"
for letter in letters:
file_name_list.append(f'data/actor_data/input/problem/{year}_{letter}*')
files = []
for pattern in file_name_list:
files.extend(glob.glob(pattern))
config_list = [{
'top_method_num': 6,
'problem_analysis_round': 1,
'problem_modeling_round': 1,
'task_formulas_round': 1,
'tasknum': 4,
'chart_num': 3,
'model_name': 'gpt-4o',
"method_name": "MM-Agent-gpt-4o-v3-probelm-modleing"
# 'model_name': 'chatgpt-4o-latest'
}]
for i, config in enumerate(config_list, start=1):
for file in files:
try:
name = file.split('/')[-1].split('.')[0]
dataset_path = os.path.join('data/actor_data/input/dataset', file.split('/')[-1].split('.')[0])
output_dir = 'data/actor_data/exps/{}'.format(config["method_name"])
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_path = os.path.join(output_dir, name + '_{}'.format(datetime.now().strftime('%Y%m%d-%H%M%S')))
if not os.path.exists(output_path):
mkdir(output_path)
print(f'Processing {file}..., config: {config}')
start = time.time()
paper = run_batch(problem_path=file, config=config, name=name, dataset_path=dataset_path, output_path=output_path)
end = time.time()
with open(output_path + '/usage/runtime.txt', 'w') as f:
f.write("{:.2f}s".format(end - start))
# save_paper(paper, name)
except Exception as e:
raise
print(f'Error: {e}')
continue