|
|
"""
|
|
|
Hugging Face Models Tool for OpenManus AI Agent
|
|
|
Tool for calling any Hugging Face model via Inference API
|
|
|
"""
|
|
|
|
|
|
import asyncio
|
|
|
import base64
|
|
|
import io
|
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
|
|
from app.huggingface_models import HuggingFaceModelManager, ModelCategory
|
|
|
from app.tool.base import BaseTool
|
|
|
|
|
|
|
|
|
class HuggingFaceModelsTool(BaseTool):
|
|
|
"""Tool for accessing Hugging Face models via Inference API"""
|
|
|
|
|
|
def __init__(self, api_token: str) -> None:
"""Set the tool's name and description and create its model manager.

Args:
    api_token: Token passed straight to ``HuggingFaceModelManager``.
"""
|
|
|
# NOTE(review): BaseTool is not visible here. If it is a validated model
# with required ``name``/``description`` fields, calling its initializer
# with no arguments may fail -- confirm against app.tool.base.
super().__init__()
|
|
|
# Identifier assigned to this tool instance.
self.name = "huggingface_models"
|
|
|
# Free-text summary of the task families this tool exposes.
self.description = """
|
|
|
Access thousands of Hugging Face models for various AI tasks including:
|
|
|
- Text generation (GPT-like models, instruction-tuned models)
|
|
|
- Image generation (FLUX, Stable Diffusion, Qwen-Image)
|
|
|
- Speech recognition (Whisper, Parakeet, Canary)
|
|
|
- Text-to-speech (Kokoro, XTTS, VibeVoice)
|
|
|
- Image classification (NSFW detection, emotion recognition)
|
|
|
- Feature extraction (embeddings, sentence transformers)
|
|
|
- Translation, summarization, question answering
|
|
|
|
|
|
Use this tool to leverage state-of-the-art AI models for any task.
|
|
|
"""
|
|
|
# Every action method of this class goes through this manager for model
# lookup and invocation.
self.model_manager = HuggingFaceModelManager(api_token)
|
|
|
|
|
|
async def text_generation(
    self,
    model_name: str,
    prompt: str,
    max_tokens: int = 100,
    temperature: float = 0.7,
    stream: bool = False,
) -> Dict[str, Any]:
    """Run a text-generation model on a prompt.

    Args:
        model_name: Name or ID of the model (e.g., "MiniMax-M2", "GPT-OSS 20B").
        prompt: Input text prompt.
        max_tokens: Maximum tokens to generate.
        temperature: Sampling temperature (0.0 to 2.0).
        stream: Whether to stream the response.

    Returns:
        ``{"model", "model_id", "result"}`` on success, or ``{"error": ...}``
        when the model is unknown or any exception is raised.
    """
    try:
        category = ModelCategory.TEXT_GENERATION
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"Text generation model '{model_name}' not found"}

        generation_args = {
            "prompt": prompt,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "stream": stream,
        }
        output = await self.model_manager.call_model(
            entry.model_id, category, **generation_args
        )
    except Exception as exc:
        # Failures are reported to the caller as data rather than raised.
        return {"error": f"Text generation failed: {exc!s}"}
    else:
        return {"model": entry.name, "model_id": entry.model_id, "result": output}
