mlse-player-3d / app /sam_3d_service.py
Jake Reardon
chore: end-of-day commit
7741722
import os
import sys
import tempfile
from typing import Dict, Any, Optional, Callable
import numpy as np
import logging
# Configure logging at import time and create the logger used throughout this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Make the SAM 3D Body repository checkout importable.
sys.path.append('/app/sam-3d-body')

# The helper directory lives next to this module; create it when it is missing.
os.makedirs(
    os.path.join(os.path.dirname(__file__), 'helper'),
    exist_ok=True,
)

# Module-wide model/estimator instances; both start unset and are filled in
# by initialize_model().
model = estimator = None
class _StubEstimator:
    """Stand-in estimator that returns a fixed cube mesh instead of running a model."""

    # Eight corners of an axis-aligned cube with side length 1, centred on the origin.
    _VERTICES = (
        (-0.5, -0.5, -0.5), (0.5, -0.5, -0.5), (0.5, 0.5, -0.5), (-0.5, 0.5, -0.5),
        (-0.5, -0.5, 0.5), (0.5, -0.5, 0.5), (0.5, 0.5, 0.5), (-0.5, 0.5, 0.5),
    )
    # Twelve triangles (two per cube side) indexing into _VERTICES.
    _FACES = (
        (0, 1, 2), (0, 2, 3), (4, 5, 6), (4, 6, 7), (0, 1, 5), (0, 5, 4),
        (2, 3, 7), (2, 7, 6), (1, 2, 6), (1, 6, 5), (0, 3, 7), (0, 7, 4),
    )

    def process_one_image(self, image_path, options=None):
        """Ignore the inputs and return a minimal cube mesh so downstream code can run.

        Returns:
            ``{"mesh": {"vertices": ..., "faces": ...}}`` where ``vertices`` is a
            fresh list of eight ``[x, y, z]`` lists and ``faces`` is a fresh
            ``int64`` array of shape ``(12, 3)``.
        """
        return {
            "mesh": {
                "vertices": [list(vertex) for vertex in self._VERTICES],
                "faces": np.array(self._FACES, dtype=np.int64),
            }
        }


def _install_stub_estimator():
    """Set the module-level estimator to a stub and clear the module-level model."""
    global model, estimator
    estimator = _StubEstimator()
    model = None
    logger.info("Stub estimator ready")


def initialize_model():
    """
    Initialize the module-level ``model`` and ``estimator``.

    Steps, in order:

    1. If the ``SKIP_MODEL_LOAD`` environment variable is ``1``, ``true`` or
       ``yes`` (case-insensitive, surrounding whitespace ignored), install the
       stub estimator and return.
    2. Run ``helper/model_loader.py`` (located next to this module) in a
       separate Python process and log its output. A failure to launch the
       process is logged and otherwise ignored.
    3. Look for ``model.pkl`` at ``/app/model.pkl`` and two directories above
       this module. If one exists, unpickle ``(model, model_cfg)`` from it and
       build a ``SAM3DBodyEstimator``.
    4. If no pickle exists or loading it fails, install the stub estimator.

    Raises:
        RuntimeError: if the helper script file does not exist.
        Exception: any other unexpected error is logged with its traceback and
            re-raised unchanged.
    """
    global model, estimator
    try:
        logger.info("Initializing SAM 3D Body model...")

        # Dev mode: skip heavy model loading and use a lightweight stub estimator.
        # The flag is normalised so that values such as "TRUE" or "Yes" also work.
        skip_flag = os.environ.get('SKIP_MODEL_LOAD', '').strip().lower()
        if skip_flag in ('1', 'true', 'yes'):
            logger.info("SKIP_MODEL_LOAD enabled — using stub estimator for local testing")
            _install_stub_estimator()
            return

        # Run the helper script to load the model in a separate process.
        import subprocess
        helper_path = os.path.join(os.path.dirname(__file__), 'helper', 'model_loader.py')
        if not os.path.exists(helper_path):
            logger.error("Helper script does not exist!")
            raise RuntimeError(f"Helper script not found at {helper_path}")

        logger.info(f"Running helper script: {helper_path}")
        try:
            result = subprocess.run([sys.executable, helper_path], capture_output=True, text=True)
        except Exception as e:
            # Best effort: a model.pkl from an earlier run may still be usable below.
            logger.warning(f"Failed to run helper script: {e}; falling back to stub estimator")
        else:
            logger.info(f"Helper script stdout: {result.stdout}")
            logger.info(f"Helper script stderr: {result.stderr}")
            logger.info(f"Helper script return code: {result.returncode}")
            if result.returncode != 0:
                logger.warning("Helper script exited with a non-zero return code")

        # If helper didn't create a model, or helper failed, fall back to stub instead of crashing.
        model_pkl_paths = (
            '/app/model.pkl',
            os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'model.pkl')),
        )
        found_model = next((p for p in model_pkl_paths if os.path.exists(p)), None)

        if found_model:
            try:
                import pickle
                # SECURITY: pickle.load can execute arbitrary code contained in the
                # file, so model.pkl must only ever come from a trusted source.
                with open(found_model, 'rb') as f:
                    loaded_model, model_cfg = pickle.load(f)
                from sam_3d_body import SAM3DBodyEstimator
                loaded_estimator = SAM3DBodyEstimator(
                    sam_3d_body_model=loaded_model, model_cfg=model_cfg
                )
            except Exception as e:
                logger.error(f"Failed to load model from {found_model}: {e}; falling back to stub")
            else:
                # Publish the globals only once both objects exist.
                model, estimator = loaded_model, loaded_estimator
                logger.info(f"Estimator initialized from {found_model}")
                return

        # Final fallback: stub estimator so app works without helper/model.pkl.
        logger.info("Using stub estimator (no helper/model.pkl available)")
        _install_stub_estimator()
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Error initializing SAM 3D Body model: {str(e)}")
        raise
