| |
| """ |
| File Processor Agent for GAIA Agent System |
| Handles file-based questions with intelligent processing strategies |
| """ |
|
|
| import os |
| import logging |
| from typing import Dict, List, Optional, Any |
| from pathlib import Path |
|
|
| from agents.state import GAIAAgentState, AgentRole, AgentResult, ToolResult |
| from models.qwen_client import QwenClient, ModelTier |
| from tools.file_processor import FileProcessorTool |
| from tools.calculator import CalculatorTool |
|
|
| logger = logging.getLogger(__name__) |
|
|
class FileProcessorAgent:
    """
    Specialized agent for file processing tasks.

    Dispatches file-based questions to type-specific handlers covering
    images, audio, CSV/Excel data, source code, plain text, and unknown
    formats, combining a file-extraction tool with an LLM client.
    """

    def __init__(self, llm_client: QwenClient) -> None:
        # LLM used to turn extracted file metadata/content into answers.
        self.llm_client = llm_client
        # Tool that extracts structured info (metadata, content) from files.
        self.file_processor = FileProcessorTool()
        # Calculator tool; not referenced by the methods visible in this file.
        self.calculator = CalculatorTool()
| |
| def process(self, state: GAIAAgentState) -> GAIAAgentState: |
| """ |
| Process file-based questions using file analysis tools |
| """ |
| logger.info(f"File processor processing: {state.question[:100]}...") |
| state.add_processing_step("File Processor: Starting file analysis") |
| |
| try: |
| |
| if not state.file_path or not os.path.exists(state.file_path): |
| error_msg = f"File not found: {state.file_path}" |
| state.add_error(error_msg) |
| result = self._create_failure_result(error_msg) |
| state.add_agent_result(result) |
| return state |
| |
| |
| strategy = self._determine_processing_strategy(state.question, state.file_path) |
| state.add_processing_step(f"File Processor: Strategy = {strategy}") |
| |
| |
| if strategy == "image_analysis": |
| result = self._process_image(state) |
| elif strategy == "data_analysis": |
| result = self._process_data_file(state) |
| elif strategy == "code_analysis": |
| result = self._process_code_file(state) |
| elif strategy == "audio_analysis": |
| result = self._process_audio_file(state) |
| elif strategy == "text_analysis": |
| result = self._process_text_file(state) |
| else: |
| result = self._process_generic_file(state) |
| |
| |
| state.add_agent_result(result) |
| state.add_processing_step(f"File Processor: Completed with confidence {result.confidence:.2f}") |
| |
| return state |
| |
| except Exception as e: |
| error_msg = f"File processing failed: {str(e)}" |
| state.add_error(error_msg) |
| logger.error(error_msg) |
| |
| |
| failure_result = self._create_failure_result(error_msg) |
| state.add_agent_result(failure_result) |
| return state |
| |
| def _determine_processing_strategy(self, question: str, file_path: str) -> str: |
| """Determine the best processing strategy based on file type and question""" |
| |
| file_extension = Path(file_path).suffix.lower() |
| question_lower = question.lower() |
| |
| |
| if file_extension in {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.webp'}: |
| return "image_analysis" |
| |
| |
| if file_extension in {'.mp3', '.wav', '.ogg', '.flac', '.m4a', '.aac'}: |
| return "audio_analysis" |
| |
| |
| if file_extension in {'.csv', '.xlsx', '.xls', '.json'}: |
| return "data_analysis" |
| |
| |
| if file_extension in {'.py', '.js', '.java', '.cpp', '.c', '.html', '.css'}: |
| return "code_analysis" |
| |
| |
| if file_extension in {'.txt', '.md', '.rst'}: |
| return "text_analysis" |
| |
| |
| return "generic_analysis" |
| |
| def _process_image(self, state: GAIAAgentState) -> AgentResult: |
| """Process image files and answer questions about them""" |
| |
| logger.info(f"Processing image: {state.file_path}") |
| |
| |
| file_result = self.file_processor.execute(state.file_path) |
| |
| if file_result.success and file_result.result.get('success'): |
| file_data = file_result.result['result'] |
| |
| |
| analysis_prompt = f""" |
| Based on this image analysis, please answer the following question: |
| |
| Question: {state.question} |
| |
| Image Information: |
| - File: {file_data.get('file_path', '')} |
| - Type: {file_data.get('file_type', '')} |
| - Content Description: {file_data.get('content', '')} |
| - Metadata: {file_data.get('metadata', {})} |
| |
| Please provide a direct answer based on the image analysis. |
| If the question asks about specific details that cannot be determined from the metadata alone, |
| please indicate what information is available and what would require visual analysis. |
| """ |
| |
| |
| model_tier = ModelTier.MAIN |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400) |
| |
| if llm_result.success: |
| confidence = 0.75 |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=llm_result.response, |
| confidence=confidence, |
| reasoning="Analyzed image metadata and properties", |
| tools_used=[ToolResult( |
| tool_name="file_processor", |
| success=True, |
| result=file_data, |
| execution_time=file_result.execution_time |
| )], |
| model_used=llm_result.model_used, |
| processing_time=file_result.execution_time + llm_result.response_time, |
| cost_estimate=llm_result.cost_estimate |
| ) |
| else: |
| |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=file_data.get('content', 'Image analyzed'), |
| confidence=0.60, |
| reasoning="Image processed but analysis failed", |
| tools_used=[ToolResult( |
| tool_name="file_processor", |
| success=True, |
| result=file_data, |
| execution_time=file_result.execution_time |
| )], |
| model_used="fallback", |
| processing_time=file_result.execution_time, |
| cost_estimate=0.0 |
| ) |
| else: |
| return self._create_failure_result("Image processing failed") |
| |
| def _process_data_file(self, state: GAIAAgentState) -> AgentResult: |
| """Process CSV/Excel files and perform data analysis""" |
| |
| logger.info(f"Processing data file: {state.file_path}") |
| |
| |
| file_result = self.file_processor.execute(state.file_path) |
| |
| if file_result.success and file_result.result.get('success'): |
| file_data = file_result.result['result'] |
| metadata = file_data.get('metadata', {}) |
| content = file_data.get('content', {}) |
| |
| |
| question_lower = state.question.lower() |
| needs_calculation = any(term in question_lower for term in [ |
| 'calculate', 'sum', 'total', 'average', 'mean', 'count', |
| 'maximum', 'minimum', 'how many', 'what is the' |
| ]) |
| |
| if needs_calculation and 'sample_data' in content: |
| return self._perform_data_calculations(state, file_data, file_result) |
| else: |
| return self._analyze_data_structure(state, file_data, file_result) |
| else: |
| return self._create_failure_result("Data file processing failed") |
| |
| def _perform_data_calculations(self, state: GAIAAgentState, file_data: Dict, file_result: ToolResult) -> AgentResult: |
| """Perform calculations on data file content""" |
| |
| metadata = file_data.get('metadata', {}) |
| content = file_data.get('content', {}) |
| |
| |
| sample_data = content.get('sample_data', []) |
| |
| |
| calculation_prompt = f""" |
| Based on this data file and question, determine what calculations are needed: |
| |
| Question: {state.question} |
| |
| Data Structure: |
| - Columns: {metadata.get('columns', [])} |
| - Rows: {metadata.get('row_count', 0)} |
| - Sample Data: {sample_data[:3]} # First 3 rows |
| |
| Please specify what calculations should be performed and on which columns. |
| Respond with specific calculation instructions. |
| """ |
| |
| llm_result = self.llm_client.generate(calculation_prompt, tier=ModelTier.MAIN, max_tokens=200) |
| |
| if llm_result.success: |
| |
| analysis_prompt = f""" |
| Based on this data analysis, please answer the question: |
| |
| Question: {state.question} |
| |
| Data Summary: |
| - File: {metadata.get('shape', [])} (rows x columns) |
| - Columns: {metadata.get('columns', [])} |
| - Numeric columns: {metadata.get('numeric_columns', [])} |
| - Statistics: {metadata.get('numeric_stats', {})} |
| - Sample data: {sample_data} |
| |
| Calculation guidance: {llm_result.response} |
| |
| Please provide the answer based on the data. |
| """ |
| |
| analysis_result = self.llm_client.generate(analysis_prompt, tier=ModelTier.MAIN, max_tokens=400) |
| |
| if analysis_result.success: |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=analysis_result.response, |
| confidence=0.80, |
| reasoning="Performed data analysis and calculations", |
| tools_used=[file_result], |
| model_used=analysis_result.model_used, |
| processing_time=file_result.execution_time + llm_result.response_time + analysis_result.response_time, |
| cost_estimate=llm_result.cost_estimate + analysis_result.cost_estimate |
| ) |
| |
| |
| return self._analyze_data_structure(state, file_data, file_result) |
| |
| def _analyze_data_structure(self, state: GAIAAgentState, file_data: Dict, file_result: ToolResult) -> AgentResult: |
| """Analyze data file structure and content""" |
| |
| metadata = file_data.get('metadata', {}) |
| content = file_data.get('content', {}) |
| |
| analysis_prompt = f""" |
| Based on this data file analysis, please answer the question: |
| |
| Question: {state.question} |
| |
| Data File Information: |
| - Structure: {metadata.get('shape', [])} (rows x columns) |
| - Columns: {metadata.get('columns', [])} |
| - Data types: {metadata.get('data_types', {})} |
| - Description: {content.get('description', '')} |
| - Sample data: {content.get('sample_data', [])} |
| |
| Please provide a direct answer based on the data structure and content. |
| """ |
| |
| model_tier = ModelTier.MAIN |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400) |
| |
| if llm_result.success: |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=llm_result.response, |
| confidence=0.75, |
| reasoning="Analyzed data file structure and content", |
| tools_used=[file_result], |
| model_used=llm_result.model_used, |
| processing_time=file_result.execution_time + llm_result.response_time, |
| cost_estimate=llm_result.cost_estimate |
| ) |
| else: |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=content.get('description', 'Data file analyzed'), |
| confidence=0.60, |
| reasoning="Data file processed but analysis failed", |
| tools_used=[file_result], |
| model_used="fallback", |
| processing_time=file_result.execution_time, |
| cost_estimate=0.0 |
| ) |
| |
| def _process_code_file(self, state: GAIAAgentState) -> AgentResult: |
| """Process code files and analyze their content""" |
| |
| logger.info(f"Processing code file: {state.file_path}") |
| |
| |
| file_result = self.file_processor.execute(state.file_path) |
| |
| if file_result.success and file_result.result.get('success'): |
| file_data = file_result.result['result'] |
| metadata = file_data.get('metadata', {}) |
| content = file_data.get('content', {}) |
| |
| analysis_prompt = f""" |
| Based on this code analysis, please answer the question: |
| |
| Question: {state.question} |
| |
| Code File Information: |
| - Type: {file_data.get('file_type', '')} |
| - Description: {content.get('description', '')} |
| - Metadata: {metadata} |
| - Code snippet: {content.get('code_snippet', '')} |
| |
| Please analyze the code and provide a direct answer. |
| """ |
| |
| model_tier = ModelTier.MAIN |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=500) |
| |
| if llm_result.success: |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=llm_result.response, |
| confidence=0.80, |
| reasoning="Analyzed code structure and content", |
| tools_used=[ToolResult( |
| tool_name="file_processor", |
| success=True, |
| result=file_data, |
| execution_time=file_result.execution_time |
| )], |
| model_used=llm_result.model_used, |
| processing_time=file_result.execution_time + llm_result.response_time, |
| cost_estimate=llm_result.cost_estimate |
| ) |
| else: |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=content.get('description', 'Code file analyzed'), |
| confidence=0.60, |
| reasoning="Code file processed but analysis failed", |
| tools_used=[ToolResult( |
| tool_name="file_processor", |
| success=True, |
| result=file_data, |
| execution_time=file_result.execution_time |
| )], |
| model_used="fallback", |
| processing_time=file_result.execution_time, |
| cost_estimate=0.0 |
| ) |
| else: |
| return self._create_failure_result("Code file processing failed") |
| |
| def _process_audio_file(self, state: GAIAAgentState) -> AgentResult: |
| """Process audio files (basic metadata for now)""" |
| |
| logger.info(f"Processing audio file: {state.file_path}") |
| |
| |
| file_result = self.file_processor.execute(state.file_path) |
| |
| if file_result.success and file_result.result.get('success'): |
| file_data = file_result.result['result'] |
| |
| analysis_prompt = f""" |
| Based on this audio file information, please answer the question: |
| |
| Question: {state.question} |
| |
| Audio File Information: |
| - Content: {file_data.get('content', '')} |
| - Metadata: {file_data.get('metadata', {})} |
| |
| Please provide an answer based on the available audio file information. |
| Note: Full audio transcription is not currently available, but file metadata is provided. |
| """ |
| |
| model_tier = ModelTier.ROUTER |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=300) |
| |
| if llm_result.success: |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=llm_result.response, |
| confidence=0.50, |
| reasoning="Analyzed audio file metadata (transcription not available)", |
| tools_used=[ToolResult( |
| tool_name="file_processor", |
| success=True, |
| result=file_data, |
| execution_time=file_result.execution_time |
| )], |
| model_used=llm_result.model_used, |
| processing_time=file_result.execution_time + llm_result.response_time, |
| cost_estimate=llm_result.cost_estimate |
| ) |
| |
| return self._create_failure_result("Audio file processing not fully supported") |
| |
| def _process_text_file(self, state: GAIAAgentState) -> AgentResult: |
| """Process text files and analyze their content""" |
| |
| logger.info(f"Processing text file: {state.file_path}") |
| |
| |
| file_result = self.file_processor.execute(state.file_path) |
| |
| if file_result.success and file_result.result.get('success'): |
| file_data = file_result.result['result'] |
| content = file_data.get('content', {}) |
| |
| analysis_prompt = f""" |
| Based on this text file content, please answer the question: |
| |
| Question: {state.question} |
| |
| Text Content: |
| {content.get('text', '')[:2000]}... |
| |
| File Statistics: |
| - Word count: {file_data.get('metadata', {}).get('word_count', 0)} |
| - Line count: {file_data.get('metadata', {}).get('line_count', 0)} |
| |
| Please analyze the text and provide a direct answer. |
| """ |
| |
| model_tier = ModelTier.MAIN |
| llm_result = self.llm_client.generate(analysis_prompt, tier=model_tier, max_tokens=400) |
| |
| if llm_result.success: |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=llm_result.response, |
| confidence=0.85, |
| reasoning="Analyzed text file content", |
| tools_used=[ToolResult( |
| tool_name="file_processor", |
| success=True, |
| result=file_data, |
| execution_time=file_result.execution_time |
| )], |
| model_used=llm_result.model_used, |
| processing_time=file_result.execution_time + llm_result.response_time, |
| cost_estimate=llm_result.cost_estimate |
| ) |
| |
| return self._create_failure_result("Text file processing failed") |
| |
| def _process_generic_file(self, state: GAIAAgentState) -> AgentResult: |
| """Process unknown file types with generic analysis""" |
| |
| logger.info(f"Processing generic file: {state.file_path}") |
| |
| |
| file_result = self.file_processor.execute(state.file_path) |
| |
| if file_result.success: |
| file_data = file_result.result |
| |
| |
| basic_info = f"File analyzed: {state.file_path}. " |
| if file_data.get('success'): |
| basic_info += f"File type: {file_data.get('result', {}).get('file_type', 'unknown')}. " |
| basic_info += "Generic file analysis completed." |
| else: |
| basic_info += f"Analysis result: {file_data.get('message', 'Processing completed')}" |
| |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=True, |
| result=basic_info, |
| confidence=0.40, |
| reasoning="Generic file processing attempted", |
| tools_used=[ToolResult( |
| tool_name="file_processor", |
| success=True, |
| result=file_data, |
| execution_time=file_result.execution_time |
| )], |
| model_used="basic", |
| processing_time=file_result.execution_time, |
| cost_estimate=0.0 |
| ) |
| else: |
| return self._create_failure_result("Generic file processing failed") |
| |
| def _create_failure_result(self, error_message: str) -> AgentResult: |
| """Create a failure result""" |
| return AgentResult( |
| agent_role=AgentRole.FILE_PROCESSOR, |
| success=False, |
| result=error_message, |
| confidence=0.0, |
| reasoning=error_message, |
| model_used="error", |
| processing_time=0.0, |
| cost_estimate=0.0 |
| ) |