|
|
|
|
|
|
async def generate_image(
    self,
    model_name: str,
    prompt: str,
    negative_prompt: Optional[str] = None,
    width: int = 1024,
    height: int = 1024,
    num_inference_steps: int = 20,
) -> Dict[str, Any]:
    """Render an image from a text prompt and return it base64-encoded.

    Args:
        model_name: Name or ID of the model (e.g., "FLUX.1 Dev", "Stable Diffusion XL").
        prompt: Text description of the image.
        negative_prompt: What to avoid in the image.
        width: Image width in pixels.
        height: Image height in pixels.
        num_inference_steps: Number of denoising steps.

    Returns:
        ``{"model", "model_id", "image_base64", "size", "prompt"}`` on
        success, or ``{"error": ...}`` on any failure.
    """
    try:
        category = ModelCategory.TEXT_TO_IMAGE
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"Text-to-image model '{model_name}' not found"}

        raw_image = await self.model_manager.call_model(
            entry.model_id,
            category,
            prompt=prompt,
            negative_prompt=negative_prompt,
            width=width,
            height=height,
            num_inference_steps=num_inference_steps,
        )

        # The manager's return value is base64-encoded so it can travel
        # inside the result dict as text.
        encoded = base64.b64encode(raw_image).decode()

        payload: Dict[str, Any] = {}
        payload["model"] = entry.name
        payload["model_id"] = entry.model_id
        payload["image_base64"] = encoded
        payload["size"] = f"{width}x{height}"
        payload["prompt"] = prompt
        return payload
    except Exception as exc:
        return {"error": f"Image generation failed: {exc!s}"}
|
|
|
|
|
|
async def transcribe_audio(
    self,
    model_name: str,
    audio_data: bytes,
    language: Optional[str] = None,
    task: str = "transcribe",
) -> Dict[str, Any]:
    """Transcribe audio to text.

    Args:
        model_name: Name or ID of the model (e.g., "Whisper Large v3").
        audio_data: Audio file as bytes.
        language: Source language code (e.g., "en", "es").
        task: "transcribe" or "translate".

    Returns:
        ``{"model", "model_id", "transcription", "language", "task"}`` on
        success, or ``{"error": ...}`` on any failure.
    """
    try:
        category = ModelCategory.AUTOMATIC_SPEECH_RECOGNITION
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"ASR model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            audio_data=audio_data,
            language=language,
            task=task,
        )

        # summarize_text in this class already tolerates non-dict results
        # from the manager; do the same here instead of failing on
        # ``.get`` when the result is a plain string or None.
        if isinstance(result, dict):
            transcription = result.get("text", "")
        elif result is None:
            transcription = ""
        else:
            transcription = str(result)

        return {
            "model": model.name,
            "model_id": model.model_id,
            "transcription": transcription,
            "language": language,
            "task": task,
        }
    except Exception as e:
        return {"error": f"Audio transcription failed: {str(e)}"}
|
|
|
|
|
|
async def text_to_speech(
    self,
    model_name: str,
    text: str,
    voice_id: Optional[str] = None,
    speed: float = 1.0,
) -> Dict[str, Any]:
    """Synthesize speech for ``text`` and return the audio base64-encoded.

    Args:
        model_name: Name or ID of the model (e.g., "Kokoro 82M", "VibeVoice 1.5B").
        text: Text to convert to speech.
        voice_id: Voice identifier (model-specific).
        speed: Speech speed multiplier.

    Returns:
        ``{"model", "model_id", "audio_base64", "text", "voice_id"}`` on
        success, or ``{"error": ...}`` on any failure.
    """
    try:
        category = ModelCategory.TEXT_TO_SPEECH
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"TTS model '{model_name}' not found"}

        synthesis_args = {"text": text, "voice_id": voice_id, "speed": speed}
        raw_audio = await self.model_manager.call_model(
            entry.model_id, category, **synthesis_args
        )

        # Encode the manager's return value so it fits in the result dict
        # as text.
        encoded = base64.b64encode(raw_audio).decode()

        return dict(
            [
                ("model", entry.name),
                ("model_id", entry.model_id),
                ("audio_base64", encoded),
                ("text", text),
                ("voice_id", voice_id),
            ]
        )
    except Exception as exc:
        return {"error": f"Text-to-speech failed: {exc!s}"}