def process_image(
    image_path: str,
    player_name: str = "player",
    use_keypoints: bool = True,
    use_mask: bool = True,
    job_progress_callback: Optional[Callable[[float], None]] = None
) -> Dict[str, Any]:
    """
    Run the estimator on one image and write a GLB model plus a preview image.

    Args:
        image_path: Path of the input image. The part of its file name before
            the first underscore is used as the job id.
        player_name: Base name for the generated files.
        use_keypoints: Forwarded to the estimator as ``options["use_keypoints"]``.
        use_mask: Forwarded to the estimator as ``options["use_mask"]``.
        job_progress_callback: Optional callable invoked with 0.1, 0.7 and 1.0
            as the work advances.

    Returns:
        A dict with ``model_path`` (``outputs/<job_id>/<player_name>.glb``),
        ``preview_path`` (``outputs/<job_id>/<player_name>_preview.jpg``) and
        ``status`` set to ``"completed"``.

    Raises:
        RuntimeError: wrapping any error raised during initialization,
            estimation or export; the original error is chained as the cause.
    """
    try:
        # Only the estimator is needed here. The stub estimator is installed
        # with model left as None, so testing model as well would re-run the
        # whole initialization (including the helper process) on every call.
        if estimator is None:
            initialize_model()
        if job_progress_callback:
            job_progress_callback(0.1)

        process_options = {"use_keypoints": use_keypoints, "use_mask": use_mask}
        logger.info(f"Processing image with SAM 3D Body: {image_path}")
        output = estimator.process_one_image(image_path=image_path, options=process_options)
        if job_progress_callback:
            job_progress_callback(0.7)

        job_id = os.path.basename(image_path).split('_')[0]
        output_dir = f"outputs/{job_id}"
        os.makedirs(output_dir, exist_ok=True)

        # NOTE(review): player_name is interpolated into the output paths as-is;
        # confirm that callers validate it if it can come from user input.
        model_path = f"{output_dir}/{player_name}.glb"
        export_mesh_as_glb(output["mesh"], model_path)
        preview_path = f"{output_dir}/{player_name}_preview.jpg"
        generate_model_preview(output["mesh"], preview_path)

        if job_progress_callback:
            job_progress_callback(1.0)
        return {"model_path": model_path, "preview_path": preview_path, "status": "completed"}
    except Exception as e:
        logger.error(f"Error processing image: {str(e)}")
        # Chain the original exception so its traceback is kept for debugging.
        raise RuntimeError(f"Failed to process image: {str(e)}") from e
