File size: 5,380 Bytes
6e8d513 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
"""User service for managing user operations"""
import logging
import uuid
from typing import Optional, Dict
from datetime import datetime
from services.database_service import database_service
logger = logging.getLogger(__name__)
class UserService:
"""Service for user CRUD operations"""
def __init__(self):
self.table_name = "users"
def _get_client(self):
"""Get Supabase client"""
return database_service.get_client()
def create_user(self, unique_id: str, name: str) -> Dict:
"""
Create a new user or get existing user by unique_id
Args:
unique_id: Browser-generated unique identifier
name: User's display name
Returns:
User dictionary with id, unique_id, name, created_at, updated_at
"""
try:
client = self._get_client()
# Check if user already exists
existing_user = self.get_user_by_unique_id(unique_id)
if existing_user:
logger.info(f"User already exists with unique_id: {unique_id}")
return existing_user
# Create new user
user_data = {
"unique_id": unique_id,
"name": name,
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat()
}
result = client.table(self.table_name).insert(user_data).execute()
if result.data and len(result.data) > 0:
logger.info(f"Created new user: {result.data[0].get('id')}")
return result.data[0]
else:
raise RuntimeError("Failed to create user: no data returned")
except Exception as e:
logger.error(f"Error creating user: {str(e)}")
raise RuntimeError(f"Failed to create user: {str(e)}")
def get_user_by_unique_id(self, unique_id: str) -> Optional[Dict]:
"""
Get user by unique_id
Args:
unique_id: Browser-generated unique identifier
Returns:
User dictionary or None if not found
"""
try:
client = self._get_client()
result = client.table(self.table_name).select("*").eq("unique_id", unique_id).execute()
if result.data and len(result.data) > 0:
return result.data[0]
return None
except Exception as e:
logger.error(f"Error getting user by unique_id: {str(e)}")
raise RuntimeError(f"Failed to get user: {str(e)}")
def get_user_by_id(self, user_id: str) -> Optional[Dict]:
"""
Get user by id (UUID)
Args:
user_id: User UUID
Returns:
User dictionary or None if not found
"""
try:
client = self._get_client()
result = client.table(self.table_name).select("*").eq("id", user_id).execute()
if result.data and len(result.data) > 0:
return result.data[0]
return None
except Exception as e:
logger.error(f"Error getting user by id: {str(e)}")
raise RuntimeError(f"Failed to get user: {str(e)}")
def update_user_name(self, user_id: str, name: str) -> Dict:
"""
Update user's name
Args:
user_id: User UUID
name: New display name
Returns:
Updated user dictionary
"""
try:
client = self._get_client()
update_data = {
"name": name,
"updated_at": datetime.utcnow().isoformat()
}
result = client.table(self.table_name).update(update_data).eq("id", user_id).execute()
if result.data and len(result.data) > 0:
logger.info(f"Updated user name: {user_id}")
return result.data[0]
else:
raise RuntimeError("Failed to update user: user not found")
except Exception as e:
logger.error(f"Error updating user name: {str(e)}")
raise RuntimeError(f"Failed to update user: {str(e)}")
def register_or_get_user(self, unique_id: str, name: Optional[str] = None) -> Dict:
"""
Register a new user or get existing user by unique_id
If name is not provided, generates a random one
Args:
unique_id: Browser-generated unique identifier
name: Optional user's display name (generates random if not provided)
Returns:
User dictionary
"""
# Generate random name if not provided
if not name:
name = f"User_{uuid.uuid4().hex[:8]}"
# Check if user exists
existing_user = self.get_user_by_unique_id(unique_id)
if existing_user:
return existing_user
# Create new user
return self.create_user(unique_id, name)
# Initialize singleton instance
user_service = UserService()
|