malek-messaoudii
add mcp part
8791d59
raw
history blame
12.1 kB
from typing import List, Dict, Any
from mcp import Resource
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class ModelResource(Resource):
    """Resource describing a model and its current status.

    Instances carry two extra attributes on top of the base resource:
    ``model_name`` (the identifier used in the ``model://`` URI) and
    ``model_info`` (the metadata dictionary serialized by ``get_content``).
    """

    def __init__(self, model_name: str, model_info: Dict[str, Any]):
        """Register the resource under ``model://<model_name>``.

        Args:
            model_name: Identifier used as the resource name and URI suffix.
            model_info: Metadata describing the model; it must be
                JSON-serializable because ``get_content`` dumps it as-is.
        """
        # Initialize the base class first, as the service resources in this
        # module do. Base classes that manage their own attribute storage
        # (e.g. pydantic models) reject assignments made before their
        # __init__ has run.
        super().__init__(
            uri=f"model://{model_name}",
            name=model_name,
            description=f"{model_name} model information and status",
            mime_type="application/json",
        )
        self.model_name = model_name
        self.model_info = model_info

    async def get_content(self) -> str:
        """Return the model metadata as a JSON string.

        The payload is ``model_info`` extended with a ``timestamp`` (naive
        local time, ISO 8601) and the resource ``uri``.
        """
        import json  # only needed here; not imported at module level

        payload = {
            **self.model_info,
            "timestamp": datetime.now().isoformat(),
            # str() keeps the payload serializable if the base class stores
            # the URI as a URL object rather than a plain string.
            "uri": str(self.uri),
        }
        return json.dumps(payload)
class StanceDetectionResource(ModelResource):
    """Model resource describing the stance detection model."""

    def __init__(self):
        """Assemble the static description and append the manager's load state."""
        # Imported lazily so the manager module is only touched when the
        # resource is actually constructed.
        from services.stance_model_manager import stance_model_manager as manager

        details = {
            "type": "stance_detection",
            "description": "Detects PRO/CON stance for topic-argument pairs",
            "capabilities": ["single_prediction", "batch_prediction"],
            "input_format": {"topic": "string", "argument": "string"},
            "output_format": {
                "predicted_stance": "PRO/CON",
                "confidence": "float",
                "probabilities": {"PRO": "float", "CON": "float"},
            },
        }

        if not manager or not manager.model_loaded:
            # No manager, or the model has not been loaded: report only that.
            details["loaded"] = False
        else:
            details["loaded"] = True
            details["device"] = str(manager.device)
            # The manager may not expose a model_id; fall back to a marker.
            details["model_id"] = getattr(manager, "model_id", "unknown")

        super().__init__("stance_detection", details)
class KPAResource(ModelResource):
    """Model resource describing the keypoint-argument matching model."""

    def __init__(self):
        """Assemble the static description and append the manager's load state."""
        # Imported lazily so the manager module is only touched when the
        # resource is actually constructed.
        from services.label_model_manager import kpa_model_manager as manager

        details = {
            "type": "keypoint_argument_matching",
            "description": "Matches arguments with key points (apparie/non_apparie)",
            "capabilities": ["single_prediction", "batch_prediction"],
            "input_format": {"argument": "string", "key_point": "string"},
            "output_format": {
                "prediction": "0/1",
                "label": "apparie/non_apparie",
                "confidence": "float",
                "probabilities": {"non_apparie": "float", "apparie": "float"},
            },
        }

        if not manager or not manager.model_loaded:
            # No manager, or the model has not been loaded: report only that.
            details["loaded"] = False
        else:
            details["loaded"] = True
            details["device"] = str(manager.device)
            # Optional manager attributes fall back to defaults when absent.
            details["model_id"] = getattr(manager, "model_id", "unknown")
            details["max_length"] = getattr(manager, "max_length", 256)

        super().__init__("kpa_matching", details)
class STTResource(Resource):
    """Resource describing the Speech-to-Text service."""

    def __init__(self):
        """Register ``service://speech-to-text`` and snapshot its settings.

        The settings are read from ``config`` once, at construction time.
        """
        from config import GROQ_API_KEY, GROQ_STT_MODEL

        super().__init__(
            uri="service://speech-to-text",
            name="speech_to_text",
            description="Speech-to-Text service using Groq Whisper API",
            mime_type="application/json",
        )
        self.config = {
            "provider": "Groq",
            "model": GROQ_STT_MODEL,
            # The service is reported as enabled whenever an API key is set.
            "enabled": bool(GROQ_API_KEY),
            "language": "English only",
            "max_audio_size": "10MB",
            "supported_formats": ["wav", "mp3", "m4a", "mp4"],
        }

    async def get_content(self) -> str:
        """Return the STT settings as a JSON string.

        The payload is ``config`` extended with a ``timestamp`` (naive local
        time, ISO 8601) and the resource ``uri``.
        """
        import json  # only needed here; not imported at module level

        payload = {
            **self.config,
            "timestamp": datetime.now().isoformat(),
            # str() keeps the payload serializable if the base class stores
            # the URI as a URL object rather than a plain string.
            "uri": str(self.uri),
        }
        return json.dumps(payload)
