|
|
import asyncio |
|
|
import json |
|
|
import os |
|
|
from typing import Any, Hashable |
|
|
|
|
|
import pandas as pd |
|
|
from pydantic import Field, model_validator |
|
|
|
|
|
from app.config import config |
|
|
from app.llm import LLM |
|
|
from app.logger import logger |
|
|
from app.tool.base import BaseTool |
|
|
|
|
|
|
|
|
class DataVisualization(BaseTool): |
|
|
name: str = "data_visualization" |
|
|
description: str = """Visualize statistical chart or Add insights in chart with JSON info from visualization_preparation tool. You can do steps as follows: |
|
|
1. Visualize statistical chart |
|
|
2. Choose insights into chart based on step 1 (Optional) |
|
|
Outputs: |
|
|
1. Charts (png/html) |
|
|
2. Charts Insights (.md)(Optional)""" |
|
|
parameters: dict = { |
|
|
"type": "object", |
|
|
"properties": { |
|
|
"json_path": { |
|
|
"type": "string", |
|
|
"description": """file path of json info with ".json" in the end""", |
|
|
}, |
|
|
"output_type": { |
|
|
"description": "Rendering format (html=interactive)", |
|
|
"type": "string", |
|
|
"default": "html", |
|
|
"enum": ["png", "html"], |
|
|
}, |
|
|
"tool_type": { |
|
|
"description": "visualize chart or add insights", |
|
|
"type": "string", |
|
|
"default": "visualization", |
|
|
"enum": ["visualization", "insight"], |
|
|
}, |
|
|
"language": { |
|
|
"description": "english(en) / chinese(zh)", |
|
|
"type": "string", |
|
|
"default": "en", |
|
|
"enum": ["zh", "en"], |
|
|
}, |
|
|
}, |
|
|
"required": ["code"], |
|
|
} |
|
|
llm: LLM = Field(default_factory=LLM, description="Language model instance") |
|
|
|
|
|
@model_validator(mode="after") |
|
|
def initialize_llm(self): |
|
|
"""Initialize llm with default settings if not provided.""" |
|
|
if self.llm is None or not isinstance(self.llm, LLM): |
|
|
self.llm = LLM(config_name=self.name.lower()) |
|
|
return self |
|
|
|
|
|
def get_file_path( |
|
|
self, |
|
|
json_info: list[dict[str, str]], |
|
|
path_str: str, |
|
|
directory: str = None, |
|
|
) -> list[str]: |
|
|
res = [] |
|
|
for item in json_info: |
|
|
if os.path.exists(item[path_str]): |
|
|
res.append(item[path_str]) |
|
|
elif os.path.exists( |
|
|
os.path.join(f"{directory or config.workspace_root}", item[path_str]) |
|
|
): |
|
|
res.append( |
|
|
os.path.join( |
|
|
f"{directory or config.workspace_root}", item[path_str] |
|
|
) |
|
|
) |
|
|
else: |
|
|
raise Exception(f"No such file or directory: {item[path_str]}") |
|
|
return res |
|
|
|
|
|
def success_output_template(self, result: list[dict[str, str]]) -> str: |
|
|
content = "" |
|
|
if len(result) == 0: |
|
|
return "Is EMPTY!" |
|
|
for item in result: |
|
|
content += f"""## {item['title']}\nChart saved in: {item['chart_path']}""" |
|
|
if "insight_path" in item and item["insight_path"] and "insight_md" in item: |
|
|
content += "\n" + item["insight_md"] |
|
|
else: |
|
|
content += "\n" |
|
|
return f"Chart Generated Successful!\n{content}" |
|
|
|
|
|
async def data_visualization( |
|
|
self, json_info: list[dict[str, str]], output_type: str, language: str |
|
|
) -> str: |
|
|
data_list = [] |
|
|
csv_file_path = self.get_file_path(json_info, "csvFilePath") |
|
|
for index, item in enumerate(json_info): |
|
|
df = pd.read_csv(csv_file_path[index], encoding="utf-8") |
|
|
df = df.astype(object) |
|
|
df = df.where(pd.notnull(df), None) |
|
|
data_dict_list = df.to_json(orient="records", force_ascii=False) |
|
|
|
|
|
data_list.append( |
|
|
{ |
|
|
"file_name": os.path.basename(csv_file_path[index]).replace( |
|
|
".csv", "" |
|
|
), |
|
|
"dict_data": data_dict_list, |
|
|
"chartTitle": item["chartTitle"], |
|
|
} |
|
|
) |
|
|
tasks = [ |
|
|
self.invoke_vmind( |
|
|
dict_data=item["dict_data"], |
|
|
chart_description=item["chartTitle"], |
|
|
file_name=item["file_name"], |
|
|
output_type=output_type, |
|
|
task_type="visualization", |
|
|
language=language, |
|
|
) |
|
|
for item in data_list |
|
|
] |
|
|
|
|
|
results = await asyncio.gather(*tasks) |
|
|
error_list = [] |
|
|
success_list = [] |
|
|
for index, result in enumerate(results): |
|
|
csv_path = csv_file_path[index] |
|
|
if "error" in result and "chart_path" not in result: |
|
|
error_list.append(f"Error in {csv_path}: {result['error']}") |
|
|
else: |
|
|
success_list.append( |
|
|
{ |
|
|
**result, |
|
|
"title": json_info[index]["chartTitle"], |
|
|
} |
|
|
) |
|
|
if len(error_list) > 0: |
|
|
return { |
|
|
"observation": f"# Error chart generated{'\n'.join(error_list)}\n{self.success_output_template(success_list)}", |
|
|
"success": False, |
|
|
} |
|
|
else: |
|
|
return {"observation": f"{self.success_output_template(success_list)}"} |
|
|
|
|
|
async def add_insighs( |
|
|
self, json_info: list[dict[str, str]], output_type: str |
|
|
) -> str: |
|
|
data_list = [] |
|
|
chart_file_path = self.get_file_path( |
|
|
json_info, "chartPath", os.path.join(config.workspace_root, "visualization") |
|
|
) |
|
|
for index, item in enumerate(json_info): |
|
|
if "insights_id" in item: |
|
|
data_list.append( |
|
|
{ |
|
|
"file_name": os.path.basename(chart_file_path[index]).replace( |
|
|
f".{output_type}", "" |
|
|
), |
|
|
"insights_id": item["insights_id"], |
|
|
} |
|
|
) |
|
|
tasks = [ |
|
|
self.invoke_vmind( |
|
|
insights_id=item["insights_id"], |
|
|
file_name=item["file_name"], |
|
|
output_type=output_type, |
|
|
task_type="insight", |
|
|
) |
|
|
for item in data_list |
|
|
] |
|
|
results = await asyncio.gather(*tasks) |
|
|
error_list = [] |
|
|
success_list = [] |
|
|
for index, result in enumerate(results): |
|
|
chart_path = chart_file_path[index] |
|
|
if "error" in result and "chart_path" not in result: |
|
|
error_list.append(f"Error in {chart_path}: {result['error']}") |
|
|
else: |
|
|
success_list.append(chart_path) |
|
|
success_template = ( |
|
|
f"# Charts Update with Insights\n{','.join(success_list)}" |
|
|
if len(success_list) > 0 |
|
|
else "" |
|
|
) |
|
|
if len(error_list) > 0: |
|
|
return { |
|
|
"observation": f"# Error in chart insights:{'\n'.join(error_list)}\n{success_template}", |
|
|
"success": False, |
|
|
} |
|
|
else: |
|
|
return {"observation": f"{success_template}"} |
|
|
|
|
|
async def execute( |
|
|
self, |
|
|
json_path: str, |
|
|
output_type: str | None = "html", |
|
|
tool_type: str | None = "visualization", |
|
|
language: str | None = "en", |
|
|
) -> str: |
|
|
try: |
|
|
logger.info(f"📈 data_visualization with {json_path} in: {tool_type} ") |
|
|
with open(json_path, "r", encoding="utf-8") as file: |
|
|
json_info = json.load(file) |
|
|
if tool_type == "visualization": |
|
|
return await self.data_visualization(json_info, output_type, language) |
|
|
else: |
|
|
return await self.add_insighs(json_info, output_type) |
|
|
except Exception as e: |
|
|
return { |
|
|
"observation": f"Error: {e}", |
|
|
"success": False, |
|
|
} |
|
|
|
|
|
async def invoke_vmind( |
|
|
self, |
|
|
file_name: str, |
|
|
output_type: str, |
|
|
task_type: str, |
|
|
insights_id: list[str] = None, |
|
|
dict_data: list[dict[Hashable, Any]] = None, |
|
|
chart_description: str = None, |
|
|
language: str = "en", |
|
|
): |
|
|
llm_config = { |
|
|
"base_url": self.llm.base_url, |
|
|
"model": self.llm.model, |
|
|
"api_key": self.llm.api_key, |
|
|
} |
|
|
vmind_params = { |
|
|
"llm_config": llm_config, |
|
|
"user_prompt": chart_description, |
|
|
"dataset": dict_data, |
|
|
"file_name": file_name, |
|
|
"output_type": output_type, |
|
|
"insights_id": insights_id, |
|
|
"task_type": task_type, |
|
|
"directory": str(config.workspace_root), |
|
|
"language": language, |
|
|
} |
|
|
|
|
|
process = await asyncio.create_subprocess_exec( |
|
|
"npx", |
|
|
"ts-node", |
|
|
"src/chartVisualize.ts", |
|
|
stdin=asyncio.subprocess.PIPE, |
|
|
stdout=asyncio.subprocess.PIPE, |
|
|
stderr=asyncio.subprocess.PIPE, |
|
|
cwd=os.path.dirname(__file__), |
|
|
) |
|
|
input_json = json.dumps(vmind_params, ensure_ascii=False).encode("utf-8") |
|
|
try: |
|
|
stdout, stderr = await process.communicate(input_json) |
|
|
stdout_str = stdout.decode("utf-8") |
|
|
stderr_str = stderr.decode("utf-8") |
|
|
if process.returncode == 0: |
|
|
return json.loads(stdout_str) |
|
|
else: |
|
|
return {"error": f"Node.js Error: {stderr_str}"} |
|
|
except Exception as e: |
|
|
return {"error": f"Subprocess Error: {str(e)}"} |
|
|
|