Speedofmastery's picture
Upload folder using huggingface_hub
88f3fce verified
import asyncio
import json
import os
from typing import Any, Hashable
import pandas as pd
from pydantic import Field, model_validator
from app.config import config
from app.llm import LLM
from app.logger import logger
from app.tool.base import BaseTool
class DataVisualization(BaseTool):
    """Tool that renders charts, or adds insights to existing charts.

    A request is described by a JSON file (see ``execute``). The rendering
    itself is delegated to the ``src/chartVisualize.ts`` script, which is run
    with ``npx ts-node`` as a subprocess (see ``invoke_vmind``).
    """

    name: str = "data_visualization"
    description: str = """Visualize statistical chart or Add insights in chart with JSON info from visualization_preparation tool. You can do steps as follows:
1. Visualize statistical chart
2. Choose insights into chart based on step 1 (Optional)
Outputs:
1. Charts (png/html)
2. Charts Insights (.md)(Optional)"""
    parameters: dict = {
        "type": "object",
        "properties": {
            "json_path": {
                "type": "string",
                "description": """file path of json info with ".json" in the end""",
            },
            "output_type": {
                "description": "Rendering format (html=interactive)",
                "type": "string",
                "default": "html",
                "enum": ["png", "html"],
            },
            "tool_type": {
                "description": "visualize chart or add insights",
                "type": "string",
                "default": "visualization",
                "enum": ["visualization", "insight"],
            },
            "language": {
                "description": "english(en) / chinese(zh)",
                "type": "string",
                "default": "en",
                "enum": ["zh", "en"],
            },
        },
        # "json_path" is the only argument of execute() without a default.
        # (The schema previously required "code", which is not a property.)
        "required": ["json_path"],
    }
    llm: LLM = Field(default_factory=LLM, description="Language model instance")

    @model_validator(mode="after")
    def initialize_llm(self):
        """Initialize llm with default settings if not provided."""
        # isinstance() is already False for None, so no separate None check.
        if not isinstance(self.llm, LLM):
            self.llm = LLM(config_name=self.name.lower())
        return self

    def get_file_path(
        self,
        json_info: list[dict[str, str]],
        path_str: str,
        directory: str | None = None,
    ) -> list[str]:
        """Resolve the path stored under ``path_str`` in each entry of ``json_info``.

        A path is used unchanged when it exists. Otherwise it is looked up
        relative to ``directory``, or ``config.workspace_root`` when
        ``directory`` is not given.

        Args:
            json_info: Entries that each contain the key ``path_str``.
            path_str: Key whose value is the path to resolve.
            directory: Optional base directory for relative lookups.

        Returns:
            One resolved path per entry, in the same order as ``json_info``.

        Raises:
            FileNotFoundError: If a path exists in neither location.
        """
        resolved = []
        for item in json_info:
            raw_path = item[path_str]
            if os.path.exists(raw_path):
                resolved.append(raw_path)
                continue
            candidate = os.path.join(
                f"{directory or config.workspace_root}", raw_path
            )
            if os.path.exists(candidate):
                resolved.append(candidate)
                continue
            raise FileNotFoundError(f"No such file or directory: {raw_path}")
        return resolved

    def success_output_template(self, result: list[dict[str, str]]) -> str:
        """Format the successfully generated charts as a markdown report.

        Each entry must provide ``title`` and ``chart_path``. When it also has
        a truthy ``insight_path`` and an ``insight_md`` key, the insight
        markdown is appended after the chart line.

        Returns:
            ``"Is EMPTY!"`` for an empty ``result``, otherwise the report text.
        """
        if not result:
            return "Is EMPTY!"
        sections = []
        for item in result:
            section = f"## {item['title']}\nChart saved in: {item['chart_path']}"
            if item.get("insight_path") and "insight_md" in item:
                section += "\n" + item["insight_md"]
            else:
                section += "\n"
            sections.append(section)
        content = "".join(sections)
        return f"Chart Generated Successful!\n{content}"

    async def data_visualization(
        self, json_info: list[dict[str, str]], output_type: str, language: str
    ) -> dict[str, Any]:
        """Render one chart per entry of ``json_info``.

        Each entry must contain ``csvFilePath`` (the data to plot) and
        ``chartTitle`` (passed to the chart script as the user prompt). All
        charts are rendered concurrently.

        Returns:
            A dict with an ``observation`` text. When at least one chart
            failed, the dict also contains ``"success": False``.
        """
        csv_file_path = self.get_file_path(json_info, "csvFilePath")
        data_list = []
        for csv_path, item in zip(csv_file_path, json_info):
            df = pd.read_csv(csv_path, encoding="utf-8")
            df = df.astype(object)
            df = df.where(pd.notnull(df), None)
            data_list.append(
                {
                    # Strip only the trailing extension, not every ".csv"
                    # that happens to appear inside the file name.
                    "file_name": os.path.basename(csv_path).removesuffix(".csv"),
                    "dict_data": df.to_json(orient="records", force_ascii=False),
                    "chartTitle": item["chartTitle"],
                }
            )
        results = await asyncio.gather(
            *(
                self.invoke_vmind(
                    dict_data=entry["dict_data"],
                    chart_description=entry["chartTitle"],
                    file_name=entry["file_name"],
                    output_type=output_type,
                    task_type="visualization",
                    language=language,
                )
                for entry in data_list
            )
        )
        error_list = []
        success_list = []
        for csv_path, item, result in zip(csv_file_path, json_info, results):
            if "error" in result and "chart_path" not in result:
                error_list.append(f"Error in {csv_path}: {result['error']}")
            else:
                success_list.append({**result, "title": item["chartTitle"]})
        success_text = self.success_output_template(success_list)
        if error_list:
            # Joined outside the f-string: a backslash inside an f-string
            # expression is a SyntaxError before Python 3.12.
            error_text = "\n".join(error_list)
            return {
                "observation": f"# Error chart generated{error_text}\n{success_text}",
                "success": False,
            }
        return {"observation": success_text}

    async def add_insighs(
        self, json_info: list[dict[str, str]], output_type: str
    ) -> dict[str, Any]:
        """Add insights to already rendered charts.

        Each entry must contain ``chartPath``; entries without an
        ``insights_id`` key are skipped. Relative chart paths are looked up in
        the ``visualization`` directory under ``config.workspace_root``.

        Returns:
            A dict with an ``observation`` text. When at least one chart
            failed, the dict also contains ``"success": False``.
        """
        chart_file_path = self.get_file_path(
            json_info, "chartPath", os.path.join(config.workspace_root, "visualization")
        )
        # Each task keeps its own chart path so that a result is reported
        # against the right chart even when entries without "insights_id"
        # were skipped (results are positional over the tasks, not json_info).
        data_list = [
            {
                "chart_path": chart_path,
                "file_name": os.path.basename(chart_path).removesuffix(
                    f".{output_type}"
                ),
                "insights_id": item["insights_id"],
            }
            for chart_path, item in zip(chart_file_path, json_info)
            if "insights_id" in item
        ]
        results = await asyncio.gather(
            *(
                self.invoke_vmind(
                    insights_id=task["insights_id"],
                    file_name=task["file_name"],
                    output_type=output_type,
                    task_type="insight",
                )
                for task in data_list
            )
        )
        error_list = []
        success_list = []
        for task, result in zip(data_list, results):
            chart_path = task["chart_path"]
            if "error" in result and "chart_path" not in result:
                error_list.append(f"Error in {chart_path}: {result['error']}")
            else:
                success_list.append(chart_path)
        success_template = (
            f"# Charts Update with Insights\n{','.join(success_list)}"
            if success_list
            else ""
        )
        if error_list:
            # Joined outside the f-string: a backslash inside an f-string
            # expression is a SyntaxError before Python 3.12.
            error_text = "\n".join(error_list)
            return {
                "observation": f"# Error in chart insights:{error_text}\n{success_template}",
                "success": False,
            }
        return {"observation": success_template}

    async def execute(
        self,
        json_path: str,
        output_type: str | None = "html",
        tool_type: str | None = "visualization",
        language: str | None = "en",
    ) -> dict[str, Any]:
        """Load the request JSON and run the selected task.

        Args:
            json_path: Path of the JSON file describing the charts.
            output_type: Chart format forwarded to the chart script.
            tool_type: ``"visualization"`` renders charts; any other value
                runs the insight task.
            language: Language code forwarded to the visualization task.

        Returns:
            The dict produced by the selected task. Any exception is turned
            into ``{"observation": "Error: ...", "success": False}``.
        """
        try:
            logger.info(f"📈 data_visualization with {json_path} in: {tool_type} ")
            with open(json_path, "r", encoding="utf-8") as file:
                json_info = json.load(file)
            if tool_type == "visualization":
                return await self.data_visualization(json_info, output_type, language)
            return await self.add_insighs(json_info, output_type)
        except Exception as e:
            # Tool boundary: report the failure as an observation, don't raise.
            return {
                "observation": f"Error: {e}",
                "success": False,
            }

    async def invoke_vmind(
        self,
        file_name: str,
        output_type: str,
        task_type: str,
        insights_id: list[str] | None = None,
        dict_data: str | list[dict[Hashable, Any]] | None = None,
        chart_description: str | None = None,
        language: str = "en",
    ) -> dict[str, Any]:
        """Run the Node.js chart script once and return its parsed JSON output.

        The request, including the LLM connection settings taken from
        ``self.llm``, is sent to the script as JSON on stdin. The script is
        started with this module's directory as working directory.

        Returns:
            The JSON object printed by the script when it exits with code 0.
            Otherwise ``{"error": ...}`` carrying the script's stderr or the
            Python-side exception message.
        """
        llm_config = {
            "base_url": self.llm.base_url,
            "model": self.llm.model,
            "api_key": self.llm.api_key,
        }
        vmind_params = {
            "llm_config": llm_config,
            "user_prompt": chart_description,
            "dataset": dict_data,
            "file_name": file_name,
            "output_type": output_type,
            "insights_id": insights_id,
            "task_type": task_type,
            "directory": str(config.workspace_root),
            "language": language,
        }
        try:
            # Serialize before spawning so that a payload error cannot leave
            # an orphaned child process behind.
            input_json = json.dumps(vmind_params, ensure_ascii=False).encode("utf-8")
            # Spawning happens inside the try block so that a launch failure
            # (e.g. npx not installed) becomes an error result for this chart
            # instead of aborting every task awaited alongside it.
            process = await asyncio.create_subprocess_exec(
                "npx",
                "ts-node",
                "src/chartVisualize.ts",
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=os.path.dirname(__file__),
            )
            stdout, stderr = await process.communicate(input_json)
            stdout_str = stdout.decode("utf-8")
            stderr_str = stderr.decode("utf-8")
            if process.returncode == 0:
                return json.loads(stdout_str)
            return {"error": f"Node.js Error: {stderr_str}"}
        except Exception as e:
            return {"error": f"Subprocess Error: {str(e)}"}