def export_mesh_as_glb(mesh, output_path: str):
    """
    Write ``mesh`` to ``output_path`` as a binary glTF (GLB) file via trimesh.

    ``mesh`` is read with ``.get`` for the keys ``vertices``, ``faces``,
    ``normals`` and ``uvs``. Faces that expose ``reshape`` are reshaped to
    ``(-1, 3)``; texture visuals are attached only when ``uvs`` is present.
    """
    import trimesh

    face_data = mesh.get("faces", [])
    if hasattr(face_data, 'reshape'):
        # Array-like faces may arrive flattened; trimesh expects one row per triangle.
        face_data = face_data.reshape(-1, 3)

    uv_coords = mesh.get("uvs")
    texture = None
    if uv_coords is not None:
        texture = trimesh.visual.TextureVisuals(uv=uv_coords)

    exported = trimesh.Trimesh(
        vertices=mesh.get("vertices", []),
        faces=face_data,
        vertex_normals=mesh.get("normals", None),
        visual=texture,
    )
    exported.export(output_path, file_type="glb")
    logger.info(f"Exported mesh to {output_path}")
def _fit_vertices_to_canvas(vertices, img_size, margin=16):
    """Drop everything but X/Y and scale the points to fit a square image.

    Returns a list of ``[x, y]`` pixel coordinates, one per vertex, with the
    minimum X and Y mapped to ``margin`` and the largest extent mapped to
    ``img_size - 2 * margin`` pixels.

    Raises:
        ValueError: if ``vertices`` is empty.
    """
    import numpy as _np

    verts = _np.array(vertices)
    if verts.size == 0:
        raise ValueError("empty vertices")
    xy = verts[:, :2]
    minxy = xy.min(axis=0)
    largest_span = (xy.max(axis=0) - minxy).max()
    # Scaling is impossible only when every point shares the same XY position.
    # A mesh that is flat along a single axis must still be scaled by its
    # other axis, otherwise it is drawn just a few pixels large.
    if largest_span == 0:
        scale = 1.0
    else:
        scale = (img_size - 2 * margin) / max(largest_span, 1e-6)
    return (((xy - minxy) * scale) + margin).tolist()


def generate_model_preview(mesh, output_path: str):
    """
    Write a simple 512x512 preview image of ``mesh`` to ``output_path``.

    The preview is drawn on the CPU with Pillow: vertices are reduced to their
    X/Y coordinates and every face is drawn as a grey polygon with a black
    outline on a white background. This avoids using pyrender/GL, which can
    crash on macOS when called from background threads.

    Fallbacks:
        * Pillow cannot be imported: a white image is written with imageio, or
          an empty file is created if that fails too.
        * The mesh cannot be drawn (for example no vertices): the image only
          contains the text "Preview unavailable".

    Args:
        mesh: Mapping read with ``.get`` for the keys ``vertices`` and ``faces``.
        output_path: Destination file; the image format follows its extension.
    """
    import numpy as _np

    try:
        from PIL import Image, ImageDraw
    except Exception:
        logger.warning("Pillow not installed; creating a blank preview")
        # Fallback: write a blank image using numpy + imageio if Pillow is missing.
        blank = _np.ones((512, 512, 3), dtype=_np.uint8) * 255
        try:
            import imageio
            imageio.imwrite(output_path, blank)
        except Exception:
            # Last resort: create an empty file so that the path exists.
            with open(output_path, 'wb') as f:
                f.write(b'')
        logger.info(f"Generated placeholder preview at {output_path}")
        return

    vertices = mesh.get("vertices", [])
    faces = mesh.get("faces", [])
    if hasattr(faces, 'reshape'):
        try:
            faces = faces.reshape(-1, 3)
        except Exception:
            # Keep the faces as they are; undrawable entries are skipped below.
            pass

    img_size = 512
    img = Image.new('RGB', (img_size, img_size), (255, 255, 255))
    draw = ImageDraw.Draw(img)
    try:
        coords = _fit_vertices_to_canvas(vertices, img_size)
        vertex_count = len(coords)
        # Draw faces as filled polygons (simple shading).
        for face in faces:
            try:
                # Skip out-of-range indices; negative ones would otherwise wrap
                # around silently to the end of the vertex list.
                pts = [tuple(coords[int(i)]) for i in face if 0 <= int(i) < vertex_count]
                if len(pts) >= 3:
                    draw.polygon(pts, fill=(200, 200, 200), outline=(0, 0, 0))
            except Exception:
                # A malformed face must not abort the whole preview.
                continue
    except Exception:
        # If mesh can't be rendered, leave a blank preview with label.
        draw.text((16, img_size//2 - 10), "Preview unavailable", fill=(0,0,0))
    img.save(output_path)
    logger.info(f"Generated model preview at {output_path}")
# (single generate_model_preview implementation retained above)