class TTSResource(Resource):
    """Resource describing the Text-to-Speech service."""

    def __init__(self):
        """Register ``service://text-to-speech`` and snapshot its settings.

        The settings are read from ``config`` once, at construction time.
        """
        from config import GROQ_API_KEY, GROQ_TTS_MODEL, GROQ_TTS_VOICE

        super().__init__(
            uri="service://text-to-speech",
            name="text_to_speech",
            description="Text-to-Speech service using Groq PlayAI TTS",
            mime_type="application/json",
        )
        self.config = {
            "provider": "Groq",
            "model": GROQ_TTS_MODEL,
            "voice": GROQ_TTS_VOICE,
            # The service is reported as enabled whenever an API key is set.
            "enabled": bool(GROQ_API_KEY),
            "language": "English only",
            "format": "wav/mp3",
            # Hard-coded list; it is not queried from the provider.
            "voices_available": [
                "Aaliyah-PlayAI",
                "Aria-PlayAI",
                "Dexter-PlayAI",
                "Fiona-PlayAI",
            ],
        }

    async def get_content(self) -> str:
        """Return the TTS settings as a JSON string.

        The payload is ``config`` extended with a ``timestamp`` (naive local
        time, ISO 8601) and the resource ``uri``.
        """
        import json  # only needed here; not imported at module level

        payload = {
            **self.config,
            "timestamp": datetime.now().isoformat(),
            # str() keeps the payload serializable if the base class stores
            # the URI as a URL object rather than a plain string.
            "uri": str(self.uri),
        }
        return json.dumps(payload)
class ChatbotResource(Resource):
    """Resource describing the chatbot service."""

    def __init__(self):
        """Register ``service://chatbot`` and snapshot its settings.

        The settings are read from ``config`` once, at construction time.
        """
        from config import GROQ_API_KEY, GROQ_CHAT_MODEL

        super().__init__(
            uri="service://chatbot",
            name="chatbot",
            description="Chatbot service using Groq LLM API",
            mime_type="application/json",
        )
        self.config = {
            "provider": "Groq",
            "model": GROQ_CHAT_MODEL,
            # The service is reported as enabled whenever an API key is set.
            "enabled": bool(GROQ_API_KEY),
            "language": "English only",
            "features": ["conversation", "context_awareness", "voice_chat"],
            "max_context_length": 8192,
        }

    async def get_content(self) -> str:
        """Return the chatbot settings as a JSON string.

        The payload is ``config`` extended with a ``timestamp`` (naive local
        time, ISO 8601) and the resource ``uri``.
        """
        import json  # only needed here; not imported at module level

        payload = {
            **self.config,
            "timestamp": datetime.now().isoformat(),
            # str() keeps the payload serializable if the base class stores
            # the URI as a URL object rather than a plain string.
            "uri": str(self.uri),
        }
        return json.dumps(payload)
class ArgumentGenerationResource(Resource):
    """Placeholder resource for the argument generation model (to be completed)."""

    def __init__(self):
        """Register ``model://argument-generation`` with a not-implemented status."""
        super().__init__(
            uri="model://argument-generation",
            name="argument_generation",
            description="Persuasive argument generation model",
            mime_type="application/json",
        )
        self.config = {
            "type": "argument_generation",
            "status": "not_implemented",
            "description": "TODO: Implement your argument generation model",
            "planned_capabilities": [
                "single_argument_generation",
                "batch_generation",
                "stance_controlled_generation",
                "counter_argument_generation",
            ],
        }

    async def get_content(self) -> str:
        """Return the placeholder description as a JSON string.

        The payload is ``config`` extended with a ``timestamp`` (naive local
        time, ISO 8601), the resource ``uri`` and a ``note`` pointing at the
        module where the model is expected to be implemented.
        """
        import json  # only needed here; not imported at module level

        payload = {
            **self.config,
            "timestamp": datetime.now().isoformat(),
            # str() keeps the payload serializable if the base class stores
            # the URI as a URL object rather than a plain string.
            "uri": str(self.uri),
            "note": "This is a placeholder. Implement your model in services/argument_generation.py",
        }
        return json.dumps(payload)
