from typing import List
from pathlib import Path

from .base_agent import BaseAgent
from prompt.template import TASK_DECOMPOSE_PROMPT, TASK_DESCRIPTION_PROMPT, TASK_DECOMPOSE_WO_COO_PROMPT
from utils.utils import read_json_file


class TaskDecompose(BaseAgent):
    """Agent that asks the LLM to split a modeling problem into subtasks.

    With ``coo`` enabled, the decomposition prompt is enriched with a
    principle looked up in ``decompose_prompt.json`` and each resulting
    subtask can be refined into a fuller description. With ``coo`` disabled,
    a reduced prompt is used and no refinement is performed.
    """

    # Keys used when the requested problem type / task count has no entry in
    # the principles file.
    _FALLBACK_PROBLEM_TYPE = 'C'
    _FALLBACK_TASKNUM = '4'
    # Delimiter the LLM answer is split on to obtain individual subtasks.
    _TASK_SEPARATOR = '---'

    def __init__(self, llm, coo=True):
        """Store the mode flag and load the decomposition principles.

        Args:
            llm: Forwarded to ``BaseAgent.__init__``; the methods below call
                ``self.llm.generate(prompt)`` on it (presumably stored by the
                base class -- confirm in ``BaseAgent``).
            coo: Selects the principle-guided prompt plus refinement (True)
                or the reduced prompt without refinement (False).
        """
        super().__init__(llm)
        self.coo = coo
        current_file = Path(__file__).resolve()
        # The data directory lives three levels above this file's directory.
        path = current_file.parent.parent.parent / 'data/actor_data/input/decompose_prompt.json'
        self.decomposed_principles = read_json_file(str(path))

    def _select_principle(self, problem_type: str, tasknum: int):
        """Return the principle for ``problem_type`` / ``tasknum``.

        Falls back to the ``'C'`` problem type and the ``'4'`` task count
        when the requested keys are absent. The fallback entry is only
        looked up when it is actually needed, so a missing fallback key does
        not raise ``KeyError`` while the requested key is present.
        """
        principles = self.decomposed_principles
        if problem_type in principles:
            type_key = problem_type
        else:
            type_key = self._FALLBACK_PROBLEM_TYPE
        by_tasknum = principles[type_key]

        num_key = str(tasknum)
        if num_key not in by_tasknum:
            num_key = self._FALLBACK_TASKNUM
        return by_tasknum[num_key]

    @classmethod
    def _split_tasks(cls, answer: str) -> List[str]:
        """Split an LLM answer on ``'---'`` and drop blank segments."""
        segments = (segment.strip() for segment in answer.split(cls._TASK_SEPARATOR))
        return [segment for segment in segments if segment]

    def decompose(self, modeling_problem: str, problem_analysis: str, modeling_solution: str,
                  problem_type: str, tasknum: int, user_prompt: str = '') -> List[str]:
        """Ask the LLM for a list of subtasks.

        Args:
            modeling_problem: Inserted into the prompt in both modes.
            problem_analysis: Inserted into the prompt only when ``coo`` is on.
            modeling_solution: Inserted into the prompt only when ``coo`` is on.
            problem_type: Key into the loaded principles; unknown keys fall
                back to ``'C'``. Unused when ``coo`` is off.
            tasknum: Inserted into the prompt; its string form also selects
                the principle (unknown values fall back to ``'4'``).
            user_prompt: Extra text inserted into the prompt.

        Returns:
            The stripped, non-empty pieces of the LLM answer split on ``'---'``.

        Raises:
            KeyError: If a fallback principle is needed but missing.
        """
        if self.coo:
            decomposed_principle = self._select_principle(problem_type, tasknum)
            prompt = TASK_DECOMPOSE_PROMPT.format(
                modeling_problem=modeling_problem,
                problem_analysis=problem_analysis,
                modeling_solution=modeling_solution,
                decomposed_principle=decomposed_principle,
                tasknum=tasknum,
                user_prompt=user_prompt,
            )
        else:
            prompt = TASK_DECOMPOSE_WO_COO_PROMPT.format(
                modeling_problem=modeling_problem,
                tasknum=tasknum,
                user_prompt=user_prompt,
            )
        answer = self.llm.generate(prompt)
        return self._split_tasks(answer)

    def refine(self, modeling_problem: str, problem_analysis: str, modeling_solution: str,
               decomposed_subtasks: List[str], task_i: int) -> str:
        """Ask the LLM to describe one subtask given the full subtask list.

        Args:
            modeling_problem: Inserted into the prompt.
            problem_analysis: Inserted into the prompt.
            modeling_solution: Inserted into the prompt.
            decomposed_subtasks: All subtasks; joined with newlines for the prompt.
            task_i: Zero-based index of the subtask; the prompt receives
                ``task_i + 1``.

        Returns:
            Whatever ``self.llm.generate`` returns for the prompt.
        """
        decomposed_subtasks_str = '\n'.join(decomposed_subtasks)
        prompt = TASK_DESCRIPTION_PROMPT.format(
            modeling_problem=modeling_problem,
            problem_analysis=problem_analysis,
            modeling_solution=modeling_solution,
            decomposed_subtasks=decomposed_subtasks_str,
            task_i=task_i + 1,
        )
        return self.llm.generate(prompt)

    def decompose_and_refine(self, modeling_problem: str, problem_analysis: str, modeling_solution: str,
                             decomposed_principle: str, tasknum: int, user_prompt: str = ''):
        """Decompose the problem and, when ``coo`` is on, refine each subtask.

        Args:
            modeling_problem: Forwarded to ``decompose`` and ``refine``.
            problem_analysis: Forwarded to ``decompose`` and ``refine``.
            modeling_solution: Forwarded to ``decompose`` and ``refine``.
            decomposed_principle: Despite its name, this is passed to
                ``decompose`` as its ``problem_type`` argument. The name is
                kept so existing keyword callers continue to work.
            tasknum: Forwarded to ``decompose``.
            user_prompt: Forwarded to ``decompose``.

        Returns:
            The list of subtasks; each entry is replaced by its refined
            description when ``coo`` is on.
        """
        decomposed_subtasks = self.decompose(
            modeling_problem, problem_analysis, modeling_solution,
            decomposed_principle, tasknum, user_prompt,
        )
        if self.coo:
            # Entries are replaced in place, so each later refine() call is
            # given the already-refined text of the earlier subtasks.
            for task_i in range(len(decomposed_subtasks)):
                decomposed_subtasks[task_i] = self.refine(
                    modeling_problem, problem_analysis, modeling_solution,
                    decomposed_subtasks, task_i,
                )
        return decomposed_subtasks