Yassine Mhirsi
feat: Add analysis endpoints and service for processing arguments, extracting topics, and predicting stance
430b54f
raw
history blame
6.01 kB
"""Analysis endpoints for processing arguments, extracting topics, and predicting stance"""
from fastapi import APIRouter, HTTPException, Header, UploadFile, File
from typing import Optional
import logging
import csv
import io
from datetime import datetime
from services.analysis_service import analysis_service
from models.analysis import (
AnalysisRequest,
AnalysisResponse,
AnalysisResult,
GetAnalysisRequest,
GetAnalysisResponse,
)
router = APIRouter()
logger = logging.getLogger(__name__)
def parse_csv_file(file_content: bytes) -> List[str]:
"""
Parse CSV file and extract arguments
Args:
file_content: CSV file content as bytes
Returns:
List of argument strings
"""
try:
# Decode bytes to string
content = file_content.decode('utf-8')
# Parse CSV
csv_reader = csv.reader(io.StringIO(content))
arguments = []
# Skip header row if present, extract arguments from first column or 'argument' column
rows = list(csv_reader)
if len(rows) == 0:
return []
# Check if first row is header
header = rows[0] if rows else []
start_idx = 1 if any(col.lower() in ['argument', 'text', 'content'] for col in header) else 0
# Find argument column index
arg_col_idx = 0
if start_idx == 1:
for idx, col in enumerate(header):
if col.lower() in ['argument', 'text', 'content']:
arg_col_idx = idx
break
# Extract arguments
for row in rows[start_idx:]:
if row and len(row) > arg_col_idx:
arg = row[arg_col_idx].strip()
if arg: # Only add non-empty arguments
arguments.append(arg)
return arguments
except Exception as e:
logger.error(f"Error parsing CSV file: {str(e)}")
raise ValueError(f"Failed to parse CSV file: {str(e)}")
@router.post("/analyse", response_model=AnalysisResponse, tags=["Analysis"])
async def analyse_arguments(
request: Optional[AnalysisRequest] = None,
file: Optional[UploadFile] = File(None),
x_user_id: Optional[str] = Header(None, alias="X-User-ID")
):
"""
Analyze arguments: extract topics and predict stance
Accepts either:
- JSON body with `arguments` array, OR
- CSV file upload with arguments
- **X-User-ID**: User UUID (required in header)
- **arguments**: List of argument texts (if using JSON)
- **file**: CSV file with arguments (if using file upload)
Returns analysis results with extracted topics and stance predictions
"""
if not x_user_id:
raise HTTPException(status_code=400, detail="X-User-ID header is required")
try:
arguments = []
# Get arguments from either JSON body or CSV file
if file:
# Read CSV file
file_content = await file.read()
arguments = parse_csv_file(file_content)
if not arguments:
raise HTTPException(
status_code=400,
detail="CSV file is empty or contains no valid arguments"
)
logger.info(f"Parsed {len(arguments)} arguments from CSV file")
elif request and request.arguments:
arguments = request.arguments
logger.info(f"Received {len(arguments)} arguments from JSON body")
else:
raise HTTPException(
status_code=400,
detail="Either 'arguments' in JSON body or CSV 'file' must be provided"
)
# Analyze arguments
results = analysis_service.analyze_arguments(
user_id=x_user_id,
arguments=arguments
)
# Convert to response models
analysis_results = [
AnalysisResult(**result) for result in results
]
logger.info(f"Analysis completed: {len(analysis_results)} results")
return AnalysisResponse(
results=analysis_results,
total_processed=len(analysis_results),
timestamp=datetime.now().isoformat()
)
except HTTPException:
raise
except ValueError as e:
logger.error(f"Validation error: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Analysis error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
@router.get("/analyse", response_model=GetAnalysisResponse, tags=["Analysis"])
async def get_analysis_results(
limit: int = 100,
offset: int = 0,
x_user_id: Optional[str] = Header(None, alias="X-User-ID")
):
"""
Get user's analysis results
- **X-User-ID**: User UUID (required in header)
- **limit**: Maximum number of results to return (default: 100, max: 1000)
- **offset**: Number of results to skip (default: 0)
Returns paginated analysis results
"""
if not x_user_id:
raise HTTPException(status_code=400, detail="X-User-ID header is required")
try:
results = analysis_service.get_user_analysis_results(
user_id=x_user_id,
limit=limit,
offset=offset
)
# Convert to response models
analysis_results = [
AnalysisResult(**result) for result in results
]
return GetAnalysisResponse(
results=analysis_results,
total=len(analysis_results),
limit=limit,
offset=offset
)
except Exception as e:
logger.error(f"Error getting analysis results: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get analysis results: {str(e)}")