Spaces:
Running
Running
| """RMScript App - Web IDE for Reachy Mini scripting.""" | |
| import asyncio | |
| import logging | |
| import threading | |
| import time | |
| from typing import Any | |
| import numpy as np | |
| from pydantic import BaseModel | |
| from reachy_mini import ReachyMini, ReachyMiniApp | |
| from rmscript import compile_script | |
| from rmscript.ir import IRAction, IRWaitAction, IRPictureAction, IRPlaySoundAction | |
# Module-wide logging: configure the root logger at INFO level, then create
# the logger used by everything in this file.
logging.basicConfig(
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Request body for the verify and compile handlers.
class ScriptInput(BaseModel):
    # Raw rmscript source text; handed unchanged to compile_script().
    source: str
# One IR step as sent back by the client for execution.
class IRActionInput(BaseModel):
    # Discriminator; the executor in this file handles "action", "wait",
    # "picture" and "sound".
    type: str
    # Used as the move duration for "action" steps and as the sleep length
    # (seconds, via time.sleep) for "wait" and blocking "sound" steps.
    duration: float = 0.0
    # Nested list turned into a NumPy array before being sent to the robot.
    # NOTE(review): presumably a 4x4 pose matrix - confirm against rmscript IR.
    head_pose: list[list[float]] | None = None
    # Converted to a tuple before being sent to the robot.
    antennas: list[float] | None = None
    # Forwarded as-is to goto_target().
    body_yaw: float | None = None
    # Name handed to media.play_sound() for "sound" steps.
    sound_name: str | None = None
    # When true (and duration > 0) the executor waits `duration` after
    # starting the sound.
    blocking: bool = False
    # NOTE(review): carried through the API but not read by the executor
    # visible in this file - confirm intended use.
    loop: bool = False
# Request body for the execute handler.
class ExecuteInput(BaseModel):
    # IR steps to run, in order.
    ir: list[IRActionInput]
# Response of the verify handler: compiler diagnostics without any IR.
class VerifyResponse(BaseModel):
    # Mirrors the compiler result's `success` flag (False on internal error).
    success: bool
    # Each entry is a dict with "line", "column", "message" and "severity".
    errors: list[dict[str, Any]]
    # Same entry layout as `errors`.
    warnings: list[dict[str, Any]]
    # Script name reported by the compiler; empty when not provided.
    name: str = ""
    # Script description reported by the compiler; empty when not provided.
    description: str = ""
# One IR step as returned to the client by the compile handler.
class IRActionOutput(BaseModel):
    # Discriminator; the compile handler emits "action", "wait", "picture"
    # or "sound".
    type: str
    # Step duration copied from the compiled IR (0.0 when absent).
    duration: float = 0.0
    # Head pose serialized from the IR array via .tolist().
    head_pose: list[list[float]] | None = None
    # Antenna targets copied from the IR as a plain list.
    antennas: list[float] | None = None
    # Body yaw copied from the IR.
    body_yaw: float | None = None
    # Sound name for "sound" steps.
    sound_name: str | None = None
    # Blocking flag for "sound" steps.
    blocking: bool = False
    # Loop flag for "sound" steps.
    loop: bool = False
    # Line of the rmscript source that produced this step.
    source_line: int = 0
# Response of the compile handler: diagnostics plus the serialized IR.
class CompileResponse(BaseModel):
    # Mirrors the compiler result's `success` flag (False on internal error).
    success: bool
    # Each entry is a dict with "line", "column", "message" and "severity".
    errors: list[dict[str, Any]]
    # Same entry layout as `errors`.
    warnings: list[dict[str, Any]]
    # Script name reported by the compiler; empty when not provided.
    name: str = ""
    # Script description reported by the compiler; empty when not provided.
    description: str = ""
    # Compiled steps in execution order; empty on internal error.
    ir: list[IRActionOutput]
# Response of the execute handler.
class ExecuteResponse(BaseModel):
    # False when execution raised or another script was already running.
    success: bool
    # Human-readable outcome or the error text.
    message: str = ""
    # Number of IR steps counted as executed before returning.
    actions_executed: int = 0
    # True when the run stopped early because an abort was requested.
    aborted: bool = False
# Response of the abort handler.
class AbortResponse(BaseModel):
    # True when an abort signal was recorded; False when nothing was running.
    success: bool
    # Human-readable outcome.
    message: str = ""
class RmscriptApp(ReachyMiniApp):
    """Web IDE for writing and executing rmscript on Reachy Mini.

    ``run`` stores the robot handle, defines the HTTP handlers and then idles
    until the stop event is set. Scripts are executed on a worker thread so
    the event loop stays free to receive abort requests.

    State shared between the event loop and the worker thread:

    * ``_executing`` - True while a script run is claimed or in progress.
    * ``_abort_requested`` - set by the abort handler, polled by the worker.
    """

    custom_app_url: str | None = "http://0.0.0.0:8042"

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
        """Start the app and block until ``stop_event`` is set.

        Args:
            reachy_mini: Robot handle used by the execution worker.
            stop_event: Event whose setting ends this method.
        """
        # Store reachy_mini for use in API routes
        self._reachy_mini = reachy_mini
        self._executing = False
        self._abort_requested = False

        self._register_routes()
        logger.info("RMScript App started - settings UI at http://localhost:8042")

        # Nothing to do on this thread: Event.wait() returns True as soon as
        # the event is set, so shutdown is not delayed by a full sleep tick.
        while not stop_event.wait(0.1):
            pass
        logger.info("RMScript App stopped")

    @staticmethod
    def _diagnostics_to_dicts(diagnostics: Any) -> list[dict[str, Any]]:
        """Serialize compiler errors or warnings into JSON-friendly dicts."""
        return [
            {"line": d.line, "column": d.column, "message": d.message, "severity": d.severity}
            for d in diagnostics
        ]

    @staticmethod
    def _internal_error(exc: Exception) -> list[dict[str, Any]]:
        """Wrap an unexpected exception as a single line-0 error diagnostic."""
        return [{"line": 0, "column": 0, "message": str(exc), "severity": "error"}]

    @staticmethod
    def _ir_to_output(action: Any) -> IRActionOutput | None:
        """Convert one compiled IR object to its API form.

        Returns None for IR types this app does not know; the caller drops
        those entries.
        """
        if isinstance(action, IRAction):
            return IRActionOutput(
                type="action",
                duration=action.duration,
                head_pose=action.head_pose.tolist() if action.head_pose is not None else None,
                antennas=list(action.antennas) if action.antennas is not None else None,
                body_yaw=action.body_yaw,
                source_line=action.source_line,
            )
        if isinstance(action, IRWaitAction):
            return IRActionOutput(type="wait", duration=action.duration, source_line=action.source_line)
        if isinstance(action, IRPictureAction):
            return IRActionOutput(type="picture", source_line=action.source_line)
        if isinstance(action, IRPlaySoundAction):
            return IRActionOutput(
                type="sound",
                duration=action.duration or 0.0,
                sound_name=action.sound_name,
                blocking=action.blocking,
                loop=action.loop,
                source_line=action.source_line,
            )
        return None

    def _register_routes(self) -> None:
        """Register FastAPI routes for script compilation and execution."""
        # NOTE(review): no registration call or route decorator is visible in
        # this view of the file; presumably each handler below is attached to
        # the app's router (decorators may have been lost) - confirm and
        # restore the original paths.

        async def verify_script_route(input: ScriptInput) -> VerifyResponse:
            """Verify rmscript syntax without generating IR."""
            logger.info("Verifying script (%d chars)", len(input.source))
            try:
                result = compile_script(input.source)
                return VerifyResponse(
                    success=result.success,
                    errors=self._diagnostics_to_dicts(result.errors),
                    warnings=self._diagnostics_to_dicts(result.warnings),
                    name=result.name,
                    description=result.description,
                )
            except Exception as e:
                # Boundary handler: report the failure to the client instead
                # of letting the request error out.
                logger.exception("Verification error: %s", e)
                return VerifyResponse(success=False, errors=self._internal_error(e), warnings=[])

        async def compile_script_route(input: ScriptInput) -> CompileResponse:
            """Compile rmscript to intermediate representation."""
            logger.info("Compiling script (%d chars)", len(input.source))
            try:
                result = compile_script(input.source)
                errors = self._diagnostics_to_dicts(result.errors)
                warnings = self._diagnostics_to_dicts(result.warnings)
                converted = (self._ir_to_output(action) for action in result.ir)
                ir_json = [item for item in converted if item is not None]
                logger.info("Compiled %d IR actions", len(ir_json))
                return CompileResponse(
                    success=result.success,
                    errors=errors,
                    warnings=warnings,
                    name=result.name,
                    description=result.description,
                    ir=ir_json,
                )
            except Exception as e:
                logger.exception("Compilation error: %s", e)
                return CompileResponse(success=False, errors=self._internal_error(e), warnings=[], ir=[])

        async def abort_execution_route() -> AbortResponse:
            """Abort the currently executing script."""
            if not self._executing:
                return AbortResponse(success=False, message="No script is currently executing")
            self._abort_requested = True
            logger.info("Abort requested")
            return AbortResponse(success=True, message="Abort signal sent")

        async def execute_script_route(input: ExecuteInput) -> ExecuteResponse:
            """Execute compiled IR on the robot."""
            if self._executing:
                return ExecuteResponse(success=False, message="Already executing a script")

            # Claim the run here, on the event-loop thread, with no await
            # between the check above and the assignment: a second request
            # cannot slip past the guard before the worker thread starts.
            self._executing = True
            # Clear any stale abort now so that an abort arriving after this
            # point (even before the worker starts) is honoured.
            self._abort_requested = False

            # Run execution in thread pool to not block event loop (allows abort requests)
            try:
                future = asyncio.get_running_loop().run_in_executor(None, self._execute_ir, input.ir)
            except Exception:
                # Scheduling failed, so the worker's cleanup will never run.
                self._executing = False
                raise
            return await future

    def _execute_ir(self, ir: list[IRActionInput]) -> ExecuteResponse:
        """Execute IR actions (runs in thread pool).

        The abort flag is checked before every step and while sleeping. A
        wait or blocking sound that is cut short by an abort is not counted
        as executed and marks the run as aborted. Steps with an unknown
        ``type`` are logged and skipped without being counted.

        Args:
            ir: Steps to run, in order.

        Returns:
            ExecuteResponse describing success, the number of completed
            steps and whether the run was aborted. Exceptions raised while
            executing are caught and reported with ``success=False``.
        """
        # Normally already set by the route handler; kept so a direct call
        # also marks the app as busy. The abort flag is deliberately not
        # reset here: doing so could discard an abort received between
        # scheduling and thread start.
        self._executing = True
        actions_executed = 0
        aborted = False
        try:
            logger.info("Executing %d IR actions", len(ir))
            for action in ir:
                # Check for abort request before each action
                if self._abort_requested:
                    aborted = True
                    break

                if action.type == "action":
                    head_pose = np.array(action.head_pose) if action.head_pose else None
                    antennas = tuple(action.antennas) if action.antennas else None
                    # goto_target() is blocking - it waits for task completion internally
                    self._reachy_mini.goto_target(
                        head=head_pose,
                        antennas=antennas,
                        body_yaw=action.body_yaw,
                        duration=action.duration,
                    )
                elif action.type == "wait":
                    if not self._interruptible_sleep(action.duration):
                        aborted = True
                        break
                elif action.type == "picture":
                    # Take picture (could return to frontend in future)
                    self._reachy_mini.media.get_frame()
                    logger.info("Picture captured")
                elif action.type == "sound":
                    if action.sound_name:
                        self._reachy_mini.media.play_sound(action.sound_name)
                        if (
                            action.blocking
                            and action.duration > 0
                            and not self._interruptible_sleep(action.duration)
                        ):
                            aborted = True
                            break
                else:
                    logger.warning("Skipping unknown IR action type %r", action.type)
                    continue

                actions_executed += 1

            if aborted:
                logger.info("Execution aborted after %d actions", actions_executed)
                return ExecuteResponse(
                    success=True,
                    message="Execution aborted",
                    actions_executed=actions_executed,
                    aborted=True,
                )
            logger.info("Executed %d actions", actions_executed)
            return ExecuteResponse(
                success=True,
                message="Script executed successfully",
                actions_executed=actions_executed,
            )
        except Exception as e:
            logger.exception("Execution error: %s", e)
            return ExecuteResponse(success=False, message=str(e), actions_executed=actions_executed)
        finally:
            self._executing = False
            self._abort_requested = False

    def _interruptible_sleep(self, duration: float) -> bool:
        """Sleep that can be interrupted by abort request.

        Args:
            duration: Seconds to sleep; zero or negative returns immediately.

        Returns:
            True if the full duration elapsed, False if an abort request cut
            the sleep short.
        """
        poll_interval = 0.05  # Check every 50ms
        # A monotonic deadline avoids the drift of summing nominal sleep
        # times and is unaffected by wall-clock adjustments.
        deadline = time.monotonic() + duration
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return True
            if self._abort_requested:
                return False
            time.sleep(min(poll_interval, remaining))
# Script entry point: build the app and hand control to its run wrapper.
if __name__ == "__main__":
    RmscriptApp().wrapped_run()