|
|
|
|
|
|
async def classify_image(
    self, model_name: str, image_data: bytes, top_k: int = 5
) -> Dict[str, Any]:
    """Classify the content of an image.

    Args:
        model_name: Name or ID of the model (e.g., "NSFW Image Detection").
        image_data: Image file as bytes.
        top_k: Number of top predictions to return.

    Returns:
        ``{"model", "model_id", "predictions", "top_k"}`` on success, or
        ``{"error": ...}`` on any failure.
    """
    try:
        category = ModelCategory.IMAGE_CLASSIFICATION
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"Image classification model '{model_name}' not found"}

        predictions = await self.model_manager.call_model(
            entry.model_id, category, image_data=image_data, top_k=top_k
        )

        response: Dict[str, Any] = {"model": entry.name, "model_id": entry.model_id}
        response["predictions"] = predictions
        response["top_k"] = top_k
        return response
    except Exception as exc:
        return {"error": f"Image classification failed: {exc!s}"}
|
|
|
|
|
|
async def get_embeddings(
    self, model_name: str, texts: Union[str, List[str]]
) -> Dict[str, Any]:
    """Compute embeddings for one text or a list of texts.

    Args:
        model_name: Name or ID of the model (e.g., "Sentence Transformers All MiniLM").
        texts: Text or list of texts to embed.

    Returns:
        ``{"model", "model_id", "embeddings", "input_count"}`` on success,
        or ``{"error": ...}`` on any failure. ``input_count`` is the list
        length for list input and 1 otherwise.
    """
    try:
        category = ModelCategory.FEATURE_EXTRACTION
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"Feature extraction model '{model_name}' not found"}

        vectors = await self.model_manager.call_model(
            entry.model_id, category, texts=texts
        )

        if isinstance(texts, list):
            how_many = len(texts)
        else:
            how_many = 1

        return {
            "model": entry.name,
            "model_id": entry.model_id,
            "embeddings": vectors,
            "input_count": how_many,
        }
    except Exception as exc:
        return {"error": f"Feature extraction failed: {exc!s}"}
|
|
|
|
|
|
async def translate_text(
    self,
    model_name: str,
    text: str,
    source_language: Optional[str] = None,
    target_language: Optional[str] = None,
) -> Dict[str, Any]:
    """Translate text between languages.

    Args:
        model_name: Name or ID of the model (e.g., "M2M100 1.2B").
        text: Text to translate.
        source_language: Source language code; forwarded as ``src_lang``.
        target_language: Target language code; forwarded as ``tgt_lang``.

    Returns:
        ``{"model", "model_id", "translation", "source_language",
        "target_language", "original_text"}`` on success, or
        ``{"error": ...}`` on any failure.
    """
    try:
        category = ModelCategory.TRANSLATION
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"Translation model '{model_name}' not found"}

        translated = await self.model_manager.call_model(
            entry.model_id,
            category,
            text=text,
            src_lang=source_language,
            tgt_lang=target_language,
        )

        report: Dict[str, Any] = {"model": entry.name, "model_id": entry.model_id}
        report["translation"] = translated
        report["source_language"] = source_language
        report["target_language"] = target_language
        report["original_text"] = text
        return report
    except Exception as exc:
        return {"error": f"Translation failed: {exc!s}"}
|
|
|
|
|
|
async def summarize_text(
    self, model_name: str, text: str, max_length: int = 150, min_length: int = 30
) -> Dict[str, Any]:
    """Summarize a long text.

    Args:
        model_name: Name or ID of the model (e.g., "PEGASUS XSum").
        text: Text to summarize.
        max_length: Maximum summary length.
        min_length: Minimum summary length.

    Returns:
        ``{"model", "model_id", "summary", "original_length",
        "summary_length"}`` on success, or ``{"error": ...}`` on any failure.
    """
    try:
        category = ModelCategory.SUMMARIZATION
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"Summarization model '{model_name}' not found"}

        digest = await self.model_manager.call_model(
            entry.model_id,
            category,
            text=text,
            max_length=max_length,
            min_length=min_length,
        )

        # A dict result is measured by its "summary_text" entry; anything
        # else is measured by its string form.
        if isinstance(digest, dict):
            digest_size = len(digest.get("summary_text", ""))
        else:
            digest_size = len(str(digest))

        return {
            "model": entry.name,
            "model_id": entry.model_id,
            "summary": digest,
            "original_length": len(text),
            "summary_length": digest_size,
        }
    except Exception as exc:
        return {"error": f"Summarization failed: {exc!s}"}
