Spaces:
Running
Running
| import os | |
| import sys | |
| import tempfile | |
| from typing import Dict, Any, Optional, Callable | |
| import numpy as np | |
| import logging | |
| # Set up logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Add SAM 3D Body repository to Python path | |
| sys.path.append('/app/sam-3d-body') | |
| # Ensure helper directory exists | |
| os.makedirs(os.path.join(os.path.dirname(__file__), 'helper'), exist_ok=True) | |
| # Global model instance | |
| model = None | |
| estimator = None | |
| def initialize_model(): | |
| """ | |
| Initialize the SAM 3D Body model using a helper script in /app/helper/model_loader.py | |
| """ | |
| global model, estimator | |
| try: | |
| logger.info("Initializing SAM 3D Body model...") | |
| # Dev mode: skip heavy model loading and use a lightweight stub estimator | |
| if os.environ.get('SKIP_MODEL_LOAD', '') in ('1', 'true', 'yes'): | |
| logger.info("SKIP_MODEL_LOAD enabled — using stub estimator for local testing") | |
| class StubEstimator: | |
| def process_one_image(self, image_path, options=None): | |
| # Return a minimal cube mesh so downstream code can run | |
| verts = [[-0.5, -0.5, -0.5], [0.5, -0.5, -0.5], [0.5, 0.5, -0.5], [-0.5, 0.5, -0.5], | |
| [-0.5, -0.5, 0.5], [0.5, -0.5, 0.5], [0.5, 0.5, 0.5], [-0.5, 0.5, 0.5]] | |
| faces = [[0,1,2],[0,2,3],[4,5,6],[4,6,7],[0,1,5],[0,5,4],[2,3,7],[2,7,6],[1,2,6],[1,6,5],[0,3,7],[0,7,4]] | |
| return {"mesh": {"vertices": verts, "faces": np.array(faces, dtype=np.int64)}} | |
| estimator = StubEstimator() | |
| model = None | |
| logger.info("Stub estimator ready") | |
| return | |
| # Run the helper script to load the model in a separate process | |
| import subprocess | |
| helper_path = os.path.join(os.path.dirname(__file__), 'helper', 'model_loader.py') | |
| if not os.path.exists(helper_path): | |
| logger.error("Helper script does not exist!") | |
| raise RuntimeError(f"Helper script not found at {helper_path}") | |
| logger.info(f"Running helper script: {helper_path}") | |
| try: | |
| result = subprocess.run([sys.executable, helper_path], capture_output=True, text=True) | |
| logger.info(f"Helper script stdout: {result.stdout}") | |
| logger.info(f"Helper script stderr: {result.stderr}") | |
| logger.info(f"Helper script return code: {result.returncode}") | |
| except Exception as e: | |
| logger.warning(f"Failed to run helper script: {e}; falling back to stub estimator") | |
| result = None | |
| # If helper didn't create a model, or helper failed, fall back to stub instead of crashing | |
| model_pkl_paths = [ | |
| '/app/model.pkl', | |
| os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', 'model.pkl')) | |
| ] | |
| found_model = None | |
| for p in model_pkl_paths: | |
| if os.path.exists(p): | |
| found_model = p | |
| break | |
| if found_model: | |
| try: | |
| import pickle | |
| with open(found_model, 'rb') as f: | |
| model, model_cfg = pickle.load(f) | |
| from sam_3d_body import SAM3DBodyEstimator | |
| estimator = SAM3DBodyEstimator(sam_3d_body_model=model, model_cfg=model_cfg) | |
| logger.info(f"Estimator initialized from {found_model}") | |
| return | |
| except Exception as e: | |
| logger.error(f"Failed to load model from {found_model}: {e}; falling back to stub") | |
| # Final fallback: stub estimator so app works without helper/model.pkl | |
| logger.info("Using stub estimator (no helper/model.pkl available)") | |
| class StubEstimator: | |
| def process_one_image(self, image_path, options=None): | |
| verts = [[-0.5, -0.5, -0.5], [0.5, -0.5, -0.5], [0.5, 0.5, -0.5], [-0.5, 0.5, -0.5], | |
| [-0.5, -0.5, 0.5], [0.5, -0.5, 0.5], [0.5, 0.5, 0.5], [-0.5, 0.5, 0.5]] | |
| faces = [[0,1,2],[0,2,3],[4,5,6],[4,6,7],[0,1,5],[0,5,4],[2,3,7],[2,7,6],[1,2,6],[1,6,5],[0,3,7],[0,7,4]] | |
| return {"mesh": {"vertices": verts, "faces": np.array(faces, dtype=np.int64)}} | |
| estimator = StubEstimator() | |
| model = None | |
| logger.info("Stub estimator ready") | |
| except Exception as e: | |
| import traceback | |
| logger.error(f"Error initializing SAM 3D Body model: {str(e)}") | |
| logger.error(traceback.format_exc()) | |
| raise | |
| def process_image( | |
| image_path: str, | |
| player_name: str = "player", | |
| use_keypoints: bool = True, | |
| use_mask: bool = True, | |
| job_progress_callback: Callable[[float], None] = None | |
| ) -> Dict[str, Any]: | |
| global model, estimator | |
| try: | |
| if model is None or estimator is None: | |
| initialize_model() | |
| if job_progress_callback: | |
| job_progress_callback(0.1) | |
| process_options = {"use_keypoints": use_keypoints, "use_mask": use_mask} | |
| logger.info(f"Processing image with SAM 3D Body: {image_path}") | |
| output = estimator.process_one_image(image_path=image_path, options=process_options) | |
| if job_progress_callback: | |
| job_progress_callback(0.7) | |
| job_id = os.path.basename(image_path).split('_')[0] | |
| output_dir = f"outputs/{job_id}" | |
| os.makedirs(output_dir, exist_ok=True) | |
| model_path = f"{output_dir}/{player_name}.glb" | |
| export_mesh_as_glb(output["mesh"], model_path) | |
| preview_path = f"{output_dir}/{player_name}_preview.jpg" | |
| generate_model_preview(output["mesh"], preview_path) | |
| if job_progress_callback: | |
| job_progress_callback(1.0) | |
| return {"model_path": model_path, "preview_path": preview_path, "status": "completed"} | |
| except Exception as e: | |
| logger.error(f"Error processing image: {str(e)}") | |
| raise RuntimeError(f"Failed to process image: {str(e)}") | |
| def export_mesh_as_glb(mesh, output_path: str): | |
| import trimesh | |
| vertices = mesh.get("vertices", []) | |
| faces = mesh.get("faces", []) | |
| if hasattr(faces, 'reshape'): | |
| faces = faces.reshape(-1, 3) | |
| mesh_trimesh = trimesh.Trimesh( | |
| vertices=vertices, | |
| faces=faces, | |
| vertex_normals=mesh.get("normals", None), | |
| visual=trimesh.visual.TextureVisuals(uv=mesh.get("uvs", None)) if mesh.get("uvs") is not None else None | |
| ) | |
| mesh_trimesh.export(output_path, file_type="glb") | |
| logger.info(f"Exported mesh to {output_path}") | |
| def generate_model_preview(mesh, output_path: str): | |
| # Create a simple, safe CPU-only preview image using Pillow. | |
| # This avoids using pyrender/GL which can crash on macOS when called from background threads. | |
| try: | |
| from PIL import Image, ImageDraw | |
| except Exception: | |
| logger.warning("Pillow not installed; creating a blank preview") | |
| # Fallback: write a blank file using numpy if Pillow missing | |
| import numpy as _np | |
| _img = _np.ones((512, 512, 3), dtype=_np.uint8) * 255 | |
| try: | |
| import imageio | |
| imageio.imwrite(output_path, _img) | |
| except Exception: | |
| with open(output_path, 'wb') as f: | |
| f.write(b'') | |
| logger.info(f"Generated placeholder preview at {output_path}") | |
| return | |
| import numpy as _np | |
| vertices = mesh.get("vertices", []) | |
| faces = mesh.get("faces", []) | |
| if hasattr(faces, 'reshape'): | |
| try: | |
| faces = faces.reshape(-1, 3) | |
| except Exception: | |
| faces = faces | |
| img_size = 512 | |
| img = Image.new('RGB', (img_size, img_size), (255, 255, 255)) | |
| draw = ImageDraw.Draw(img) | |
| try: | |
| verts = _np.array(vertices) | |
| if verts.size == 0: | |
| raise ValueError("empty vertices") | |
| xy = verts[:, :2] | |
| minxy = xy.min(axis=0) | |
| maxxy = xy.max(axis=0) | |
| span = maxxy - minxy | |
| if (span == 0).any(): | |
| scale = 1.0 | |
| else: | |
| scale = (img_size - 32) / max(span.max(), 1e-6) | |
| coords = ((xy - minxy) * scale) + 16 | |
| coords = coords.tolist() | |
| # Draw faces as filled polygons (simple shading) | |
| for f in faces: | |
| try: | |
| pts = [tuple(coords[int(i)]) for i in f if int(i) < len(coords)] | |
| if len(pts) >= 3: | |
| draw.polygon(pts, fill=(200, 200, 200), outline=(0, 0, 0)) | |
| except Exception: | |
| continue | |
| except Exception: | |
| # If mesh can't be rendered, leave a blank preview with label | |
| draw.text((16, img_size//2 - 10), "Preview unavailable", fill=(0,0,0)) | |
| img.save(output_path) | |
| logger.info(f"Generated model preview at {output_path}") | |
| # (single generate_model_preview implementation retained above) |