Yassine Mhirsi
feat: Enhance analysis endpoints to support JSON body input and CSV file uploads for argument analysis
7a0be7b
raw
history blame
6.93 kB
"""Analysis endpoints for processing arguments, extracting topics, and predicting stance"""
from fastapi import APIRouter, HTTPException, Header, UploadFile, File, Form
from typing import Optional, List, Union
import logging
import csv
import io
import json
from datetime import datetime
from services.analysis_service import analysis_service
from models.analysis import (
AnalysisRequest,
AnalysisResponse,
AnalysisResult,
GetAnalysisRequest,
GetAnalysisResponse,
)
router = APIRouter()
logger = logging.getLogger(__name__)
def parse_csv_file(file_content: bytes) -> List[str]:
    """
    Parse a CSV file and extract argument strings.

    The first row is treated as a header when any of its cells (case-insensitive,
    whitespace-stripped) is named 'argument', 'text', or 'content'; arguments are
    then read from that column. Otherwise every row is data and the first column
    is used.

    Args:
        file_content: CSV file content as bytes

    Returns:
        List of non-empty argument strings

    Raises:
        ValueError: If the content cannot be decoded or the CSV is malformed.
    """
    try:
        # 'utf-8-sig' transparently strips a leading BOM (common in CSVs
        # exported from Excel) that plain 'utf-8' would leave glued to the
        # first header name, breaking header detection below.
        content = file_content.decode('utf-8-sig')
        rows = list(csv.reader(io.StringIO(content)))
        if not rows:
            return []
        # Single pass over the first row: detect a header and locate the
        # argument column at the same time.
        header = rows[0]
        arg_col_idx = 0
        start_idx = 0
        for idx, col in enumerate(header):
            if col.strip().lower() in ('argument', 'text', 'content'):
                arg_col_idx = idx
                start_idx = 1  # first row is a header, skip it
                break
        arguments = []
        for row in rows[start_idx:]:
            if len(row) > arg_col_idx:
                arg = row[arg_col_idx].strip()
                if arg:  # Only add non-empty arguments
                    arguments.append(arg)
        return arguments
    except Exception as e:
        logger.error(f"Error parsing CSV file: {str(e)}")
        raise ValueError(f"Failed to parse CSV file: {str(e)}")
@router.post("", response_model=AnalysisResponse, tags=["Analysis"])
async def analyse_arguments(
request: AnalysisRequest,
x_user_id: Optional[str] = Header(None, alias="X-User-ID")
):
"""
Analyze arguments: extract topics and predict stance (JSON body)
- **X-User-ID**: User UUID (required in header)
- **arguments**: List of argument texts in JSON body
Returns analysis results with extracted topics and stance predictions
"""
if not x_user_id:
raise HTTPException(status_code=400, detail="X-User-ID header is required")
try:
arguments = request.arguments
logger.info(f"Received {len(arguments)} arguments from JSON body")
# Analyze arguments
results = analysis_service.analyze_arguments(
user_id=x_user_id,
arguments=arguments
)
# Convert to response models
analysis_results = [
AnalysisResult(**result) for result in results
]
logger.info(f"Analysis completed: {len(analysis_results)} results")
return AnalysisResponse(
results=analysis_results,
total_processed=len(analysis_results),
timestamp=datetime.now().isoformat()
)
except HTTPException:
raise
except ValueError as e:
logger.error(f"Validation error: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Analysis error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
@router.post("/csv", response_model=AnalysisResponse, tags=["Analysis"])
async def analyse_arguments_csv(
file: UploadFile = File(...),
x_user_id: Optional[str] = Header(None, alias="X-User-ID")
):
"""
Analyze arguments from CSV file: extract topics and predict stance
- **X-User-ID**: User UUID (required in header)
- **file**: CSV file with arguments
Returns analysis results with extracted topics and stance predictions
"""
if not x_user_id:
raise HTTPException(status_code=400, detail="X-User-ID header is required")
try:
# Read CSV file
file_content = await file.read()
arguments = parse_csv_file(file_content)
if not arguments:
raise HTTPException(
status_code=400,
detail="CSV file is empty or contains no valid arguments"
)
logger.info(f"Parsed {len(arguments)} arguments from CSV file")
# Analyze arguments
results = analysis_service.analyze_arguments(
user_id=x_user_id,
arguments=arguments
)
# Convert to response models
analysis_results = [
AnalysisResult(**result) for result in results
]
logger.info(f"Analysis completed: {len(analysis_results)} results")
return AnalysisResponse(
results=analysis_results,
total_processed=len(analysis_results),
timestamp=datetime.now().isoformat()
)
except HTTPException:
raise
except ValueError as e:
logger.error(f"Validation error: {str(e)}")
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Analysis error: {str(e)}")
raise HTTPException(status_code=500, detail=f"Analysis failed: {str(e)}")
@router.get("", response_model=GetAnalysisResponse, tags=["Analysis"])
async def get_analysis_results(
limit: int = 100,
offset: int = 0,
x_user_id: Optional[str] = Header(None, alias="X-User-ID")
):
"""
Get user's analysis results
- **X-User-ID**: User UUID (required in header)
- **limit**: Maximum number of results to return (default: 100, max: 1000)
- **offset**: Number of results to skip (default: 0)
Returns paginated analysis results
"""
if not x_user_id:
raise HTTPException(status_code=400, detail="X-User-ID header is required")
try:
results = analysis_service.get_user_analysis_results(
user_id=x_user_id,
limit=limit,
offset=offset
)
# Convert to response models
analysis_results = [
AnalysisResult(**result) for result in results
]
return GetAnalysisResponse(
results=analysis_results,
total=len(analysis_results),
limit=limit,
offset=offset
)
except Exception as e:
logger.error(f"Error getting analysis results: {str(e)}")
raise HTTPException(status_code=500, detail=f"Failed to get analysis results: {str(e)}")