|
|
|
|
|
|
async def answer_question(
    self, model_name: str, question: str, context: str
) -> Dict[str, Any]:
    """Answer a question from a supplied context using a text-generation model.

    Args:
        model_name: Name or ID of the model.
        question: Question to answer.
        context: Context containing the answer.

    Returns:
        ``{"model", "model_id", "answer", "question", "context_length"}`` on
        success, or ``{"error": ...}`` on any failure.
    """
    try:
        # The model is looked up in, and called under, the text-generation
        # category; the question and context are folded into one prompt.
        category = ModelCategory.TEXT_GENERATION
        entry = self._find_model(model_name, category)
        if not entry:
            return {"error": f"Question answering model '{model_name}' not found"}

        qa_prompt = f"Context: {context}\n\nQuestion: {question}\n\nAnswer:"

        reply = await self.model_manager.call_model(
            entry.model_id,
            category,
            prompt=qa_prompt,
            max_tokens=200,
            temperature=0.3,
        )

        outcome: Dict[str, Any] = {"model": entry.name, "model_id": entry.model_id}
        outcome["answer"] = reply
        outcome["question"] = question
        outcome["context_length"] = len(context)
        return outcome
    except Exception as exc:
        return {"error": f"Question answering failed: {exc!s}"}
|
|
|
|
|
|
def list_available_models(self, category: Optional[str] = None) -> Dict[str, Any]:
    """List the available models, optionally restricted to one category.

    Args:
        category: Specific category to filter (optional). The value is
            lower-cased and its hyphens replaced by underscores before being
            converted to ``ModelCategory``.

    Returns:
        With a category: ``{"category", "models": [...]}``. Without one:
        ``{"categories": {<category value>: [...]}}``. On any failure:
        ``{"error": ...}``.
    """

    def describe(entry) -> Dict[str, Any]:
        # Public summary of a single registered model.
        return {
            "name": entry.name,
            "model_id": entry.model_id,
            "description": entry.description,
            "endpoint_compatible": entry.endpoint_compatible,
            "requires_auth": entry.requires_auth,
        }

    try:
        if not category:
            grouped = self.model_manager.get_all_models()
            listing = {}
            for cat, members in grouped.items():
                listing[cat.value] = [describe(entry) for entry in members]
            return {"categories": listing}

        cat_enum = ModelCategory(category.lower().replace("-", "_"))
        members = self.model_manager.get_models_by_category(cat_enum)
        return {
            "category": category,
            "models": [describe(entry) for entry in members],
        }
    except Exception as exc:
        return {"error": f"Failed to list models: {exc!s}"}
