"""
FastAPI routes for n8n integration with MCP.

Add these routes to your main app.py (see register_n8n_routes at the
bottom of this module).
"""

import asyncio
import logging
import os
import tempfile
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, BackgroundTasks, File, HTTPException, UploadFile
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

n8n_router = APIRouter(prefix="/n8n", tags=["n8n"])
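# All routes below are mounted under the /n8n prefix, e.g. POST /n8n/execute,
# POST /n8n/batch, GET /n8n/health.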


class N8NToolRequest(BaseModel):
    """Request model for n8n tool calls."""

    tool_name: str
    arguments: Dict[str, Any]
    context: Optional[Dict[str, Any]] = None
    async_callback: Optional[str] = None

    class Config:
        json_schema_extra = {
            "example": {
                "tool_name": "predict_stance",
                "arguments": {
                    "topic": "climate change",
                    "argument": "We need renewable energy"
                },
                "context": {
                    "session_id": "session_123",
                    "user_id": "user_456"
                }
            }
        }


class N8NBatchRequest(BaseModel):
    """Request model for batch processing."""

    tool_name: str
    items: List[Dict[str, Any]]
    batch_size: int = 10
    parallel: bool = False

    class Config:
        json_schema_extra = {
            "example": {
                "tool_name": "predict_stance",
                "items": [
                    {"topic": "AI", "argument": "AI will help humanity"},
                    {"topic": "AI", "argument": "AI is dangerous"}
                ],
                "batch_size": 10
            }
        }


class N8NPipelineRequest(BaseModel):
    """Request model for a multi-step pipeline."""

    pipeline_name: str
    input_data: Dict[str, Any]
    steps: List[Dict[str, Any]]

    class Config:
        json_schema_extra = {
            "example": {
                "pipeline_name": "debate_analysis",
                "input_data": {
                    "topic": "climate change",
                    "text": "We must act now"
                },
                "steps": [
                    {"tool": "predict_stance", "output_key": "stance"},
                    {"tool": "predict_kpa", "use_previous": True}
                ]
            }
        }


class N8NResponse(BaseModel):
    """Standardized response envelope for n8n."""

    success: bool
    data: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    execution_time: float
    # default_factory evaluates per response; a plain `datetime.now()` default
    # would be evaluated only once, at import time.
    timestamp: datetime = Field(default_factory=datetime.now)
@n8n_router.post("/execute", response_model=N8NResponse) |
|
|
async def execute_tool(request: N8NToolRequest): |
|
|
""" |
|
|
Endpoint principal pour exécuter un outil MCP depuis n8n |
|
|
""" |
|
|
import time |
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
from mcp.server import MCPServer |
|
|
from mcp import server |
|
|
|
|
|
|
|
|
result = await server.call_tool( |
|
|
tool_name=request.tool_name, |
|
|
arguments=request.arguments |
|
|
) |
|
|
|
|
|
|
|
|
if request.context: |
|
|
result["context"] = request.context |
|
|
|
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
return N8NResponse( |
|
|
success=True, |
|
|
data=result, |
|
|
execution_time=execution_time |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Tool execution failed: {str(e)}") |
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
return N8NResponse( |
|
|
success=False, |
|
|
error=str(e), |
|
|
execution_time=execution_time |
|
|
) |
|
|
|
|
|
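
# Example call from an n8n HTTP Request node or curl (sketch; host and port
# are assumptions, adjust to your deployment):
#
#     curl -X POST http://localhost:8000/n8n/execute \
#       -H "Content-Type: application/json" \
#       -d '{"tool_name": "predict_stance",
#            "arguments": {"topic": "climate change",
#                          "argument": "We need renewable energy"}}'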
@n8n_router.post("/batch", response_model=N8NResponse) |
|
|
async def batch_execute(request: N8NBatchRequest): |
|
|
""" |
|
|
Endpoint pour traitement batch depuis n8n |
|
|
""" |
|
|
import time |
|
|
import asyncio |
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
from mcp import server |
|
|
|
|
|
results = [] |
|
|
|
|
|
|
|
|
if request.parallel: |
|
|
|
|
|
tasks = [] |
|
|
for item in request.items: |
|
|
task = server.call_tool( |
|
|
tool_name=request.tool_name, |
|
|
arguments=item |
|
|
) |
|
|
tasks.append(task) |
|
|
|
|
|
results = await asyncio.gather(*tasks, return_exceptions=True) |
|
|
else: |
|
|
|
|
|
for i in range(0, len(request.items), request.batch_size): |
|
|
batch = request.items[i:i + request.batch_size] |
|
|
|
|
|
for item in batch: |
|
|
try: |
|
|
result = await server.call_tool( |
|
|
tool_name=request.tool_name, |
|
|
arguments=item |
|
|
) |
|
|
results.append(result) |
|
|
except Exception as e: |
|
|
results.append({"error": str(e), "item": item}) |
|
|
|
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
return N8NResponse( |
|
|
success=True, |
|
|
data={ |
|
|
"results": results, |
|
|
"total": len(results), |
|
|
"successful": sum(1 for r in results if not isinstance(r, Exception) and "error" not in r), |
|
|
"failed": sum(1 for r in results if isinstance(r, Exception) or "error" in r) |
|
|
}, |
|
|
execution_time=execution_time |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Batch execution failed: {str(e)}") |
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
return N8NResponse( |
|
|
success=False, |
|
|
error=str(e), |
|
|
execution_time=execution_time |
|
|
) |
|
|
|
|
|
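
# Note: with parallel=True, one task is created per item with no concurrency
# cap. If the underlying models are resource-bound, a bounded variant is a
# small change (sketch, not current behavior; reuses batch_size as the cap):
#
#     sem = asyncio.Semaphore(request.batch_size)
#
#     async def bounded_call(item):
#         async with sem:
#             return await server.call_tool(tool_name=request.tool_name,
#                                           arguments=item)
#
#     gathered = await asyncio.gather(*(bounded_call(i) for i in request.items),
#                                     return_exceptions=True)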
@n8n_router.post("/pipeline", response_model=N8NResponse) |
|
|
async def execute_pipeline(request: N8NPipelineRequest): |
|
|
""" |
|
|
Endpoint pour exécuter un pipeline multi-étapes |
|
|
""" |
|
|
import time |
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
from mcp import server |
|
|
|
|
|
pipeline_context = {"input": request.input_data} |
|
|
results = {} |
|
|
|
|
|
for step in request.steps: |
|
|
tool_name = step["tool"] |
|
|
output_key = step.get("output_key", tool_name) |
|
|
use_previous = step.get("use_previous", False) |
|
|
|
|
|
|
|
|
if use_previous: |
|
|
|
|
|
arguments = {**request.input_data, **results} |
|
|
else: |
|
|
arguments = step.get("arguments", request.input_data) |
|
|
|
|
|
|
|
|
result = await server.call_tool( |
|
|
tool_name=tool_name, |
|
|
arguments=arguments |
|
|
) |
|
|
|
|
|
results[output_key] = result |
|
|
pipeline_context[output_key] = result |
|
|
|
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
return N8NResponse( |
|
|
success=True, |
|
|
data={ |
|
|
"pipeline": request.pipeline_name, |
|
|
"results": results, |
|
|
"context": pipeline_context |
|
|
}, |
|
|
execution_time=execution_time |
|
|
) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Pipeline execution failed: {str(e)}") |
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
return N8NResponse( |
|
|
success=False, |
|
|
error=str(e), |
|
|
execution_time=execution_time |
|
|
) |
|
|
|
|
|
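
# Example /n8n/pipeline payload (mirrors the schema example above). The second
# step sets use_previous, so it receives the first step's output merged into
# its arguments under the "stance" key:
#
#     {
#       "pipeline_name": "debate_analysis",
#       "input_data": {"topic": "climate change", "text": "We must act now"},
#       "steps": [
#         {"tool": "predict_stance", "output_key": "stance"},
#         {"tool": "predict_kpa", "use_previous": true}
#       ]
#     }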
@n8n_router.post("/voice-pipeline") |
|
|
async def voice_debate_pipeline( |
|
|
audio: UploadFile = File(...), |
|
|
topic: str = None, |
|
|
session_id: str = None |
|
|
): |
|
|
""" |
|
|
Pipeline complet : Audio → STT → Stance → KPA → Argument Generation → TTS |
|
|
Optimisé pour n8n |
|
|
""" |
|
|
import time |
|
|
import tempfile |
|
|
import os |
|
|
start_time = time.time() |
|
|
|
|
|
try: |
|
|
from mcp import server |
|
|
from services.stt_service import transcribe_audio |
|
|
from services.tts_service import text_to_speech |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp: |
|
|
content = await audio.read() |
|
|
tmp.write(content) |
|
|
tmp_path = tmp.name |
|
|
|
|
|
try: |
|
|
|
|
|
transcription = await transcribe_audio(tmp_path) |
|
|
user_text = transcription.get("text", "") |
|
|
|
|
|
|
|
|
stance_result = await server.call_tool( |
|
|
"predict_stance", |
|
|
{"topic": topic, "argument": user_text} |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
opposite_stance = "CON" if stance_result["predicted_stance"] == "PRO" else "PRO" |
|
|
counter_arg_result = await server.call_tool( |
|
|
"generate_argument", |
|
|
{ |
|
|
"prompt": f"Generate a {opposite_stance} argument about {topic}", |
|
|
"context": f"User said: {user_text}", |
|
|
"stance": opposite_stance |
|
|
} |
|
|
) |
|
|
|
|
|
|
|
|
tts_audio_path = await text_to_speech( |
|
|
counter_arg_result["generated_argument"] |
|
|
) |
|
|
|
|
|
execution_time = time.time() - start_time |
|
|
|
|
|
return N8NResponse( |
|
|
success=True, |
|
|
data={ |
|
|
"transcription": user_text, |
|
|
"stance_analysis": stance_result, |
|
|
"counter_argument": counter_arg_result, |
|
|
"audio_response_path": tts_audio_path, |
|
|
"session_id": session_id |
|
|
}, |
|
|
execution_time=execution_time |
|
|
) |
|
|
|
|
|
finally: |
|
|
|
|
|
if os.path.exists(tmp_path): |
|
|
os.remove(tmp_path) |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Voice pipeline failed: {str(e)}") |
|
|
return N8NResponse( |
|
|
success=False, |
|
|
error=str(e), |
|
|
execution_time=time.time() - start_time |
|
|
) |
|
|
|
|
|
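
# Example invocation (sketch; host, port, and file name are assumptions).
# topic and session_id are plain (non-Form) parameters, so FastAPI reads them
# from the query string of this multipart request:
#
#     curl -X POST "http://localhost:8000/n8n/voice-pipeline?topic=climate%20change&session_id=session_123" \
#       -F "audio=@recording.wav"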
@n8n_router.get("/tools") |
|
|
async def list_tools(): |
|
|
""" |
|
|
Liste tous les outils disponibles (format n8n-friendly) |
|
|
""" |
|
|
try: |
|
|
from mcp import server |
|
|
tools = await server.list_tools() |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"tools": tools, |
|
|
"total": len(tools) |
|
|
} |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@n8n_router.get("/resources") |
|
|
async def list_resources(): |
|
|
""" |
|
|
Liste toutes les ressources disponibles (format n8n-friendly) |
|
|
""" |
|
|
try: |
|
|
from mcp import server |
|
|
resources = await server.list_resources() |
|
|
|
|
|
return { |
|
|
"success": True, |
|
|
"resources": resources, |
|
|
"total": len(resources) |
|
|
} |
|
|
except Exception as e: |
|
|
raise HTTPException(status_code=500, detail=str(e)) |
|
|
|
|
|
@n8n_router.get("/health") |
|
|
async def health_check(): |
|
|
""" |
|
|
Health check pour n8n monitoring |
|
|
""" |
|
|
from services.stance_model_manager import stance_model_manager |
|
|
from services.label_model_manager import kpa_model_manager |
|
|
|
|
|
return { |
|
|
"status": "healthy", |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"models": { |
|
|
"stance": stance_model_manager.model_loaded if stance_model_manager else False, |
|
|
"kpa": kpa_model_manager.model_loaded if kpa_model_manager else False |
|
|
}, |
|
|
"services": { |
|
|
"stt": True, |
|
|
"tts": True, |
|
|
"chat": True |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
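
# A healthy instance answers roughly like this (model flags reflect whether
# each manager reports its model as loaded):
#
#     {"status": "healthy", "timestamp": "...",
#      "models": {"stance": true, "kpa": true},
#      "services": {"stt": true, "tts": true, "chat": true}}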
@n8n_router.post("/webhook/debate-result") |
|
|
async def webhook_debate_result(data: Dict[str, Any], background_tasks: BackgroundTasks): |
|
|
""" |
|
|
Webhook pour recevoir les résultats de débat depuis n8n |
|
|
Peut être utilisé pour stocker, notifier, etc. |
|
|
""" |
|
|
logger.info(f"Received debate result webhook: {data}") |
|
|
|
|
|
|
|
|
background_tasks.add_task(process_debate_result, data) |
|
|
|
|
|
return {"status": "received", "message": "Processing in background"} |
|
|
|
|
|


async def process_debate_result(data: Dict[str, Any]):
    """Process debate results in the background."""
    # Placeholder: add storage, notification, or analytics logic here.
    logger.info(f"Processing debate result: {data}")


def register_n8n_routes(app):
    """Register the n8n routes on a FastAPI application."""
    app.include_router(n8n_router)
    logger.info("n8n routes registered successfully")
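
# Wiring example for app.py (sketch; assumes this module is importable as
# `n8n_routes` — adjust the import to your project layout):
#
#     from fastapi import FastAPI
#     from n8n_routes import register_n8n_routes
#
#     app = FastAPI()
#     register_n8n_routes(app)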