import asyncio import json import os from typing import Any, Hashable import pandas as pd from pydantic import Field, model_validator from app.config import config from app.llm import LLM from app.logger import logger from app.tool.base import BaseTool class DataVisualization(BaseTool): name: str = "data_visualization" description: str = """Visualize statistical chart or Add insights in chart with JSON info from visualization_preparation tool. You can do steps as follows: 1. Visualize statistical chart 2. Choose insights into chart based on step 1 (Optional) Outputs: 1. Charts (png/html) 2. Charts Insights (.md)(Optional)""" parameters: dict = { "type": "object", "properties": { "json_path": { "type": "string", "description": """file path of json info with ".json" in the end""", }, "output_type": { "description": "Rendering format (html=interactive)", "type": "string", "default": "html", "enum": ["png", "html"], }, "tool_type": { "description": "visualize chart or add insights", "type": "string", "default": "visualization", "enum": ["visualization", "insight"], }, "language": { "description": "english(en) / chinese(zh)", "type": "string", "default": "en", "enum": ["zh", "en"], }, }, "required": ["code"], } llm: LLM = Field(default_factory=LLM, description="Language model instance") @model_validator(mode="after") def initialize_llm(self): """Initialize llm with default settings if not provided.""" if self.llm is None or not isinstance(self.llm, LLM): self.llm = LLM(config_name=self.name.lower()) return self def get_file_path( self, json_info: list[dict[str, str]], path_str: str, directory: str = None, ) -> list[str]: res = [] for item in json_info: if os.path.exists(item[path_str]): res.append(item[path_str]) elif os.path.exists( os.path.join(f"{directory or config.workspace_root}", item[path_str]) ): res.append( os.path.join( f"{directory or config.workspace_root}", item[path_str] ) ) else: raise Exception(f"No such file or directory: {item[path_str]}") return res def success_output_template(self, result: list[dict[str, str]]) -> str: content = "" if len(result) == 0: return "Is EMPTY!" for item in result: content += f"""## {item['title']}\nChart saved in: {item['chart_path']}""" if "insight_path" in item and item["insight_path"] and "insight_md" in item: content += "\n" + item["insight_md"] else: content += "\n" return f"Chart Generated Successful!\n{content}" async def data_visualization( self, json_info: list[dict[str, str]], output_type: str, language: str ) -> str: data_list = [] csv_file_path = self.get_file_path(json_info, "csvFilePath") for index, item in enumerate(json_info): df = pd.read_csv(csv_file_path[index], encoding="utf-8") df = df.astype(object) df = df.where(pd.notnull(df), None) data_dict_list = df.to_json(orient="records", force_ascii=False) data_list.append( { "file_name": os.path.basename(csv_file_path[index]).replace( ".csv", "" ), "dict_data": data_dict_list, "chartTitle": item["chartTitle"], } ) tasks = [ self.invoke_vmind( dict_data=item["dict_data"], chart_description=item["chartTitle"], file_name=item["file_name"], output_type=output_type, task_type="visualization", language=language, ) for item in data_list ] results = await asyncio.gather(*tasks) error_list = [] success_list = [] for index, result in enumerate(results): csv_path = csv_file_path[index] if "error" in result and "chart_path" not in result: error_list.append(f"Error in {csv_path}: {result['error']}") else: success_list.append( { **result, "title": json_info[index]["chartTitle"], } ) if len(error_list) > 0: return { "observation": f"# Error chart generated{'\n'.join(error_list)}\n{self.success_output_template(success_list)}", "success": False, } else: return {"observation": f"{self.success_output_template(success_list)}"} async def add_insighs( self, json_info: list[dict[str, str]], output_type: str ) -> str: data_list = [] chart_file_path = self.get_file_path( json_info, "chartPath", os.path.join(config.workspace_root, "visualization") ) for index, item in enumerate(json_info): if "insights_id" in item: data_list.append( { "file_name": os.path.basename(chart_file_path[index]).replace( f".{output_type}", "" ), "insights_id": item["insights_id"], } ) tasks = [ self.invoke_vmind( insights_id=item["insights_id"], file_name=item["file_name"], output_type=output_type, task_type="insight", ) for item in data_list ] results = await asyncio.gather(*tasks) error_list = [] success_list = [] for index, result in enumerate(results): chart_path = chart_file_path[index] if "error" in result and "chart_path" not in result: error_list.append(f"Error in {chart_path}: {result['error']}") else: success_list.append(chart_path) success_template = ( f"# Charts Update with Insights\n{','.join(success_list)}" if len(success_list) > 0 else "" ) if len(error_list) > 0: return { "observation": f"# Error in chart insights:{'\n'.join(error_list)}\n{success_template}", "success": False, } else: return {"observation": f"{success_template}"} async def execute( self, json_path: str, output_type: str | None = "html", tool_type: str | None = "visualization", language: str | None = "en", ) -> str: try: logger.info(f"📈 data_visualization with {json_path} in: {tool_type} ") with open(json_path, "r", encoding="utf-8") as file: json_info = json.load(file) if tool_type == "visualization": return await self.data_visualization(json_info, output_type, language) else: return await self.add_insighs(json_info, output_type) except Exception as e: return { "observation": f"Error: {e}", "success": False, } async def invoke_vmind( self, file_name: str, output_type: str, task_type: str, insights_id: list[str] = None, dict_data: list[dict[Hashable, Any]] = None, chart_description: str = None, language: str = "en", ): llm_config = { "base_url": self.llm.base_url, "model": self.llm.model, "api_key": self.llm.api_key, } vmind_params = { "llm_config": llm_config, "user_prompt": chart_description, "dataset": dict_data, "file_name": file_name, "output_type": output_type, "insights_id": insights_id, "task_type": task_type, "directory": str(config.workspace_root), "language": language, } # build async sub process process = await asyncio.create_subprocess_exec( "npx", "ts-node", "src/chartVisualize.ts", stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=os.path.dirname(__file__), ) input_json = json.dumps(vmind_params, ensure_ascii=False).encode("utf-8") try: stdout, stderr = await process.communicate(input_json) stdout_str = stdout.decode("utf-8") stderr_str = stderr.decode("utf-8") if process.returncode == 0: return json.loads(stdout_str) else: return {"error": f"Node.js Error: {stderr_str}"} except Exception as e: return {"error": f"Subprocess Error: {str(e)}"}