|
|
|
|
|
|
def _find_model(self, model_name: str, category: ModelCategory):
|
|
|
"""Find a model by name or ID within a category"""
|
|
|
models = self.model_manager.get_models_by_category(category)
|
|
|
|
|
|
|
|
|
for model in models:
|
|
|
if model.name.lower() == model_name.lower():
|
|
|
return model
|
|
|
|
|
|
|
|
|
for model in models:
|
|
|
if model.model_id.lower() == model_name.lower():
|
|
|
return model
|
|
|
|
|
|
|
|
|
for model in models:
|
|
|
if model_name.lower() in model.name.lower():
|
|
|
return model
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def execute(self, **kwargs) -> Dict[str, Any]:
    """Execute the Hugging Face models tool.

    The ``action`` keyword (default ``"list_models"``) selects the method of
    the same name, except ``"list_models"``, which calls
    ``list_available_models``. The remaining keywords supply that method's
    arguments.

    Core actions receive a fixed set of positional arguments, with the
    defaults listed below used for missing keys. Extended actions receive
    their required arguments positionally and every other keyword (except
    ``action``) as extra options.

    Returns:
        Whatever the selected method returns, or
        ``{"error": "Unknown action: ..."}`` for an unrecognised action.
    """
    action = kwargs.get("action", "list_models")

    # action -> ordered (keyword, default) pairs passed positionally.
    core_actions = {
        "text_generation": (
            ("model_name", None),
            ("prompt", None),
            ("max_tokens", 100),
            ("temperature", 0.7),
            ("stream", False),
        ),
        "generate_image": (
            ("model_name", None),
            ("prompt", None),
            ("negative_prompt", None),
            ("width", 1024),
            ("height", 1024),
            ("num_inference_steps", 20),
        ),
        "transcribe_audio": (
            ("model_name", None),
            ("audio_data", None),
            ("language", None),
            ("task", "transcribe"),
        ),
        "text_to_speech": (
            ("model_name", None),
            ("text", None),
            ("voice_id", None),
            ("speed", 1.0),
        ),
        "classify_image": (
            ("model_name", None),
            ("image_data", None),
            ("top_k", 5),
        ),
        "get_embeddings": (("model_name", None), ("texts", None)),
        "translate_text": (
            ("model_name", None),
            ("text", None),
            ("source_language", None),
            ("target_language", None),
        ),
        "summarize_text": (
            ("model_name", None),
            ("text", None),
            ("max_length", 150),
            ("min_length", 30),
        ),
        "answer_question": (
            ("model_name", None),
            ("question", None),
            ("context", None),
        ),
    }

    # action -> keywords passed positionally; all other keywords are
    # forwarded as extra options.
    extended_actions = {
        "text_to_video": ("model_name", "prompt"),
        "code_generation": ("model_name", "prompt"),
        "text_to_3d": ("model_name", "prompt"),
        "ocr": ("model_name", "image_data"),
        "document_analysis": ("model_name", "document_data"),
        "vision_language": ("model_name", "image_data", "text"),
        "music_generation": ("model_name", "prompt"),
        "creative_writing": ("model_name", "prompt"),
        "business_document": ("model_name", "document_type", "context"),
    }

    # Non-string actions cannot name a handler (and may be unhashable).
    if not isinstance(action, str):
        return {"error": f"Unknown action: {action}"}

    if action == "list_models":
        return self.list_available_models(kwargs.get("category"))

    if action in core_actions:
        positional = [kwargs.get(key, default) for key, default in core_actions[action]]
        return await getattr(self, action)(*positional)

    if action in extended_actions:
        names = extended_actions[action]
        # Drop the keys already passed positionally, plus the dispatcher's
        # own "action" key. Forwarding them again through ** would raise
        # "got multiple values for argument" and leak "action" downstream.
        consumed = {"action", *names}
        extras = {key: value for key, value in kwargs.items() if key not in consumed}
        positional = [kwargs.get(name) for name in names]
        return await getattr(self, action)(*positional, **extras)

    return {"error": f"Unknown action: {action}"}
