"""User service for managing user operations."""

import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional

from services.database_service import database_service

logger = logging.getLogger(__name__)


def _utc_now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO-8601 string."""
    # datetime.utcnow() is deprecated since Python 3.12 and yields a naive
    # value; an aware datetime makes the UTC offset explicit in the string.
    return datetime.now(timezone.utc).isoformat()


class UserService:
    """Service for user CRUD operations on the ``users`` table."""

    def __init__(self):
        self.table_name = "users"

    def _get_client(self):
        """Get Supabase client."""
        return database_service.get_client()

    def _table(self):
        """Return a query builder bound to this service's table."""
        return self._get_client().table(self.table_name)

    def _find_one(self, column: str, value: str) -> Optional[Dict]:
        """
        Fetch the first user row whose ``column`` equals ``value``.

        Args:
            column: Name of the column to filter on
            value: Value the column must equal

        Returns:
            User dictionary or None if no row matched

        Raises:
            RuntimeError: If the query fails
        """
        try:
            result = self._table().select("*").eq(column, value).execute()
            return result.data[0] if result.data else None
        except Exception as e:
            logger.error("Error getting user by %s: %s", column, e)
            raise RuntimeError(f"Failed to get user: {e}") from e

    def _insert_user(self, unique_id: str, name: str) -> Dict:
        """
        Insert a new user row without checking for an existing one.

        Args:
            unique_id: Browser-generated unique identifier
            name: User's display name

        Returns:
            The inserted user dictionary as returned by the database

        Raises:
            RuntimeError: If the insert fails or returns no data
        """
        # Use a single timestamp so created_at and updated_at are identical
        # on a freshly created row.
        now = _utc_now_iso()
        user_data = {
            "unique_id": unique_id,
            "name": name,
            "created_at": now,
            "updated_at": now,
        }

        try:
            result = self._table().insert(user_data).execute()
            created = result.data[0] if result.data else None
        except Exception as e:
            logger.error("Error creating user: %s", e)
            raise RuntimeError(f"Failed to create user: {e}") from e

        # Raised outside the try block so the message is not wrapped a
        # second time by the generic handler above.
        if created is None:
            logger.error("Error creating user: no data returned")
            raise RuntimeError("Failed to create user: no data returned")

        logger.info("Created new user: %s", created.get("id"))
        return created

    def create_user(self, unique_id: str, name: str) -> Dict:
        """
        Create a new user or get existing user by unique_id

        Args:
            unique_id: Browser-generated unique identifier
            name: User's display name

        Returns:
            User dictionary as returned by the database (the existing row if
            one already has this unique_id, otherwise the inserted row)

        Raises:
            RuntimeError: If the lookup or the insert fails
        """
        # NOTE(review): lookup-then-insert is not atomic; whether concurrent
        # calls can create duplicates depends on the table's constraints,
        # which are not visible from this module.
        existing_user = self.get_user_by_unique_id(unique_id)
        if existing_user:
            logger.info("User already exists with unique_id: %s", unique_id)
            return existing_user

        return self._insert_user(unique_id, name)

    def get_user_by_unique_id(self, unique_id: str) -> Optional[Dict]:
        """
        Get user by unique_id

        Args:
            unique_id: Browser-generated unique identifier

        Returns:
            User dictionary or None if not found

        Raises:
            RuntimeError: If the query fails
        """
        return self._find_one("unique_id", unique_id)

    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        """
        Get user by id (UUID)

        Args:
            user_id: User UUID

        Returns:
            User dictionary or None if not found

        Raises:
            RuntimeError: If the query fails
        """
        return self._find_one("id", user_id)

    def update_user_name(self, user_id: str, name: str) -> Dict:
        """
        Update user's name

        Args:
            user_id: User UUID
            name: New display name

        Returns:
            Updated user dictionary

        Raises:
            RuntimeError: If the update fails or no row was updated
        """
        update_data = {
            "name": name,
            "updated_at": _utc_now_iso(),
        }

        try:
            result = self._table().update(update_data).eq("id", user_id).execute()
            updated = result.data[0] if result.data else None
        except Exception as e:
            logger.error("Error updating user name: %s", e)
            raise RuntimeError(f"Failed to update user: {e}") from e

        # Raised outside the try block so the message is not wrapped a
        # second time by the generic handler above.
        if updated is None:
            logger.error("Error updating user name: user not found")
            raise RuntimeError("Failed to update user: user not found")

        logger.info("Updated user name: %s", user_id)
        return updated

    def register_or_get_user(self, unique_id: str, name: Optional[str] = None) -> Dict:
        """
        Register a new user or get existing user by unique_id

        If name is not provided, generates a random one

        Args:
            unique_id: Browser-generated unique identifier
            name: Optional user's display name (generates random if not provided)

        Returns:
            User dictionary

        Raises:
            RuntimeError: If the lookup or the insert fails
        """
        existing_user = self.get_user_by_unique_id(unique_id)
        if existing_user:
            return existing_user

        # Only a new user needs a name; fall back to a random one.
        if not name:
            name = f"User_{uuid.uuid4().hex[:8]}"

        # Insert directly: the existence check has just been done, so going
        # through create_user would repeat the same query.
        return self._insert_user(unique_id, name)


# Initialize singleton instance
user_service = UserService()