class SystemHealthResource(Resource):
    """Resource reporting which models are loaded and which services are configured."""

    def __init__(self):
        """Register the resource under ``system://health``."""
        super().__init__(
            uri="system://health",
            name="system_health",
            description="System health and service status",
            mime_type="application/json",
        )

    async def get_content(self) -> str:
        """Return a JSON health report.

        The report contains a ``timestamp`` (naive local time, ISO 8601), a
        fixed ``status`` string, the load state of each model manager, the
        per-service availability flags and two version strings.

        A model manager that cannot be imported or inspected is reported as
        not loaded and the failure is logged; it does not abort the report.
        """
        import json  # only needed here; not imported at module level

        # Collect model status. Each probe is isolated so that one broken
        # manager cannot hide the state of the other.
        model_status = {}
        try:
            from services.stance_model_manager import stance_model_manager
            model_status["stance_detection"] = {
                "loaded": stance_model_manager.model_loaded if stance_model_manager else False
            }
        except Exception as exc:
            # Narrower than a bare ``except`` so KeyboardInterrupt/SystemExit
            # still propagate; the failure is logged instead of vanishing.
            logger.warning("Could not read stance detection model status: %s", exc)
            model_status["stance_detection"] = {"loaded": False}

        try:
            from services.label_model_manager import kpa_model_manager
            model_status["kpa_matching"] = {
                "loaded": kpa_model_manager.model_loaded if kpa_model_manager else False
            }
        except Exception as exc:
            logger.warning("Could not read KPA model status: %s", exc)
            model_status["kpa_matching"] = {"loaded": False}

        # Service status: every Groq-backed service is considered available
        # as soon as an API key is configured.
        from config import GROQ_API_KEY
        groq_enabled = bool(GROQ_API_KEY)
        service_status = {
            "stt": groq_enabled,
            "tts": groq_enabled,
            "chatbot": groq_enabled,
            "argument_generation": False,  # not implemented yet
        }

        return json.dumps({
            "timestamp": datetime.now().isoformat(),
            # Always "operational": this value is not derived from the
            # checks above.
            "status": "operational",
            "models": model_status,
            "services": service_status,
            # NOTE(review): APIDocumentationResource reports version "2.0.0";
            # confirm which of the two values is current.
            "api_version": "1.0.0",
            "mcp_version": "1.0.0",
        })
class APIDocumentationResource(Resource):
    """Resource exposing a static summary of the HTTP API endpoints."""

    def __init__(self):
        """Register ``documentation://api`` and build the endpoint catalogue.

        The catalogue is a hard-coded dictionary; it is not generated from
        the application's routes.
        """
        super().__init__(
            uri="documentation://api",
            name="api_documentation",
            description="API endpoints documentation",
            mime_type="application/json",
        )
        self.documentation = {
            "endpoints": {
                "mcp": {
                    "/mcp/health": "GET - Health check",
                    "/mcp/resources": "GET - List all resources",
                    "/mcp/tools": "GET - List all tools",
                    "/mcp/tools/call": "POST - Call a tool",
                },
                "models": {
                    "/api/v1/kpa/predict": "POST - KPA prediction",
                    "/api/v1/stance/predict": "POST - Stance prediction",
                    "/api/v1/stance/batch-predict": "POST - Batch stance prediction",
                },
                "voice": {
                    "/api/v1/stt/": "POST - Speech to text",
                    "/api/v1/tts/": "POST - Text to speech",
                    "/voice-chat/voice": "POST - Voice chat",
                    "/voice-chat/text": "POST - Text chat",
                },
            },
            "authentication": "Currently none (add JWT or API key based auth)",
            "rate_limits": "None configured",
            # NOTE(review): SystemHealthResource reports api_version "1.0.0";
            # confirm which of the two values is current.
            "version": "2.0.0",
        }

    async def get_content(self) -> str:
        """Return the endpoint catalogue as a JSON string.

        The payload is ``documentation`` extended with a ``timestamp`` (naive
        local time, ISO 8601) and the resource ``uri``.
        """
        import json  # only needed here; not imported at module level

        payload = {
            **self.documentation,
            "timestamp": datetime.now().isoformat(),
            # str() keeps the payload serializable if the base class stores
            # the URI as a URL object rather than a plain string.
            "uri": str(self.uri),
        }
        return json.dumps(payload)
def get_resources() -> List[Resource]:
    """Instantiate every MCP resource, skipping the ones that fail.

    Returns:
        The successfully constructed resources, in registration order. A
        resource whose constructor raises is logged as a warning and left
        out; it does not prevent the remaining resources from being created.
    """
    resource_classes = (
        StanceDetectionResource,
        KPAResource,
        STTResource,
        TTSResource,
        ChatbotResource,
        ArgumentGenerationResource,
        SystemHealthResource,
        APIDocumentationResource,
    )

    created = []
    for resource_cls in resource_classes:
        try:
            created.append(resource_cls())
        except Exception as err:
            # Best effort: one broken resource must not hide the others.
            logger.warning(f"Failed to create {resource_cls.__name__}: {err}")

    logger.info(f"Created {len(created)} MCP resources")
    return created