|
|
|
|
|
|
|
|
|
|
|
|
async def text_to_video(
    self, model_name: str, prompt: str, duration: int = 5, fps: int = 24, **kwargs
) -> Dict[str, Any]:
    """Generate video from a text prompt.

    Args:
        model_name: Name or ID of a model in the text-to-video category.
        prompt: Text prompt forwarded to the model.
        duration: Forwarded to the model manager as ``duration``.
        fps: Forwarded to the model manager as ``fps``.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.TEXT_TO_VIDEO
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            prompt=prompt,
            duration=duration,
            fps=fps,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def code_generation(
    self, model_name: str, prompt: str, language: str = "python", **kwargs
) -> Dict[str, Any]:
    """Generate code from a natural-language description.

    Args:
        model_name: Name or ID of a model in the code-generation category.
        prompt: Description forwarded to the model.
        language: Forwarded to the model manager as ``language``.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.CODE_GENERATION
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            prompt=prompt,
            language=language,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def text_to_3d(
    self, model_name: str, prompt: str, resolution: int = 64, **kwargs
) -> Dict[str, Any]:
    """Generate a 3D model from a text description.

    Args:
        model_name: Name or ID of a model in the text-to-3D category.
        prompt: Description forwarded to the model.
        resolution: Forwarded to the model manager as ``resolution``.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.TEXT_TO_3D
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            prompt=prompt,
            resolution=resolution,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def ocr(
    self, model_name: str, image_data: bytes, language: str = "en", **kwargs
) -> Dict[str, Any]:
    """Perform OCR on an image.

    Args:
        model_name: Name or ID of a model in the OCR category.
        image_data: Image bytes forwarded to the model manager.
        language: Forwarded to the model manager as ``language``.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.OCR
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            image_data=image_data,
            language=language,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def document_analysis(
    self, model_name: str, document_data: bytes, **kwargs
) -> Dict[str, Any]:
    """Analyze document structure and content.

    Args:
        model_name: Name or ID of a model in the document-analysis category.
        document_data: Document bytes forwarded to the model manager.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.DOCUMENT_ANALYSIS
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            document_data=document_data,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def vision_language(
    self, model_name: str, image_data: bytes, text: str, **kwargs
) -> Dict[str, Any]:
    """Process an image and text together using a multimodal model.

    Args:
        model_name: Name or ID of a model in the vision-language category.
        image_data: Image bytes forwarded to the model manager.
        text: Text forwarded alongside the image.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.VISION_LANGUAGE
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            image_data=image_data,
            text=text,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def music_generation(
    self, model_name: str, prompt: str, duration: int = 30, **kwargs
) -> Dict[str, Any]:
    """Generate music from a text description.

    Args:
        model_name: Name or ID of a model in the music-generation category.
        prompt: Description forwarded to the model.
        duration: Forwarded to the model manager as ``duration``.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.MUSIC_GENERATION
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            prompt=prompt,
            duration=duration,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def creative_writing(
    self, model_name: str, prompt: str, content_type: str = "story", **kwargs
) -> Dict[str, Any]:
    """Generate creative content.

    Args:
        model_name: Name or ID of a model in the creative-writing category.
        prompt: Writing brief; sent as ``"Write a {content_type}: {prompt}"``.
        content_type: Kind of piece named in the prompt prefix.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.CREATIVE_WRITING
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        enhanced_prompt = f"Write a {content_type}: {prompt}"
        result = await self.model_manager.call_model(
            model.model_id,
            category,
            prompt=enhanced_prompt,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|
|
|
async def business_document(
    self, model_name: str, document_type: str, context: str, **kwargs
) -> Dict[str, Any]:
    """Generate a business document.

    Args:
        model_name: Name or ID of a model in the ``EMAIL_GENERATION``
            category, the category this method uses for the call.
        document_type: Forwarded to the model manager as ``document_type``.
        context: Forwarded to the model manager as ``context``.
        **kwargs: Extra options forwarded unchanged to the model manager.

    Returns:
        ``{"success": True, "result": ...}`` on success, or
        ``{"error": ...}`` when the model is unknown or an exception occurs.
    """
    try:
        category = ModelCategory.EMAIL_GENERATION
        # Resolve through _find_model, the lookup helper defined on this
        # class; no _get_model_by_name exists in the visible class, so
        # calling it would always end in the except branch.
        model = self._find_model(model_name, category)
        if not model:
            return {"error": f"Model '{model_name}' not found"}

        result = await self.model_manager.call_model(
            model.model_id,
            category,
            document_type=document_type,
            context=context,
            **kwargs,
        )
        return {"success": True, "result": result}
    except Exception as e:
        return {"error": str(e)}
|
|
|
|