use crate::config::EngineConfig; use crate::host_state::HostState; use crate::http_service::HttpHostService; use crate::registry::PluginRegistry; use bex_db::BexDb; use bex_types::plugin_info::PluginInfo; use bex_types::{BexError, Manifest}; use parking_lot::RwLock; use std::path::Path; use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; use std::sync::Arc; use wasmtime::component::{Component, Linker}; use wasmtime::{Engine as WasmtimeEngine, ResourceLimiter, Store}; // ── Bindgen: generate typed bindings from WIT ────────────────────── wasmtime::component::bindgen!({ path: "../../wit", world: "plugin", }); // Re-export bindgen types for convenience pub use crate::engine::bex::plugin::common::*; pub use crate::engine::bex::plugin::http; pub use crate::engine::bex::plugin::kv; pub use crate::engine::bex::plugin::secrets; pub use crate::engine::bex::plugin::log; pub use crate::engine::bex::plugin::clock; pub use crate::engine::bex::plugin::rng; pub use crate::engine::bex::plugin::js; // ── Engine State Machine ──────────────────────────────────────────── const STATE_NOT_READY: u8 = 0; const STATE_READY: u8 = 1; const STATE_DRAINING: u8 = 2; const STATE_STOPPED: u8 = 3; // ── Compile Cache ─────────────────────────────────────────────────── /// HMAC-authenticated compile cache. Stores compiled `.cwasm` files /// on disk with an HMAC-SHA256 tag to detect tampering or stale cache. struct CompileCache { cache_dir: std::path::PathBuf, key_material: Vec, } impl CompileCache { fn new(cache_dir: std::path::PathBuf) -> Result { std::fs::create_dir_all(&cache_dir) .map_err(|e| BexError::Internal(format!("cache dir: {e}")))?; // Derive HMAC key from a stable identifier let key_material = b"bex-engine-compile-cache-v2-key".to_vec(); Ok(Self { cache_dir, key_material, }) } /// Compute the cache file path for a given WASM hash fn cache_path(&self, wasm_hash: &[u8]) -> std::path::PathBuf { let hex_hash = hex::encode(wasm_hash); self.cache_dir.join(format!("{}.cwasm", hex_hash)) } /// Compute HMAC tag for data fn compute_tag(&self, data: &[u8]) -> Vec { use hmac::{Hmac, Mac}; type HmacSha256 = Hmac; let mut mac = HmacSha256::new_from_slice(&self.key_material) .expect("HMAC can take key of any size"); mac.update(data); mac.finalize().into_bytes().to_vec() } /// Try to load a cached compiled component. /// Returns None if cache miss or if HMAC validation fails. fn load(&self, wasm_bytes: &[u8]) -> Option> { let wasm_hash = sha2_hash(wasm_bytes); let path = self.cache_path(&wasm_hash); let cached = std::fs::read(&path).ok()?; // Format: [32 bytes HMAC tag][rest is serialized component] if cached.len() < 32 { return None; } let (stored_tag, component_data) = cached.split_at(32); let expected_tag = self.compute_tag(component_data); // Constant-time comparison to prevent timing attacks if stored_tag.len() != expected_tag.len() { return None; } let mut diff = 0u8; for (a, b) in stored_tag.iter().zip(expected_tag.iter()) { diff |= a ^ b; } if diff != 0 { tracing::warn!("Compile cache HMAC mismatch, ignoring cached file"); return None; } Some(component_data.to_vec()) } /// Store a compiled component in the cache. fn store(&self, wasm_bytes: &[u8], component_data: &[u8]) { let wasm_hash = sha2_hash(wasm_bytes); let path = self.cache_path(&wasm_hash); let tag = self.compute_tag(component_data); let mut out = Vec::with_capacity(32 + component_data.len()); out.extend_from_slice(&tag); out.extend_from_slice(component_data); if let Err(e) = std::fs::write(&path, &out) { tracing::warn!("Failed to write compile cache: {}", e); } } } fn sha2_hash(data: &[u8]) -> Vec { use sha2::{Digest, Sha256}; let mut hasher = Sha256::new(); hasher.update(data); hasher.finalize().to_vec() } /// Compile a WASM component, using the compile cache if available. fn compile_or_cache( wasmtime: &WasmtimeEngine, wasm_bytes: &[u8], cache: &CompileCache, ) -> Result { // Try cache first if let Some(cached_data) = cache.load(wasm_bytes) { // Deserialize from cached compiled module (unsafe is OK here because // we trust our own cache files which are HMAC-authenticated) if let Ok(component) = unsafe { Component::deserialize(wasmtime, &cached_data) } { tracing::debug!("Compile cache hit"); return Ok(component); } // If deserialization fails, fall through to recompile tracing::warn!("Compile cache deserialize failed, recompiling"); } // Compile from source let component = Component::new(wasmtime, wasm_bytes) .map_err(|e| BexError::Internal(format!("compile wasm: {e}")))?; // Store in cache for future use if let Ok(serialized) = component.serialize() { cache.store(wasm_bytes, &serialized); } Ok(component) } // ── Epoch Ticker ──────────────────────────────────────────────────── /// Background thread that increments the Wasmtime epoch at regular intervals. /// This enables wall-clock timeout enforcement via `store.set_epoch_deadline()`. struct EpochTicker { _handle: Option>, shutdown: Arc, } impl EpochTicker { fn start(engine: WasmtimeEngine, interval_ms: u64) -> Self { let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false)); let shutdown_clone = shutdown.clone(); let eng = engine.clone(); let handle = std::thread::Builder::new() .name("bex-epoch-ticker".to_string()) .spawn(move || { let interval = std::time::Duration::from_millis(interval_ms); while !shutdown_clone.load(Ordering::Relaxed) { std::thread::sleep(interval); eng.increment_epoch(); } }) .expect("failed to spawn epoch ticker thread"); Self { _handle: Some(handle), shutdown, } } } impl Drop for EpochTicker { fn drop(&mut self) { self.shutdown.store(true, Ordering::Relaxed); } } // ── Engine ───────────────────────────────────────────────────────── struct EngineInner { /// Kept alive via Arc — cloned into each HostState for block_on() access. /// Field is read via `self.inner.runtime.clone()` in instantiate(). #[allow(dead_code)] // Suppress false positive: read through Arc clone in HostState::new() runtime: Arc, wasmtime: WasmtimeEngine, linker: Linker, db: Arc, http: Arc, js_pool: Arc, config: Arc, registry: RwLock, compile_cache: CompileCache, state: AtomicU8, active_calls: AtomicUsize, start_time: std::time::Instant, _epoch_ticker: EpochTicker, _lock_file: std::fs::File, } #[derive(Clone)] pub struct Engine { inner: Arc, } impl Engine { pub fn new(config: EngineConfig) -> Result { // Create data directory std::fs::create_dir_all(&config.data_dir) .map_err(|e| BexError::Internal(format!("data dir: {e}")))?; // Acquire advisory file lock on data directory (fixes Issue #20) let lock_path = config.data_dir.join(".bex.lock"); let lock_file = std::fs::OpenOptions::new() .create(true) .read(true) .write(true) .open(&lock_path) .map_err(|e| BexError::Internal(format!("lock file: {e}")))?; use fs4::fs_std::FileExt; lock_file .try_lock_exclusive() .map_err(|e| { BexError::Internal(format!( "Another BEX engine instance is using this data directory: {e}" )) })?; // Configure Wasmtime with epoch interruption (fixes Problem #3) let mut wt_config = wasmtime::Config::new(); wt_config.wasm_component_model(true); wt_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); wt_config.consume_fuel(true); wt_config.epoch_interruption(true); wt_config.max_wasm_stack(4 * 1024 * 1024); let wasmtime = WasmtimeEngine::new(&wt_config) .map_err(|e| BexError::Internal(format!("wasmtime init: {e}")))?; // Create ONE shared tokio runtime (fixes Problem #1) let runtime = tokio::runtime::Builder::new_multi_thread() .worker_threads(4) .enable_all() .build() .map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?; let runtime = Arc::new(runtime); // Open database let db = Arc::new( BexDb::open(&config.data_dir).map_err(|e| BexError::Storage(e.to_string()))?, ); // Create HTTP service with proper pool config (fixes Issues #16, #17) let http = Arc::new(HttpHostService::new( &config.user_agent, config.http_timeout_ms, config.http_pool_idle_timeout_ms, config.http_pool_max_idle_per_host, )); // Build linker ONCE (fixes Problem #2) let linker = Self::build_linker(&wasmtime)?; // Start epoch ticker thread let epoch_ticker = EpochTicker::start(wasmtime.clone(), config.epoch_interval_ms); // Compile cache directory let cache_dir = config.data_dir.join("cache/wasm"); let compile_cache = CompileCache::new(cache_dir)?; // Create JS worker pool (QuickJS integration) // Wire JsPoolConfig from EngineConfig so users can control JS pool settings let js_pool_config = bex_js::JsPoolConfig { initial_workers: config.js_initial_workers, max_workers: config.js_max_workers, memory_limit_bytes: config.js_memory_limit_mb as usize * 1024 * 1024, default_timeout_ms: config.js_timeout_ms, context_idle_ttl_secs: config.js_context_idle_ttl_secs, worker_idle_ttl_secs: config.js_worker_idle_ttl_secs, max_stack_bytes: config.js_max_stack_bytes, }; let js_pool = Arc::new( bex_js::JsPool::new(js_pool_config) .map_err(|e| BexError::Internal(format!("JS pool: {e}")))?, ); // Plugins directory for on-disk WASM storage (fixes Problem #4) let plugins_dir = config.data_dir.join("plugins/installed"); std::fs::create_dir_all(&plugins_dir) .map_err(|e| BexError::Internal(format!("plugins dir: {e}")))?; let config_arc = Arc::new(config); let inner = Arc::new(EngineInner { runtime, wasmtime, linker, db, http, js_pool, config: config_arc, registry: RwLock::new(PluginRegistry::new()), compile_cache, state: AtomicU8::new(STATE_NOT_READY), active_calls: AtomicUsize::new(0), start_time: std::time::Instant::now(), _epoch_ticker: epoch_ticker, _lock_file: lock_file, }); let engine = Self { inner }; // Reload previously installed plugins from database engine.reload_from_db()?; // Mark engine as ready engine.inner.state.store(STATE_READY, Ordering::Release); Ok(engine) } /// Build the linker once. This includes locked-down WASI and all BEX host interfaces. fn build_linker(wasmtime: &WasmtimeEngine) -> Result, BexError> { let mut linker = Linker::::new(wasmtime); // Add WASI support with locked-down context (fixes Problem #6) // We provide the WASI imports but HostState creates a WasiCtx with // no inherited handles, no filesystem, no env, no sockets. wasmtime_wasi::add_to_linker_sync(&mut linker) .map_err(|e| BexError::Internal(format!("wasi linker: {e}")))?; // Add BEX host interfaces Plugin::add_to_linker(&mut linker, |state: &mut HostState| state) .map_err(|e| BexError::Internal(format!("linker setup: {e}")))?; Ok(linker) } /// Check engine state before accepting calls fn check_ready(&self) -> Result<(), BexError> { match self.inner.state.load(Ordering::Acquire) { STATE_READY => Ok(()), STATE_NOT_READY => Err(BexError::NotReady), STATE_DRAINING => Err(BexError::Internal("engine is draining".into())), STATE_STOPPED => Err(BexError::Internal("engine is stopped".into())), _ => Err(BexError::Internal("invalid engine state".into())), } } /// Reload all previously installed plugins from the database. fn reload_from_db(&self) -> Result<(), BexError> { let plugins = self .inner .db .list_plugins() .map_err(|e| BexError::Storage(e.to_string()))?; for info in plugins { // Get WASM blob let wasm_bytes = match self.inner.db.get_wasm_blob(&info.id) { Ok(Some(bytes)) => bytes, _ => continue, }; // Get manifest let manifest = match self.inner.db.get_manifest(&info.id) { Ok(Some(yaml)) => match serde_yaml::from_str::(&yaml) { Ok(m) => m, _ => continue, }, _ => continue, }; // Compile with cache (fixes Problem #5) let component = match compile_or_cache( &self.inner.wasmtime, &wasm_bytes, &self.inner.compile_cache, ) { Ok(c) => Arc::new(c), _ => continue, }; // Store WASM on disk for future recompilation (fixes Problem #4) let wasm_path = self.inner.config.data_dir.join(format!( "plugins/installed/{}/plugin.wasm", info.id.replace('.', "_") )); if let Some(parent) = wasm_path.parent() { let _ = std::fs::create_dir_all(parent); } let _ = std::fs::write(&wasm_path, &wasm_bytes); let record = { let mut r = crate::registry::PluginRecord::new( manifest, component, self.inner.config.circuit_breaker_threshold, self.inner.config.circuit_breaker_cooldown_ms, ); r.enabled = info.enabled; r }; self.inner.registry.write().insert(Arc::new(record)); } Ok(()) } // ── Plugin Management ─────────────────────────────────────────── pub fn install_plugin(&self, path: &Path) -> Result { self.check_ready()?; let data = std::fs::read(path).map_err(|e| BexError::Internal(format!("read: {e}")))?; self.install_bytes(&data) } /// Install a plugin from raw bytes (fixes Issue #13). pub fn install_bytes(&self, data: &[u8]) -> Result { self.check_ready()?; let package = bex_pkg::unpack(data)?; package .manifest .validate(&self.inner.config.host_version)?; // Compile with cache let component = compile_or_cache( &self.inner.wasmtime, &package.wasm, &self.inner.compile_cache, )?; let component = Arc::new(component); let id = package.manifest.id.clone(); // Store WASM on disk (fixes Problem #4: don't keep WASM in memory) let wasm_path = self.inner.config.data_dir.join(format!( "plugins/installed/{}/plugin.wasm", id.replace('.', "_") )); if let Some(parent) = wasm_path.parent() { let _ = std::fs::create_dir_all(parent); } let _ = std::fs::write(&wasm_path, &package.wasm); // Persist WASM blob and manifest to database self.inner .db .save_wasm_blob(&id, &package.wasm) .map_err(|e| BexError::Storage(e.to_string()))?; let manifest_yaml = serde_yaml::to_string(&package.manifest) .map_err(|e| BexError::Internal(e.to_string()))?; self.inner .db .save_manifest(&id, &manifest_yaml) .map_err(|e| BexError::Storage(e.to_string()))?; let record = Arc::new(crate::registry::PluginRecord::new( package.manifest, component, self.inner.config.circuit_breaker_threshold, self.inner.config.circuit_breaker_cooldown_ms, )); let info = record.to_plugin_info(); self.inner .db .save_plugin_info(&info) .map_err(|e| BexError::Storage(e.to_string()))?; self.inner.registry.write().insert(record); Ok(info) } /// Install a plugin directly from WASM bytes and a manifest (no package format). pub fn install_plugin_raw( &self, manifest: Manifest, wasm_bytes: Vec, ) -> Result { self.check_ready()?; manifest.validate(&self.inner.config.host_version)?; let component = compile_or_cache( &self.inner.wasmtime, &wasm_bytes, &self.inner.compile_cache, )?; let component = Arc::new(component); let id = manifest.id.clone(); // Store WASM on disk let wasm_path = self.inner.config.data_dir.join(format!( "plugins/installed/{}/plugin.wasm", id.replace('.', "_") )); if let Some(parent) = wasm_path.parent() { let _ = std::fs::create_dir_all(parent); } let _ = std::fs::write(&wasm_path, &wasm_bytes); // Persist self.inner .db .save_wasm_blob(&id, &wasm_bytes) .map_err(|e| BexError::Storage(e.to_string()))?; let manifest_yaml = serde_yaml::to_string(&manifest) .map_err(|e| BexError::Internal(e.to_string()))?; self.inner .db .save_manifest(&id, &manifest_yaml) .map_err(|e| BexError::Storage(e.to_string()))?; let record = Arc::new(crate::registry::PluginRecord::new( manifest, component, self.inner.config.circuit_breaker_threshold, self.inner.config.circuit_breaker_cooldown_ms, )); let info = record.to_plugin_info(); self.inner .db .save_plugin_info(&info) .map_err(|e| BexError::Storage(e.to_string()))?; self.inner.registry.write().insert(record); Ok(info) } pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> { self.check_ready()?; // Wait for active calls to drain (fixes Issue #9) let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); while self.inner.active_calls.load(Ordering::Acquire) > 0 { if std::time::Instant::now() > deadline { return Err(BexError::Internal( "timeout waiting for active calls to drain".into(), )); } std::thread::sleep(std::time::Duration::from_millis(50)); } // Remove WASM from disk let wasm_path = self.inner.config.data_dir.join(format!( "plugins/installed/{}/plugin.wasm", id.replace('.', "_") )); let _ = std::fs::remove_file(&wasm_path); // Try to remove the directory too if empty if let Some(parent) = wasm_path.parent() { let _ = std::fs::remove_dir(parent); } // Evict plugin's JS context from the pool self.inner.js_pool.evict_plugin(id); self.inner.registry.write().remove(id); self.inner .db .remove_plugin(id) .map_err(|e| BexError::Storage(e.to_string()))?; Ok(()) } pub fn list_plugins(&self) -> Vec { self.inner.registry.read().list() } pub fn get_plugin_info(&self, id: &str) -> Option { self.inner.registry.read().get(id).map(|r| r.to_plugin_info()) } pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> { self.check_ready()?; let mut reg = self.inner.registry.write(); let old = reg .remove(id) .ok_or_else(|| BexError::PluginNotFound(id.into()))?; // Create new record with enabled=true let new_record = Arc::new(crate::registry::PluginRecord { id: old.id.clone(), manifest: old.manifest.clone(), capabilities: old.capabilities, enabled: true, installed_at: old.installed_at, component: old.component.clone(), health: crate::registry::PluginHealth::new( self.inner.config.circuit_breaker_threshold, self.inner.config.circuit_breaker_cooldown_ms, ), }); let info = new_record.to_plugin_info(); reg.insert(new_record); drop(reg); self.inner .db .save_plugin_info(&info) .map_err(|e| BexError::Storage(e.to_string()))?; Ok(()) } pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> { self.check_ready()?; let mut reg = self.inner.registry.write(); let old = reg .remove(id) .ok_or_else(|| BexError::PluginNotFound(id.into()))?; let new_record = Arc::new(crate::registry::PluginRecord { id: old.id.clone(), manifest: old.manifest.clone(), capabilities: old.capabilities, enabled: false, installed_at: old.installed_at, component: old.component.clone(), health: crate::registry::PluginHealth::new( self.inner.config.circuit_breaker_threshold, self.inner.config.circuit_breaker_cooldown_ms, ), }); let info = new_record.to_plugin_info(); reg.insert(new_record); drop(reg); self.inner .db .save_plugin_info(&info) .map_err(|e| BexError::Storage(e.to_string()))?; Ok(()) } // ── Secret / API Key Management ──────────────────────────────── /// Set a secret/API key for a plugin. pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> { self.inner .db .secret_set(plugin_id, key, value) .map_err(|e| BexError::Storage(e.to_string())) } /// Get a secret/API key for a plugin. pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result, BexError> { self.inner .db .secret_get(plugin_id, key) .map_err(|e| BexError::Storage(e.to_string())) } /// Delete a secret/API key for a plugin. pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result { self.inner .db .secret_remove(plugin_id, key) .map_err(|e| BexError::Storage(e.to_string())) } /// List all secret keys for a plugin. pub fn secret_keys(&self, plugin_id: &str) -> Result, BexError> { self.inner .db .secret_keys(plugin_id, "") .map_err(|e| BexError::Storage(e.to_string())) } // ── Instantiation with epoch deadline ─────────────────────────── fn instantiate(&self, plugin_id: &str) -> Result<(Plugin, Store), BexError> { self.check_ready()?; let record = self .inner .registry .read() .get(plugin_id) .cloned() .ok_or_else(|| BexError::PluginNotFound(plugin_id.into()))?; if !record.enabled { return Err(BexError::PluginDisabled(plugin_id.into())); } // Circuit breaker check if !record.health.is_available() { return Err(BexError::PluginFault(format!( "plugin '{}' is circuit-broken (too many consecutive failures)", plugin_id ))); } let component = record.component.clone(); // Create store let mut store = Store::new( &self.inner.wasmtime, HostState::new( self.inner.http.clone(), self.inner.db.clone(), self.inner.js_pool.clone(), self.inner.runtime.clone(), plugin_id.to_string(), record.manifest.clone(), Arc::from(self.inner.config.user_agent.as_str()), self.inner.config.http_timeout_ms, self.inner.config.max_response_bytes, self.inner.config.memory_limit_mb, ), ); // Set fuel budget store .set_fuel(self.inner.config.fuel_per_call) .map_err(|_| BexError::FuelExhausted)?; // Set epoch deadline for wall-clock timeout (fixes Problem #3) let epoch_deadline = (self.inner.config.call_timeout_ms as u64 / self.inner.config.epoch_interval_ms.max(1)) .max(1); store.set_epoch_deadline(epoch_deadline); // Set resource limiter from HostState (fixes Problem #6 - memory limiter) store.limiter(|state: &mut HostState| -> &mut dyn ResourceLimiter { &mut state.limiter }); // Increment active calls counter (fixes Issue #9) self.inner.active_calls.fetch_add(1, Ordering::AcqRel); // Instantiate using the pre-built linker (fixes Problem #2) let instance = self .inner .linker .instantiate(&mut store, &component) .map_err(|e| classify_trap(e, plugin_id))?; let plugin = Plugin::new(&mut store, &instance) .map_err(|e| BexError::PluginFault(format!("bind: {e}")))?; Ok((plugin, store)) } /// Wrap a plugin call with panic catching, circuit breaker, and active call tracking. fn call_plugin_inner( &self, plugin_id: &str, call: impl FnOnce(Plugin, &mut Store) -> wasmtime::Result>, ) -> Result where R: Send + 'static, { let (instance, mut store) = self.instantiate(plugin_id)?; // Execute the call with panic catching (fixes stability issue) let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { call(instance, &mut store) })); // Decrement active calls self.inner.active_calls.fetch_sub(1, Ordering::AcqRel); // Get the plugin record for circuit breaker let record = self.inner.registry.read().get(plugin_id).cloned(); match result { Ok(Ok(Ok(val))) => { // Success — record for circuit breaker if let Some(r) = record { r.health.record_success(); } Ok(val) } Ok(Ok(Err(plugin_err))) => { // Plugin returned a structured error (fixes Issue #14) if let Some(r) = record { r.health.record_failure(); } Err(classify_plugin_error(plugin_err)) } Ok(Err(trap)) => { // Wasmtime trap — convert to BexError if let Some(r) = record { r.health.record_failure(); } Err(classify_trap(trap, plugin_id)) } Err(_panic_payload) => { // Rust panic inside Wasmtime — auto-disable plugin tracing::error!(plugin = %plugin_id, "WASM call panicked"); let _ = self.disable_plugin(plugin_id); Err(BexError::PluginFault( "panic — plugin auto-disabled".into(), )) } } } // ── Typed plugin API calls ──────────────────────────────────────── pub fn call_get_home( &self, plugin_id: &str, ctx: &RequestContext, ) -> Result, BexError> { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_get_home(store, ctx) }) } pub fn call_search( &self, plugin_id: &str, ctx: &RequestContext, query: &str, filters: &SearchFilters, ) -> Result { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_search(store, ctx, query, filters) }) } pub fn call_get_info( &self, plugin_id: &str, ctx: &RequestContext, id: &str, ) -> Result { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_get_info(store, ctx, id) }) } pub fn call_get_servers( &self, plugin_id: &str, ctx: &RequestContext, id: &str, ) -> Result, BexError> { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_get_servers(store, ctx, id) }) } pub fn call_resolve_stream( &self, plugin_id: &str, ctx: &RequestContext, server: &Server, ) -> Result { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_resolve_stream(store, ctx, server) }) } pub fn call_search_subtitles( &self, plugin_id: &str, ctx: &RequestContext, query: &SubtitleQuery, ) -> Result, BexError> { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_search_subtitles(store, ctx, query) }) } pub fn call_download_subtitle( &self, plugin_id: &str, ctx: &RequestContext, id: &str, ) -> Result { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_download_subtitle(store, ctx, id) }) } pub fn call_get_articles( &self, plugin_id: &str, ctx: &RequestContext, ) -> Result, BexError> { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_get_articles(store, ctx) }) } pub fn call_search_articles( &self, plugin_id: &str, ctx: &RequestContext, query: &str, ) -> Result, BexError> { self.call_plugin_inner(plugin_id, |instance, store| { instance.api().call_search_articles(store, ctx, query) }) } // ── Convenience: default context ──────────────────────────────── pub fn default_ctx() -> RequestContext { RequestContext { request_id: format!( "{:x}", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() ), locale: None, region: None, safe_mode: false, hints: vec![], } } // ── JSON-based calls for the Pure C ABI FFI layer ─────────────── pub fn call_get_home_json(&self, plugin_id: &str) -> Result { let result = self.call_get_home(plugin_id, &Self::default_ctx())?; serde_json::to_string(&convert::home_sections_to_json(&result)) .map_err(|e| BexError::Internal(e.to_string())) } pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result { let filters = SearchFilters { kind: None, page: PageCursor { token: None, limit: None, }, fast_match: false, }; let result = self.call_search(plugin_id, &Self::default_ctx(), query, &filters)?; serde_json::to_string(&convert::paged_result_to_json(&result)) .map_err(|e| BexError::Internal(e.to_string())) } pub fn call_get_info_json( &self, plugin_id: &str, media_id: &str, ) -> Result { let result = self.call_get_info(plugin_id, &Self::default_ctx(), media_id)?; serde_json::to_string(&convert::media_info_to_json(&result)) .map_err(|e| BexError::Internal(e.to_string())) } pub fn call_get_servers_json( &self, plugin_id: &str, id: &str, ) -> Result { let result = self.call_get_servers(plugin_id, &Self::default_ctx(), id)?; serde_json::to_string(&convert::servers_to_json(&result)) .map_err(|e| BexError::Internal(e.to_string())) } pub fn call_resolve_stream_json( &self, plugin_id: &str, server_json: &str, ) -> Result { let server_json_val: bex_types::stream::Server = serde_json::from_str(server_json) .map_err(|e| BexError::Internal(format!("parse server: {e}")))?; let server = convert::json_to_server(&server_json_val); let result = self.call_resolve_stream(plugin_id, &Self::default_ctx(), &server)?; serde_json::to_string(&convert::stream_source_to_json(&result)) .map_err(|e| BexError::Internal(e.to_string())) } // ── Stats and Shutdown ─────────────────────────────────────────── pub fn stats(&self) -> bex_types::engine_types::EngineStats { let reg = self.inner.registry.read(); let total = reg.list().len(); let enabled = reg.list().iter().filter(|p| p.enabled).count(); let active = self.inner.active_calls.load(Ordering::Acquire); bex_types::engine_types::EngineStats { uptime_ms: self.inner.start_time.elapsed().as_millis() as u64, total_plugins: total, enabled_plugins: enabled, active_calls: active, } } pub fn shutdown(&self) { tracing::info!("BEX Engine shutting down..."); // Transition to draining state (fixes Issue #8) self.inner.state.store(STATE_DRAINING, Ordering::Release); // Wait for active calls to drain (brief window for CLI responsiveness) let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); while self.inner.active_calls.load(Ordering::Acquire) > 0 { if std::time::Instant::now() > deadline { tracing::warn!( "Shutdown: {} active calls still running after 2s timeout", self.inner.active_calls.load(Ordering::Acquire) ); break; } std::thread::sleep(std::time::Duration::from_millis(50)); } // Mark as stopped self.inner.state.store(STATE_STOPPED, Ordering::Release); tracing::info!("BEX Engine shut down complete"); } } // ── Trap classification ───────────────────────────────────────────── /// Classify a Wasmtime trap into a structured BexError. fn classify_trap(error: wasmtime::Error, plugin_id: &str) -> BexError { let msg = error.to_string(); // Check for epoch interruption (timeout) if msg.contains("epoch") || msg.contains("timeout") { return BexError::Timeout { ms: 0 }; } // Check for fuel exhaustion if msg.contains("fuel") { return BexError::FuelExhausted; } // Check for out-of-memory if msg.contains("memory") && (msg.contains("grow") || msg.contains("limit")) { return BexError::PluginFault(format!("out of memory: {}", plugin_id)); } // Default: generic plugin fault BexError::PluginFault(msg) } /// Classify a plugin error from the WIT bindgen into a structured BexError. fn classify_plugin_error(err: PluginError) -> BexError { match err { PluginError::Network(msg) => BexError::Network(msg), PluginError::Parse(msg) => BexError::PluginError(msg), PluginError::NotFound => BexError::PluginError("not found".into()), PluginError::Unauthorized => BexError::PluginError("unauthorized".into()), PluginError::Forbidden => BexError::PluginError("forbidden".into()), PluginError::RateLimited(Some(secs)) => BexError::Timeout { ms: secs * 1000, }, PluginError::RateLimited(None) => BexError::Timeout { ms: 0 }, PluginError::Timeout => BexError::Timeout { ms: 0 }, PluginError::Cancelled => BexError::Cancelled, PluginError::Unsupported => { BexError::Unsupported("plugin does not support this operation".into()) } PluginError::InvalidInput(msg) => BexError::PluginError(msg), PluginError::Internal(msg) => BexError::PluginError(msg), } } // ── Conversion between bindgen types and JSON-serializable types ───── pub mod convert { use super::*; pub fn home_sections_to_json(sections: &[HomeSection]) -> Vec { sections .iter() .map(|s| serde_json::to_value(home_section_to_json(s)).unwrap_or_default()) .collect() } pub fn home_section_to_json(s: &HomeSection) -> bex_types::media::HomeSection { bex_types::media::HomeSection { id: s.id.clone(), title: s.title.clone(), subtitle: s.subtitle.clone(), items: s.items.iter().map(media_card_to_json).collect(), next_page: s.next_page.clone(), layout: format!("{:?}", s.layout).to_lowercase(), show_rank: s.show_rank, categories: s .categories .iter() .map(|c| bex_types::media::CategoryLink { id: c.id.clone(), title: c.title.clone(), subtitle: c.subtitle.clone(), image: c.image.as_ref().map(image_to_json), }) .collect(), extra: s .extra .iter() .map(|a| (a.key.clone(), a.value.clone())) .collect(), } } pub fn media_card_to_json(c: &MediaCard) -> bex_types::media::MediaCard { bex_types::media::MediaCard { id: c.id.clone(), title: c.title.clone(), kind: c.kind.map(|k| format!("{:?}", k).to_lowercase()), images: c.images.as_ref().map(image_set_to_json), original_title: c.original_title.clone(), tagline: c.tagline.clone(), year: c.year.clone(), score: c.score, genres: c.genres.clone(), status: c.status.map(|s| format!("{:?}", s).to_lowercase()), content_rating: c.content_rating.clone(), url: c.url.clone(), ids: c .ids .iter() .map(|id| bex_types::media::LinkedId { source: id.source.clone(), id: id.id.clone(), }) .collect(), extra: c .extra .iter() .map(|a| (a.key.clone(), a.value.clone())) .collect(), } } pub fn image_set_to_json(s: &ImageSet) -> bex_types::media::ImageSet { bex_types::media::ImageSet { low: s.low.as_ref().map(image_to_json), medium: s.medium.as_ref().map(image_to_json), high: s.high.as_ref().map(image_to_json), backdrop: s.backdrop.as_ref().map(image_to_json), logo: s.logo.as_ref().map(image_to_json), } } pub fn image_to_json(i: &Image) -> bex_types::media::Image { bex_types::media::Image { url: i.url.clone(), layout: format!("{:?}", i.layout).to_lowercase(), width: i.width, height: i.height, blurhash: i.blurhash.clone(), } } pub fn paged_result_to_json(r: &PagedResult) -> bex_types::media::PagedResult { bex_types::media::PagedResult { items: r.items.iter().map(media_card_to_json).collect(), categories: r .categories .iter() .map(|c| bex_types::media::CategoryLink { id: c.id.clone(), title: c.title.clone(), subtitle: c.subtitle.clone(), image: c.image.as_ref().map(image_to_json), }) .collect(), next_page: r.next_page.clone(), } } #[allow(clippy::too_many_lines)] pub fn media_info_to_json(m: &MediaInfo) -> bex_types::media::MediaInfo { bex_types::media::MediaInfo { id: m.id.clone(), title: m.title.clone(), kind: format!("{:?}", m.kind).to_lowercase(), images: m.images.as_ref().map(image_set_to_json), original_title: m.original_title.clone(), description: m.description.clone(), score: m.score, scored_by: m.scored_by, year: m.year.clone(), release_date: m.release_date.clone(), genres: m.genres.clone(), tags: m.tags.clone(), status: m.status.map(|s| format!("{:?}", s).to_lowercase()), content_rating: m.content_rating.clone(), seasons: m .seasons .iter() .map(|s| bex_types::media::Season { id: s.id.clone(), title: s.title.clone(), number: s.number, year: s.year, episodes: s .episodes .iter() .map(|e| bex_types::media::Episode { id: e.id.clone(), title: e.title.clone(), number: e.number, season: e.season, images: e.images.as_ref().map(image_set_to_json), description: e.description.clone(), released: e.released.clone(), score: e.score, url: e.url.clone(), tags: e.tags.clone(), extra: e .extra .iter() .map(|a| (a.key.clone(), a.value.clone())) .collect(), }) .collect(), }) .collect(), cast: m .cast .iter() .map(|p| bex_types::media::Person { id: p.id.clone(), name: p.name.clone(), image: p.image.as_ref().map(image_set_to_json), role: p.role.clone(), url: p.url.clone(), }) .collect(), crew: m .crew .iter() .map(|p| bex_types::media::Person { id: p.id.clone(), name: p.name.clone(), image: p.image.as_ref().map(image_set_to_json), role: p.role.clone(), url: p.url.clone(), }) .collect(), runtime_minutes: m.runtime_minutes, trailer_url: m.trailer_url.clone(), ids: m .ids .iter() .map(|id| bex_types::media::LinkedId { source: id.source.clone(), id: id.id.clone(), }) .collect(), studio: m.studio.clone(), country: m.country.clone(), language: m.language.clone(), url: m.url.clone(), extra: m .extra .iter() .map(|a| (a.key.clone(), a.value.clone())) .collect(), } } pub fn servers_to_json(servers: &[Server]) -> Vec { servers .iter() .map(|s| bex_types::stream::Server { id: s.id.clone(), label: s.label.clone(), url: s.url.clone(), priority: s.priority, extra: s .extra .iter() .map(|a| (a.key.clone(), a.value.clone())) .collect(), }) .collect() } pub fn json_to_server(s: &bex_types::stream::Server) -> Server { Server { id: s.id.clone(), label: s.label.clone(), url: s.url.clone(), priority: s.priority, extra: s .extra .iter() .map(|(k, v)| Attr { key: k.clone(), value: v.clone(), }) .collect(), } } pub fn stream_source_to_json(s: &StreamSource) -> bex_types::stream::StreamSource { bex_types::stream::StreamSource { id: s.id.clone(), label: s.label.clone(), format: format!("{:?}", s.format).to_lowercase(), manifest_url: s.manifest_url.clone(), videos: s .videos .iter() .map(|v| bex_types::stream::VideoTrack { resolution: bex_types::stream::VideoResolution { width: v.resolution.width, height: v.resolution.height, hdr: v.resolution.hdr, label: v.resolution.label.clone(), }, url: v.url.clone(), mime_type: v.mime_type.clone(), bitrate: v.bitrate, codecs: v.codecs.clone(), }) .collect(), subtitles: s .subtitles .iter() .map(|st| bex_types::stream::SubtitleTrack { label: st.label.clone(), url: st.url.clone(), language: st.language.clone(), format: st.format.clone(), }) .collect(), headers: s .headers .iter() .map(|a| (a.key.clone(), a.value.clone())) .collect(), extra: s .extra .iter() .map(|a| (a.key.clone(), a.value.clone())) .collect(), } } }