File size: 5,380 Bytes
6e8d513 |
|
"""User service for managing user operations"""
import logging
import uuid
from datetime import datetime, timezone
from typing import Dict, Optional

from services.database_service import database_service
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class UserService:
    """Service for user CRUD operations on the ``users`` table.

    All database access goes through the client returned by
    ``database_service.get_client()``. Any failure while talking to the
    database is logged and re-raised as ``RuntimeError`` with the original
    exception attached as ``__cause__``.
    """

    def __init__(self):
        # Name of the table every query in this service targets.
        self.table_name = "users"

    def _get_client(self):
        """Get the Supabase client from ``database_service``."""
        return database_service.get_client()

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string."""
        # datetime.utcnow() is deprecated since Python 3.12 and yields a naive
        # value; an aware datetime serialises with an explicit "+00:00".
        return datetime.now(timezone.utc).isoformat()

    @staticmethod
    def _first_row(result) -> Optional[Dict]:
        """Return the first row of a query result, or None when it is empty."""
        rows = result.data
        return rows[0] if rows else None

    def _get_user_by(self, column: str, value: str) -> Optional[Dict]:
        """Fetch the first user whose ``column`` equals ``value``.

        Args:
            column: Column name to filter on
            value: Value the column must equal
        Returns:
            User dictionary or None if not found
        Raises:
            RuntimeError: If the query fails
        """
        try:
            result = (
                self._get_client()
                .table(self.table_name)
                .select("*")
                .eq(column, value)
                .execute()
            )
            return self._first_row(result)
        except Exception as e:
            logger.error("Error getting user by %s: %s", column, e)
            raise RuntimeError(f"Failed to get user: {e}") from e

    def _insert_user(self, unique_id: str, name: str) -> Dict:
        """Insert a new user row without checking for an existing one.

        Args:
            unique_id: Browser-generated unique identifier
            name: User's display name
        Returns:
            The inserted user dictionary as returned by the database
        Raises:
            RuntimeError: If the insert fails or returns no row
        """
        # Read the clock once so created_at and updated_at are identical on a
        # freshly created row.
        now = self._utc_timestamp()
        user_data = {
            "unique_id": unique_id,
            "name": name,
            "created_at": now,
            "updated_at": now,
        }
        try:
            result = (
                self._get_client()
                .table(self.table_name)
                .insert(user_data)
                .execute()
            )
            created = self._first_row(result)
        except Exception as e:
            logger.error("Error creating user: %s", e)
            raise RuntimeError(f"Failed to create user: {e}") from e

        # Raised outside the try block so the message is not wrapped a second
        # time by the generic handler above.
        if created is None:
            logger.error("Error creating user: no data returned")
            raise RuntimeError("Failed to create user: no data returned")

        logger.info("Created new user: %s", created.get("id"))
        return created

    def create_user(self, unique_id: str, name: str) -> Dict:
        """
        Create a new user or get existing user by unique_id

        Args:
            unique_id: Browser-generated unique identifier
            name: User's display name
        Returns:
            User dictionary: the existing row when one matches unique_id,
            otherwise the newly inserted row
        Raises:
            RuntimeError: If the lookup or the insert fails
        """
        existing_user = self.get_user_by_unique_id(unique_id)
        if existing_user:
            logger.info("User already exists with unique_id: %s", unique_id)
            return existing_user
        return self._insert_user(unique_id, name)

    def get_user_by_unique_id(self, unique_id: str) -> Optional[Dict]:
        """
        Get user by unique_id

        Args:
            unique_id: Browser-generated unique identifier
        Returns:
            User dictionary or None if not found
        Raises:
            RuntimeError: If the query fails
        """
        return self._get_user_by("unique_id", unique_id)

    def get_user_by_id(self, user_id: str) -> Optional[Dict]:
        """
        Get user by id (UUID)

        Args:
            user_id: User UUID
        Returns:
            User dictionary or None if not found
        Raises:
            RuntimeError: If the query fails
        """
        return self._get_user_by("id", user_id)

    def update_user_name(self, user_id: str, name: str) -> Dict:
        """
        Update user's name

        Args:
            user_id: User UUID
            name: New display name
        Returns:
            Updated user dictionary
        Raises:
            RuntimeError: If the update fails or no row matches user_id
        """
        update_data = {
            "name": name,
            "updated_at": self._utc_timestamp(),
        }
        try:
            result = (
                self._get_client()
                .table(self.table_name)
                .update(update_data)
                .eq("id", user_id)
                .execute()
            )
            updated = self._first_row(result)
        except Exception as e:
            logger.error("Error updating user name: %s", e)
            raise RuntimeError(f"Failed to update user: {e}") from e

        # Raised outside the try block so the message is not wrapped a second
        # time by the generic handler above.
        if updated is None:
            logger.error("Error updating user name: user not found")
            raise RuntimeError("Failed to update user: user not found")

        logger.info("Updated user name: %s", user_id)
        return updated

    def register_or_get_user(self, unique_id: str, name: Optional[str] = None) -> Dict:
        """
        Register a new user or get existing user by unique_id

        If name is not provided (None or empty), a random one of the form
        ``User_<8 hex chars>`` is generated for the new user.

        Args:
            unique_id: Browser-generated unique identifier
            name: Optional user's display name (generates random if not provided)
        Returns:
            User dictionary
        Raises:
            RuntimeError: If the lookup or the insert fails
        """
        existing_user = self.get_user_by_unique_id(unique_id)
        if existing_user:
            return existing_user

        # Only generate a fallback name once we know a row must be created.
        if not name:
            name = f"User_{uuid.uuid4().hex[:8]}"

        # Insert directly: existence was checked just above, so going through
        # create_user would repeat the same SELECT.
        return self._insert_user(unique_id, name)
# Shared module-level instance, created when this module is imported.
# Nothing here prevents other UserService instances from being constructed.
user_service = UserService()
|