# MathematicalModelingAgent/core/run_batch_wo_coo.py
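"""Batch runner for the "wo_coo" ablation of the modeling pipeline.

Problem analysis and problem modeling are left empty and every agent is
constructed with ``coo=False``, so subtasks are solved without the
Coordinator. For each problem file the script decomposes the problem into
tasks, solves each task (generating and executing code when a dataset is
present), renders charts, and writes the paper as JSON and Markdown.
"""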
import os
import shutil
from datetime import datetime

from llm.llm import LLM
from input.problem import problem_input
# from input.test_middle_result import problem_str, problem_analysis, selected_models, modeling_solution, task_descriptions
from agent.method_ranking import MethodRanking
from agent.task_decompse import TaskDecompose  # NB: the module name is spelled "decompse" in the repo
from agent.task import Task
from agent.create_charts import Chart
from utils.utils import write_json_file, write_text_file, json_to_markdown
# from utils.convert_format import markdown_to_latex


def run_batch(problem_path, config, name, dataset_path, output_path):
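    """Solve one problem end to end and return the assembled paper dict.

    The paper is re-saved after every task so partial results survive a
    mid-run failure. ``config`` must provide the keys used below:
    'model_name', 'tasknum', 'top_method_num', 'task_formulas_round'
    and 'chart_num'.
    """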
# Initialize LLM
llm = LLM(config['model_name'])
# Get problem input
problem_str, problem = problem_input(problem_path, llm)
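    # `problem` is a dict parsed from the problem file; the keys consumed
    # below are background, problem_requirement, dataset_path,
    # data_description and variable_description.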
    problem_type = ""  # not inferred in this ablation; passed downstream empty
# Initialize paper dictionary
paper = {'tasks': []}
paper['problem_background'] = problem['background']
paper['problem_requirement'] = problem['problem_requirement']
    # Problem analysis and problem modeling are deliberately skipped in this
    # ablation; downstream agents receive empty strings in their place.
    problem_analysis = ""
    paper['problem_analysis'] = problem_analysis
    modeling_solution = ""
# Task decomposition
td = TaskDecompose(llm, coo = False)
task_descriptions = td.decompose_and_refine(problem_str, problem_analysis, modeling_solution, problem_type, config['tasknum'])
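    # decompose_and_refine is asked for config['tasknum'] subtask
    # descriptions; the loop below consumes them in order.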
    # Generated code is only run when the problem ships with a dataset.
    with_code = len(problem['dataset_path']) > 0
    if with_code:
        # Stage the dataset next to the generated scripts in <output>/code.
        shutil.copytree(dataset_path, os.path.join(output_path, 'code'), dirs_exist_ok=True)
# Process tasks
task = Task(llm, coo = False)
mr = MethodRanking(llm)
chart = Chart(llm)
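    # Per-task pipeline: analysis -> method ranking -> preliminary formulas
    # -> modeling -> (optional) coding + execution -> result interpretation
    # -> answer -> charts.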
    for task_id in range(1, len(task_descriptions) + 1):  # `task_id` avoids shadowing the builtin `id`
        # Each task is seeded with a numbered script template.
        template_path = os.path.join('data/actor_data/input/code_template', 'main{}.py'.format(task_id))
        with open(template_path) as f:  # context manager so the handle is closed
            code_template = f.read()
        work_dir = os.path.join(output_path, 'code')
        script_name = 'main{}.py'.format(task_id)
        task_description = task_descriptions[task_id - 1]
task_analysis = task.analysis("", task_description)
description_and_analysis = f'## Task Description\n{task_description}\n\n## Task Analysis\n{task_analysis}'
top_modeling_methods = mr.top_methods(description_and_analysis, top_k=config['top_method_num'])
task_formulas = task.formulas("", problem['data_description'], task_description, task_analysis, top_modeling_methods, round=config['task_formulas_round'])
task_modeling = task.modeling("", problem['data_description'], task_description, task_analysis, task_formulas)
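        # With a dataset, generate and execute code and feed the execution
        # result into the interpretation step; otherwise interpret from the
        # modeling text alone.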
if with_code:
task_code, is_pass, execution_result = task.coding(problem['dataset_path'], problem['data_description'], problem['variable_description'], task_description, task_analysis, task_formulas, task_modeling, "", code_template, script_name, work_dir)
task_result = task.result(task_description, task_analysis, task_formulas, task_modeling, execution_result)
task_answer = task.answer(task_description, task_analysis, task_formulas, task_modeling, task_result)
task_dict = {
'task_description': task_description,
'task_analysis': task_analysis,
'preliminary_formulas': task_formulas,
'mathematical_modeling_process': task_modeling,
'task_code': task_code,
'is_pass': is_pass,
'execution_result': execution_result,
'solution_interpretation': task_result,
'subtask_outcome_analysis': task_answer
}
else:
task_result = task.result(task_description, task_analysis, task_formulas, task_modeling)
task_answer = task.answer(task_description, task_analysis, task_formulas, task_modeling, task_result)
task_dict = {
'task_description': task_description,
'task_analysis': task_analysis,
'preliminary_formulas': task_formulas,
'mathematical_modeling_process': task_modeling,
'solution_interpretation': task_result,
'subtask_outcome_analysis': task_answer
}
charts = chart.create_charts(str(task_dict), config['chart_num'])
task_dict['charts'] = charts
paper['tasks'].append(task_dict)
        # Re-save after every task so partial progress survives a failure.
        save_paper(paper, name, output_path)
print(paper)
print('Usage:', llm.get_total_usage())
write_json_file(f'{output_path}/usage/{name}.json', llm.get_total_usage())
return paper


def save_paper(paper, name, path):
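    """Write the paper under ``path`` as JSON and as rendered Markdown."""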
write_json_file(f'{path}/json/{name}.json', paper)
markdown_str = json_to_markdown(paper)
write_text_file(f'{path}/markdown/{name}.md', markdown_str)
# write_text_file(f'data/actor_data/output/latex/{name}.tex', markdown_to_latex(markdown_str))


def mkdir(path):
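    """Create the output directory tree expected by run_batch and save_paper."""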
    os.mkdir(path)
    for sub in ('json', 'markdown', 'latex', 'code', 'usage'):
        os.mkdir(os.path.join(path, sub))


if __name__ == "__main__":
import glob
# files = glob.glob('data/actor_data/input/problem/2024*')
files = glob.glob('data/actor_data/input/problem/2024_C*')
    # Alternative configs (identical hyperparameters) previously tried here:
    # model_name in {'chatgpt-4o-latest', 'deepseek-reasoner', 'DeepSeek-R1-671B'}.
    config_list = [{
        'top_method_num': 6,
        'problem_analysis_round': 1,
        'problem_modeling_round': 1,
        'task_formulas_round': 1,
        'tasknum': 4,
        'chart_num': 3,
        'model_name': 'gpt-4'
    }]
    for config in config_list:
for file in files:
try:
                stem = os.path.splitext(os.path.basename(file))[0]
                name = stem + '_wo_coo'
                dataset_path = os.path.join('data/actor_data/input/dataset', stem)
                output_dir = 'data/actor_data/output'
                os.makedirs(output_dir, exist_ok=True)
                output_path = os.path.join(output_dir, name + '_{}'.format(datetime.now().strftime('%Y%m%d-%H%M%S')))
                if not os.path.exists(output_path):
                    mkdir(output_path)
print(f'Processing {file}..., config: {config}')
paper = run_batch(problem_path=file, config=config, name=name, dataset_path=dataset_path, output_path=output_path)
# save_paper(paper, name)
            except Exception as e:
                # raise  # uncomment to fail fast while debugging
                print(f'Error: {e}')
                continue