| use crate::config::EngineConfig; |
| use crate::host_state::HostState; |
| use crate::http_service::HttpHostService; |
| use crate::registry::PluginRegistry; |
| use bex_db::BexDb; |
| use bex_types::plugin_info::PluginInfo; |
| use bex_types::{BexError, Manifest}; |
| use parking_lot::RwLock; |
| use std::path::Path; |
| use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering}; |
| use std::sync::Arc; |
| use wasmtime::component::{Component, Linker}; |
| use wasmtime::{Engine as WasmtimeEngine, ResourceLimiter, Store}; |
|
|
| |
| wasmtime::component::bindgen!({ |
| path: "../../wit", |
| world: "plugin", |
| }); |
|
|
| |
| pub use crate::engine::bex::plugin::common::*; |
| pub use crate::engine::bex::plugin::http; |
| pub use crate::engine::bex::plugin::kv; |
| pub use crate::engine::bex::plugin::secrets; |
| pub use crate::engine::bex::plugin::log; |
| pub use crate::engine::bex::plugin::clock; |
| pub use crate::engine::bex::plugin::rng; |
| pub use crate::engine::bex::plugin::js; |
|
|
| |
|
|
| const STATE_NOT_READY: u8 = 0; |
| const STATE_READY: u8 = 1; |
| const STATE_DRAINING: u8 = 2; |
| const STATE_STOPPED: u8 = 3; |
|
|
| |
|
|
| |
| |
| struct CompileCache { |
| cache_dir: std::path::PathBuf, |
| key_material: Vec<u8>, |
| } |
|
|
| impl CompileCache { |
| fn new(cache_dir: std::path::PathBuf) -> Result<Self, BexError> { |
| std::fs::create_dir_all(&cache_dir) |
| .map_err(|e| BexError::Internal(format!("cache dir: {e}")))?; |
|
|
| |
| let key_material = b"bex-engine-compile-cache-v2-key".to_vec(); |
|
|
| Ok(Self { |
| cache_dir, |
| key_material, |
| }) |
| } |
|
|
| |
| fn cache_path(&self, wasm_hash: &[u8]) -> std::path::PathBuf { |
| let hex_hash = hex::encode(wasm_hash); |
| self.cache_dir.join(format!("{}.cwasm", hex_hash)) |
| } |
|
|
| |
| fn compute_tag(&self, data: &[u8]) -> Vec<u8> { |
| use hmac::{Hmac, Mac}; |
| type HmacSha256 = Hmac<sha2::Sha256>; |
| let mut mac = HmacSha256::new_from_slice(&self.key_material) |
| .expect("HMAC can take key of any size"); |
| mac.update(data); |
| mac.finalize().into_bytes().to_vec() |
| } |
|
|
| |
| |
| fn load(&self, wasm_bytes: &[u8]) -> Option<Vec<u8>> { |
| let wasm_hash = sha2_hash(wasm_bytes); |
| let path = self.cache_path(&wasm_hash); |
| let cached = std::fs::read(&path).ok()?; |
|
|
| |
| if cached.len() < 32 { |
| return None; |
| } |
| let (stored_tag, component_data) = cached.split_at(32); |
| let expected_tag = self.compute_tag(component_data); |
|
|
| |
| if stored_tag.len() != expected_tag.len() { |
| return None; |
| } |
| let mut diff = 0u8; |
| for (a, b) in stored_tag.iter().zip(expected_tag.iter()) { |
| diff |= a ^ b; |
| } |
| if diff != 0 { |
| tracing::warn!("Compile cache HMAC mismatch, ignoring cached file"); |
| return None; |
| } |
|
|
| Some(component_data.to_vec()) |
| } |
|
|
| |
| fn store(&self, wasm_bytes: &[u8], component_data: &[u8]) { |
| let wasm_hash = sha2_hash(wasm_bytes); |
| let path = self.cache_path(&wasm_hash); |
| let tag = self.compute_tag(component_data); |
|
|
| let mut out = Vec::with_capacity(32 + component_data.len()); |
| out.extend_from_slice(&tag); |
| out.extend_from_slice(component_data); |
|
|
| if let Err(e) = std::fs::write(&path, &out) { |
| tracing::warn!("Failed to write compile cache: {}", e); |
| } |
| } |
| } |
|
|
| fn sha2_hash(data: &[u8]) -> Vec<u8> { |
| use sha2::{Digest, Sha256}; |
| let mut hasher = Sha256::new(); |
| hasher.update(data); |
| hasher.finalize().to_vec() |
| } |
|
|
| |
| fn compile_or_cache( |
| wasmtime: &WasmtimeEngine, |
| wasm_bytes: &[u8], |
| cache: &CompileCache, |
| ) -> Result<Component, BexError> { |
| |
| if let Some(cached_data) = cache.load(wasm_bytes) { |
| |
| |
| if let Ok(component) = unsafe { Component::deserialize(wasmtime, &cached_data) } { |
| tracing::debug!("Compile cache hit"); |
| return Ok(component); |
| } |
| |
| tracing::warn!("Compile cache deserialize failed, recompiling"); |
| } |
|
|
| |
| let component = Component::new(wasmtime, wasm_bytes) |
| .map_err(|e| BexError::Internal(format!("compile wasm: {e}")))?; |
|
|
| |
| if let Ok(serialized) = component.serialize() { |
| cache.store(wasm_bytes, &serialized); |
| } |
|
|
| Ok(component) |
| } |
|
|
| |
|
|
| |
| |
| struct EpochTicker { |
| _handle: Option<std::thread::JoinHandle<()>>, |
| shutdown: Arc<std::sync::atomic::AtomicBool>, |
| } |
|
|
| impl EpochTicker { |
| fn start(engine: WasmtimeEngine, interval_ms: u64) -> Self { |
| let shutdown = Arc::new(std::sync::atomic::AtomicBool::new(false)); |
| let shutdown_clone = shutdown.clone(); |
| let eng = engine.clone(); |
|
|
| let handle = std::thread::Builder::new() |
| .name("bex-epoch-ticker".to_string()) |
| .spawn(move || { |
| let interval = std::time::Duration::from_millis(interval_ms); |
| while !shutdown_clone.load(Ordering::Relaxed) { |
| std::thread::sleep(interval); |
| eng.increment_epoch(); |
| } |
| }) |
| .expect("failed to spawn epoch ticker thread"); |
|
|
| Self { |
| _handle: Some(handle), |
| shutdown, |
| } |
| } |
| } |
|
|
| impl Drop for EpochTicker { |
| fn drop(&mut self) { |
| self.shutdown.store(true, Ordering::Relaxed); |
| } |
| } |
|
|
| |
|
|
| struct EngineInner { |
| |
| |
| #[allow(dead_code)] |
| runtime: Arc<tokio::runtime::Runtime>, |
| wasmtime: WasmtimeEngine, |
| linker: Linker<HostState>, |
| db: Arc<BexDb>, |
| http: Arc<HttpHostService>, |
| js_pool: Arc<bex_js::JsPool>, |
| config: Arc<EngineConfig>, |
| registry: RwLock<PluginRegistry>, |
| compile_cache: CompileCache, |
| state: AtomicU8, |
| active_calls: AtomicUsize, |
| start_time: std::time::Instant, |
| _epoch_ticker: EpochTicker, |
| _lock_file: std::fs::File, |
| } |
|
|
| #[derive(Clone)] |
| pub struct Engine { |
| inner: Arc<EngineInner>, |
| } |
|
|
| impl Engine { |
| pub fn new(config: EngineConfig) -> Result<Self, BexError> { |
| |
| std::fs::create_dir_all(&config.data_dir) |
| .map_err(|e| BexError::Internal(format!("data dir: {e}")))?; |
|
|
| |
| let lock_path = config.data_dir.join(".bex.lock"); |
| let lock_file = std::fs::OpenOptions::new() |
| .create(true) |
| .read(true) |
| .write(true) |
| .open(&lock_path) |
| .map_err(|e| BexError::Internal(format!("lock file: {e}")))?; |
|
|
| use fs4::fs_std::FileExt; |
| lock_file |
| .try_lock_exclusive() |
| .map_err(|e| { |
| BexError::Internal(format!( |
| "Another BEX engine instance is using this data directory: {e}" |
| )) |
| })?; |
|
|
| |
| let mut wt_config = wasmtime::Config::new(); |
| wt_config.wasm_component_model(true); |
| wt_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable); |
| wt_config.consume_fuel(true); |
| wt_config.epoch_interruption(true); |
| wt_config.max_wasm_stack(4 * 1024 * 1024); |
|
|
| let wasmtime = WasmtimeEngine::new(&wt_config) |
| .map_err(|e| BexError::Internal(format!("wasmtime init: {e}")))?; |
|
|
| |
| let runtime = tokio::runtime::Builder::new_multi_thread() |
| .worker_threads(4) |
| .enable_all() |
| .build() |
| .map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?; |
|
|
| let runtime = Arc::new(runtime); |
|
|
| |
| let db = Arc::new( |
| BexDb::open(&config.data_dir).map_err(|e| BexError::Storage(e.to_string()))?, |
| ); |
|
|
| |
| let http = Arc::new(HttpHostService::new( |
| &config.user_agent, |
| config.http_timeout_ms, |
| config.http_pool_idle_timeout_ms, |
| config.http_pool_max_idle_per_host, |
| )); |
|
|
| |
| let linker = Self::build_linker(&wasmtime)?; |
|
|
| |
| let epoch_ticker = EpochTicker::start(wasmtime.clone(), config.epoch_interval_ms); |
|
|
| |
| let cache_dir = config.data_dir.join("cache/wasm"); |
| let compile_cache = CompileCache::new(cache_dir)?; |
|
|
| |
| |
| let js_pool_config = bex_js::JsPoolConfig { |
| initial_workers: config.js_initial_workers, |
| max_workers: config.js_max_workers, |
| memory_limit_bytes: config.js_memory_limit_mb as usize * 1024 * 1024, |
| default_timeout_ms: config.js_timeout_ms, |
| context_idle_ttl_secs: config.js_context_idle_ttl_secs, |
| worker_idle_ttl_secs: config.js_worker_idle_ttl_secs, |
| max_stack_bytes: config.js_max_stack_bytes, |
| }; |
| let js_pool = Arc::new( |
| bex_js::JsPool::new(js_pool_config) |
| .map_err(|e| BexError::Internal(format!("JS pool: {e}")))?, |
| ); |
|
|
| |
| let plugins_dir = config.data_dir.join("plugins/installed"); |
| std::fs::create_dir_all(&plugins_dir) |
| .map_err(|e| BexError::Internal(format!("plugins dir: {e}")))?; |
|
|
| let config_arc = Arc::new(config); |
|
|
| let inner = Arc::new(EngineInner { |
| runtime, |
| wasmtime, |
| linker, |
| db, |
| http, |
| js_pool, |
| config: config_arc, |
| registry: RwLock::new(PluginRegistry::new()), |
| compile_cache, |
| state: AtomicU8::new(STATE_NOT_READY), |
| active_calls: AtomicUsize::new(0), |
| start_time: std::time::Instant::now(), |
| _epoch_ticker: epoch_ticker, |
| _lock_file: lock_file, |
| }); |
|
|
| let engine = Self { inner }; |
|
|
| |
| engine.reload_from_db()?; |
|
|
| |
| engine.inner.state.store(STATE_READY, Ordering::Release); |
|
|
| Ok(engine) |
| } |
|
|
| |
| fn build_linker(wasmtime: &WasmtimeEngine) -> Result<Linker<HostState>, BexError> { |
| let mut linker = Linker::<HostState>::new(wasmtime); |
|
|
| |
| |
| |
| wasmtime_wasi::add_to_linker_sync(&mut linker) |
| .map_err(|e| BexError::Internal(format!("wasi linker: {e}")))?; |
|
|
| |
| Plugin::add_to_linker(&mut linker, |state: &mut HostState| state) |
| .map_err(|e| BexError::Internal(format!("linker setup: {e}")))?; |
|
|
| Ok(linker) |
| } |
|
|
| |
| fn check_ready(&self) -> Result<(), BexError> { |
| match self.inner.state.load(Ordering::Acquire) { |
| STATE_READY => Ok(()), |
| STATE_NOT_READY => Err(BexError::NotReady), |
| STATE_DRAINING => Err(BexError::Internal("engine is draining".into())), |
| STATE_STOPPED => Err(BexError::Internal("engine is stopped".into())), |
| _ => Err(BexError::Internal("invalid engine state".into())), |
| } |
| } |
|
|
| |
| fn reload_from_db(&self) -> Result<(), BexError> { |
| let plugins = self |
| .inner |
| .db |
| .list_plugins() |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
|
|
| for info in plugins { |
| |
| let wasm_bytes = match self.inner.db.get_wasm_blob(&info.id) { |
| Ok(Some(bytes)) => bytes, |
| _ => continue, |
| }; |
|
|
| |
| let manifest = match self.inner.db.get_manifest(&info.id) { |
| Ok(Some(yaml)) => match serde_yaml::from_str::<Manifest>(&yaml) { |
| Ok(m) => m, |
| _ => continue, |
| }, |
| _ => continue, |
| }; |
|
|
| |
| let component = match compile_or_cache( |
| &self.inner.wasmtime, |
| &wasm_bytes, |
| &self.inner.compile_cache, |
| ) { |
| Ok(c) => Arc::new(c), |
| _ => continue, |
| }; |
|
|
| |
| let wasm_path = self.inner.config.data_dir.join(format!( |
| "plugins/installed/{}/plugin.wasm", |
| info.id.replace('.', "_") |
| )); |
| if let Some(parent) = wasm_path.parent() { |
| let _ = std::fs::create_dir_all(parent); |
| } |
| let _ = std::fs::write(&wasm_path, &wasm_bytes); |
|
|
| let record = { |
| let mut r = crate::registry::PluginRecord::new( |
| manifest, |
| component, |
| self.inner.config.circuit_breaker_threshold, |
| self.inner.config.circuit_breaker_cooldown_ms, |
| ); |
| r.enabled = info.enabled; |
| r |
| }; |
| self.inner.registry.write().insert(Arc::new(record)); |
| } |
|
|
| Ok(()) |
| } |
|
|
| |
|
|
| pub fn install_plugin(&self, path: &Path) -> Result<PluginInfo, BexError> { |
| self.check_ready()?; |
| let data = std::fs::read(path).map_err(|e| BexError::Internal(format!("read: {e}")))?; |
| self.install_bytes(&data) |
| } |
|
|
| |
| pub fn install_bytes(&self, data: &[u8]) -> Result<PluginInfo, BexError> { |
| self.check_ready()?; |
|
|
| let package = bex_pkg::unpack(data)?; |
| package |
| .manifest |
| .validate(&self.inner.config.host_version)?; |
|
|
| |
| let component = compile_or_cache( |
| &self.inner.wasmtime, |
| &package.wasm, |
| &self.inner.compile_cache, |
| )?; |
| let component = Arc::new(component); |
|
|
| let id = package.manifest.id.clone(); |
|
|
| |
| let wasm_path = self.inner.config.data_dir.join(format!( |
| "plugins/installed/{}/plugin.wasm", |
| id.replace('.', "_") |
| )); |
| if let Some(parent) = wasm_path.parent() { |
| let _ = std::fs::create_dir_all(parent); |
| } |
| let _ = std::fs::write(&wasm_path, &package.wasm); |
|
|
| |
| self.inner |
| .db |
| .save_wasm_blob(&id, &package.wasm) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
| let manifest_yaml = serde_yaml::to_string(&package.manifest) |
| .map_err(|e| BexError::Internal(e.to_string()))?; |
| self.inner |
| .db |
| .save_manifest(&id, &manifest_yaml) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
|
|
| let record = Arc::new(crate::registry::PluginRecord::new( |
| package.manifest, |
| component, |
| self.inner.config.circuit_breaker_threshold, |
| self.inner.config.circuit_breaker_cooldown_ms, |
| )); |
| let info = record.to_plugin_info(); |
| self.inner |
| .db |
| .save_plugin_info(&info) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
| self.inner.registry.write().insert(record); |
|
|
| Ok(info) |
| } |
|
|
| |
| pub fn install_plugin_raw( |
| &self, |
| manifest: Manifest, |
| wasm_bytes: Vec<u8>, |
| ) -> Result<PluginInfo, BexError> { |
| self.check_ready()?; |
| manifest.validate(&self.inner.config.host_version)?; |
|
|
| let component = compile_or_cache( |
| &self.inner.wasmtime, |
| &wasm_bytes, |
| &self.inner.compile_cache, |
| )?; |
| let component = Arc::new(component); |
|
|
| let id = manifest.id.clone(); |
|
|
| |
| let wasm_path = self.inner.config.data_dir.join(format!( |
| "plugins/installed/{}/plugin.wasm", |
| id.replace('.', "_") |
| )); |
| if let Some(parent) = wasm_path.parent() { |
| let _ = std::fs::create_dir_all(parent); |
| } |
| let _ = std::fs::write(&wasm_path, &wasm_bytes); |
|
|
| |
| self.inner |
| .db |
| .save_wasm_blob(&id, &wasm_bytes) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
| let manifest_yaml = serde_yaml::to_string(&manifest) |
| .map_err(|e| BexError::Internal(e.to_string()))?; |
| self.inner |
| .db |
| .save_manifest(&id, &manifest_yaml) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
|
|
| let record = Arc::new(crate::registry::PluginRecord::new( |
| manifest, |
| component, |
| self.inner.config.circuit_breaker_threshold, |
| self.inner.config.circuit_breaker_cooldown_ms, |
| )); |
| let info = record.to_plugin_info(); |
| self.inner |
| .db |
| .save_plugin_info(&info) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
| self.inner.registry.write().insert(record); |
|
|
| Ok(info) |
| } |
|
|
| pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> { |
| self.check_ready()?; |
|
|
| |
| let deadline = std::time::Instant::now() + std::time::Duration::from_secs(5); |
| while self.inner.active_calls.load(Ordering::Acquire) > 0 { |
| if std::time::Instant::now() > deadline { |
| return Err(BexError::Internal( |
| "timeout waiting for active calls to drain".into(), |
| )); |
| } |
| std::thread::sleep(std::time::Duration::from_millis(50)); |
| } |
|
|
| |
| let wasm_path = self.inner.config.data_dir.join(format!( |
| "plugins/installed/{}/plugin.wasm", |
| id.replace('.', "_") |
| )); |
| let _ = std::fs::remove_file(&wasm_path); |
| |
| if let Some(parent) = wasm_path.parent() { |
| let _ = std::fs::remove_dir(parent); |
| } |
|
|
| |
| self.inner.js_pool.evict_plugin(id); |
|
|
| self.inner.registry.write().remove(id); |
| self.inner |
| .db |
| .remove_plugin(id) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
| Ok(()) |
| } |
|
|
| pub fn list_plugins(&self) -> Vec<PluginInfo> { |
| self.inner.registry.read().list() |
| } |
|
|
| pub fn get_plugin_info(&self, id: &str) -> Option<PluginInfo> { |
| self.inner.registry.read().get(id).map(|r| r.to_plugin_info()) |
| } |
|
|
| pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> { |
| self.check_ready()?; |
| let mut reg = self.inner.registry.write(); |
| let old = reg |
| .remove(id) |
| .ok_or_else(|| BexError::PluginNotFound(id.into()))?; |
|
|
| |
| let new_record = Arc::new(crate::registry::PluginRecord { |
| id: old.id.clone(), |
| manifest: old.manifest.clone(), |
| capabilities: old.capabilities, |
| enabled: true, |
| installed_at: old.installed_at, |
| component: old.component.clone(), |
| health: crate::registry::PluginHealth::new( |
| self.inner.config.circuit_breaker_threshold, |
| self.inner.config.circuit_breaker_cooldown_ms, |
| ), |
| }); |
| let info = new_record.to_plugin_info(); |
| reg.insert(new_record); |
| drop(reg); |
|
|
| self.inner |
| .db |
| .save_plugin_info(&info) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
| Ok(()) |
| } |
|
|
| pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> { |
| self.check_ready()?; |
| let mut reg = self.inner.registry.write(); |
| let old = reg |
| .remove(id) |
| .ok_or_else(|| BexError::PluginNotFound(id.into()))?; |
|
|
| let new_record = Arc::new(crate::registry::PluginRecord { |
| id: old.id.clone(), |
| manifest: old.manifest.clone(), |
| capabilities: old.capabilities, |
| enabled: false, |
| installed_at: old.installed_at, |
| component: old.component.clone(), |
| health: crate::registry::PluginHealth::new( |
| self.inner.config.circuit_breaker_threshold, |
| self.inner.config.circuit_breaker_cooldown_ms, |
| ), |
| }); |
| let info = new_record.to_plugin_info(); |
| reg.insert(new_record); |
| drop(reg); |
|
|
| self.inner |
| .db |
| .save_plugin_info(&info) |
| .map_err(|e| BexError::Storage(e.to_string()))?; |
| Ok(()) |
| } |
|
|
| |
|
|
| |
| pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> { |
| self.inner |
| .db |
| .secret_set(plugin_id, key, value) |
| .map_err(|e| BexError::Storage(e.to_string())) |
| } |
|
|
| |
| pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result<Option<String>, BexError> { |
| self.inner |
| .db |
| .secret_get(plugin_id, key) |
| .map_err(|e| BexError::Storage(e.to_string())) |
| } |
|
|
| |
| pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result<bool, BexError> { |
| self.inner |
| .db |
| .secret_remove(plugin_id, key) |
| .map_err(|e| BexError::Storage(e.to_string())) |
| } |
|
|
| |
| pub fn secret_keys(&self, plugin_id: &str) -> Result<Vec<String>, BexError> { |
| self.inner |
| .db |
| .secret_keys(plugin_id, "") |
| .map_err(|e| BexError::Storage(e.to_string())) |
| } |
|
|
| |
|
|
| fn instantiate(&self, plugin_id: &str) -> Result<(Plugin, Store<HostState>), BexError> { |
| self.check_ready()?; |
|
|
| let record = self |
| .inner |
| .registry |
| .read() |
| .get(plugin_id) |
| .cloned() |
| .ok_or_else(|| BexError::PluginNotFound(plugin_id.into()))?; |
|
|
| if !record.enabled { |
| return Err(BexError::PluginDisabled(plugin_id.into())); |
| } |
|
|
| |
| if !record.health.is_available() { |
| return Err(BexError::PluginFault(format!( |
| "plugin '{}' is circuit-broken (too many consecutive failures)", |
| plugin_id |
| ))); |
| } |
|
|
| let component = record.component.clone(); |
|
|
| |
| let mut store = Store::new( |
| &self.inner.wasmtime, |
| HostState::new( |
| self.inner.http.clone(), |
| self.inner.db.clone(), |
| self.inner.js_pool.clone(), |
| self.inner.runtime.clone(), |
| plugin_id.to_string(), |
| record.manifest.clone(), |
| Arc::from(self.inner.config.user_agent.as_str()), |
| self.inner.config.http_timeout_ms, |
| self.inner.config.max_response_bytes, |
| self.inner.config.memory_limit_mb, |
| ), |
| ); |
|
|
| |
| store |
| .set_fuel(self.inner.config.fuel_per_call) |
| .map_err(|_| BexError::FuelExhausted)?; |
|
|
| |
| let epoch_deadline = (self.inner.config.call_timeout_ms as u64 |
| / self.inner.config.epoch_interval_ms.max(1)) |
| .max(1); |
| store.set_epoch_deadline(epoch_deadline); |
|
|
| |
| store.limiter(|state: &mut HostState| -> &mut dyn ResourceLimiter { |
| &mut state.limiter |
| }); |
|
|
| |
| self.inner.active_calls.fetch_add(1, Ordering::AcqRel); |
|
|
| |
| let instance = self |
| .inner |
| .linker |
| .instantiate(&mut store, &component) |
| .map_err(|e| classify_trap(e, plugin_id))?; |
|
|
| let plugin = Plugin::new(&mut store, &instance) |
| .map_err(|e| BexError::PluginFault(format!("bind: {e}")))?; |
|
|
| Ok((plugin, store)) |
| } |
|
|
| |
| fn call_plugin_inner<R>( |
| &self, |
| plugin_id: &str, |
| call: impl FnOnce(Plugin, &mut Store<HostState>) -> wasmtime::Result<Result<R, PluginError>>, |
| ) -> Result<R, BexError> |
| where |
| R: Send + 'static, |
| { |
| let (instance, mut store) = self.instantiate(plugin_id)?; |
|
|
| |
| let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { |
| call(instance, &mut store) |
| })); |
|
|
| |
| self.inner.active_calls.fetch_sub(1, Ordering::AcqRel); |
|
|
| |
| let record = self.inner.registry.read().get(plugin_id).cloned(); |
|
|
| match result { |
| Ok(Ok(Ok(val))) => { |
| |
| if let Some(r) = record { |
| r.health.record_success(); |
| } |
| Ok(val) |
| } |
| Ok(Ok(Err(plugin_err))) => { |
| |
| if let Some(r) = record { |
| r.health.record_failure(); |
| } |
| Err(classify_plugin_error(plugin_err)) |
| } |
| Ok(Err(trap)) => { |
| |
| if let Some(r) = record { |
| r.health.record_failure(); |
| } |
| Err(classify_trap(trap, plugin_id)) |
| } |
| Err(_panic_payload) => { |
| |
| tracing::error!(plugin = %plugin_id, "WASM call panicked"); |
| let _ = self.disable_plugin(plugin_id); |
| Err(BexError::PluginFault( |
| "panic — plugin auto-disabled".into(), |
| )) |
| } |
| } |
| } |
|
|
| |
|
|
| pub fn call_get_home( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| ) -> Result<Vec<HomeSection>, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_get_home(store, ctx) |
| }) |
| } |
|
|
| pub fn call_search( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| query: &str, |
| filters: &SearchFilters, |
| ) -> Result<PagedResult, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_search(store, ctx, query, filters) |
| }) |
| } |
|
|
| pub fn call_get_info( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| id: &str, |
| ) -> Result<MediaInfo, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_get_info(store, ctx, id) |
| }) |
| } |
|
|
| pub fn call_get_servers( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| id: &str, |
| ) -> Result<Vec<Server>, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_get_servers(store, ctx, id) |
| }) |
| } |
|
|
| pub fn call_resolve_stream( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| server: &Server, |
| ) -> Result<StreamSource, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_resolve_stream(store, ctx, server) |
| }) |
| } |
|
|
| pub fn call_search_subtitles( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| query: &SubtitleQuery, |
| ) -> Result<Vec<SubtitleEntry>, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_search_subtitles(store, ctx, query) |
| }) |
| } |
|
|
| pub fn call_download_subtitle( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| id: &str, |
| ) -> Result<SubtitleFile, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_download_subtitle(store, ctx, id) |
| }) |
| } |
|
|
| pub fn call_get_articles( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| ) -> Result<Vec<ArticleSection>, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_get_articles(store, ctx) |
| }) |
| } |
|
|
| pub fn call_search_articles( |
| &self, |
| plugin_id: &str, |
| ctx: &RequestContext, |
| query: &str, |
| ) -> Result<Vec<Article>, BexError> { |
| self.call_plugin_inner(plugin_id, |instance, store| { |
| instance.api().call_search_articles(store, ctx, query) |
| }) |
| } |
|
|
| |
|
|
| pub fn default_ctx() -> RequestContext { |
| RequestContext { |
| request_id: format!( |
| "{:x}", |
| std::time::SystemTime::now() |
| .duration_since(std::time::UNIX_EPOCH) |
| .unwrap_or_default() |
| .as_millis() |
| ), |
| locale: None, |
| region: None, |
| safe_mode: false, |
| hints: vec![], |
| } |
| } |
|
|
| |
|
|
| pub fn call_get_home_json(&self, plugin_id: &str) -> Result<String, BexError> { |
| let result = self.call_get_home(plugin_id, &Self::default_ctx())?; |
| serde_json::to_string(&convert::home_sections_to_json(&result)) |
| .map_err(|e| BexError::Internal(e.to_string())) |
| } |
|
|
| pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result<String, BexError> { |
| let filters = SearchFilters { |
| kind: None, |
| page: PageCursor { |
| token: None, |
| limit: None, |
| }, |
| fast_match: false, |
| }; |
| let result = self.call_search(plugin_id, &Self::default_ctx(), query, &filters)?; |
| serde_json::to_string(&convert::paged_result_to_json(&result)) |
| .map_err(|e| BexError::Internal(e.to_string())) |
| } |
|
|
| pub fn call_get_info_json( |
| &self, |
| plugin_id: &str, |
| media_id: &str, |
| ) -> Result<String, BexError> { |
| let result = self.call_get_info(plugin_id, &Self::default_ctx(), media_id)?; |
| serde_json::to_string(&convert::media_info_to_json(&result)) |
| .map_err(|e| BexError::Internal(e.to_string())) |
| } |
|
|
| pub fn call_get_servers_json( |
| &self, |
| plugin_id: &str, |
| id: &str, |
| ) -> Result<String, BexError> { |
| let result = self.call_get_servers(plugin_id, &Self::default_ctx(), id)?; |
| serde_json::to_string(&convert::servers_to_json(&result)) |
| .map_err(|e| BexError::Internal(e.to_string())) |
| } |
|
|
| pub fn call_resolve_stream_json( |
| &self, |
| plugin_id: &str, |
| server_json: &str, |
| ) -> Result<String, BexError> { |
| let server_json_val: bex_types::stream::Server = serde_json::from_str(server_json) |
| .map_err(|e| BexError::Internal(format!("parse server: {e}")))?; |
| let server = convert::json_to_server(&server_json_val); |
| let result = self.call_resolve_stream(plugin_id, &Self::default_ctx(), &server)?; |
| serde_json::to_string(&convert::stream_source_to_json(&result)) |
| .map_err(|e| BexError::Internal(e.to_string())) |
| } |
|
|
| |
|
|
| pub fn stats(&self) -> bex_types::engine_types::EngineStats { |
| let reg = self.inner.registry.read(); |
| let total = reg.list().len(); |
| let enabled = reg.list().iter().filter(|p| p.enabled).count(); |
| let active = self.inner.active_calls.load(Ordering::Acquire); |
| bex_types::engine_types::EngineStats { |
| uptime_ms: self.inner.start_time.elapsed().as_millis() as u64, |
| total_plugins: total, |
| enabled_plugins: enabled, |
| active_calls: active, |
| } |
| } |
|
|
| pub fn shutdown(&self) { |
| tracing::info!("BEX Engine shutting down..."); |
|
|
| |
| self.inner.state.store(STATE_DRAINING, Ordering::Release); |
|
|
| |
| let deadline = std::time::Instant::now() + std::time::Duration::from_secs(2); |
| while self.inner.active_calls.load(Ordering::Acquire) > 0 { |
| if std::time::Instant::now() > deadline { |
| tracing::warn!( |
| "Shutdown: {} active calls still running after 2s timeout", |
| self.inner.active_calls.load(Ordering::Acquire) |
| ); |
| break; |
| } |
| std::thread::sleep(std::time::Duration::from_millis(50)); |
| } |
|
|
| |
| self.inner.state.store(STATE_STOPPED, Ordering::Release); |
| tracing::info!("BEX Engine shut down complete"); |
| } |
| } |
|
|
| |
|
|
| |
| fn classify_trap(error: wasmtime::Error, plugin_id: &str) -> BexError { |
| let msg = error.to_string(); |
|
|
| |
| if msg.contains("epoch") || msg.contains("timeout") { |
| return BexError::Timeout { ms: 0 }; |
| } |
|
|
| |
| if msg.contains("fuel") { |
| return BexError::FuelExhausted; |
| } |
|
|
| |
| if msg.contains("memory") && (msg.contains("grow") || msg.contains("limit")) { |
| return BexError::PluginFault(format!("out of memory: {}", plugin_id)); |
| } |
|
|
| |
| BexError::PluginFault(msg) |
| } |
|
|
| |
| fn classify_plugin_error(err: PluginError) -> BexError { |
| match err { |
| PluginError::Network(msg) => BexError::Network(msg), |
| PluginError::Parse(msg) => BexError::PluginError(msg), |
| PluginError::NotFound => BexError::PluginError("not found".into()), |
| PluginError::Unauthorized => BexError::PluginError("unauthorized".into()), |
| PluginError::Forbidden => BexError::PluginError("forbidden".into()), |
| PluginError::RateLimited(Some(secs)) => BexError::Timeout { |
| ms: secs * 1000, |
| }, |
| PluginError::RateLimited(None) => BexError::Timeout { ms: 0 }, |
| PluginError::Timeout => BexError::Timeout { ms: 0 }, |
| PluginError::Cancelled => BexError::Cancelled, |
| PluginError::Unsupported => { |
| BexError::Unsupported("plugin does not support this operation".into()) |
| } |
| PluginError::InvalidInput(msg) => BexError::PluginError(msg), |
| PluginError::Internal(msg) => BexError::PluginError(msg), |
| } |
| } |
|
|
| |
| pub mod convert { |
| use super::*; |
|
|
| pub fn home_sections_to_json(sections: &[HomeSection]) -> Vec<serde_json::Value> { |
| sections |
| .iter() |
| .map(|s| serde_json::to_value(home_section_to_json(s)).unwrap_or_default()) |
| .collect() |
| } |
|
|
| pub fn home_section_to_json(s: &HomeSection) -> bex_types::media::HomeSection { |
| bex_types::media::HomeSection { |
| id: s.id.clone(), |
| title: s.title.clone(), |
| subtitle: s.subtitle.clone(), |
| items: s.items.iter().map(media_card_to_json).collect(), |
| next_page: s.next_page.clone(), |
| layout: format!("{:?}", s.layout).to_lowercase(), |
| show_rank: s.show_rank, |
| categories: s |
| .categories |
| .iter() |
| .map(|c| bex_types::media::CategoryLink { |
| id: c.id.clone(), |
| title: c.title.clone(), |
| subtitle: c.subtitle.clone(), |
| image: c.image.as_ref().map(image_to_json), |
| }) |
| .collect(), |
| extra: s |
| .extra |
| .iter() |
| .map(|a| (a.key.clone(), a.value.clone())) |
| .collect(), |
| } |
| } |
|
|
| pub fn media_card_to_json(c: &MediaCard) -> bex_types::media::MediaCard { |
| bex_types::media::MediaCard { |
| id: c.id.clone(), |
| title: c.title.clone(), |
| kind: c.kind.map(|k| format!("{:?}", k).to_lowercase()), |
| images: c.images.as_ref().map(image_set_to_json), |
| original_title: c.original_title.clone(), |
| tagline: c.tagline.clone(), |
| year: c.year.clone(), |
| score: c.score, |
| genres: c.genres.clone(), |
| status: c.status.map(|s| format!("{:?}", s).to_lowercase()), |
| content_rating: c.content_rating.clone(), |
| url: c.url.clone(), |
| ids: c |
| .ids |
| .iter() |
| .map(|id| bex_types::media::LinkedId { |
| source: id.source.clone(), |
| id: id.id.clone(), |
| }) |
| .collect(), |
| extra: c |
| .extra |
| .iter() |
| .map(|a| (a.key.clone(), a.value.clone())) |
| .collect(), |
| } |
| } |
|
|
| pub fn image_set_to_json(s: &ImageSet) -> bex_types::media::ImageSet { |
| bex_types::media::ImageSet { |
| low: s.low.as_ref().map(image_to_json), |
| medium: s.medium.as_ref().map(image_to_json), |
| high: s.high.as_ref().map(image_to_json), |
| backdrop: s.backdrop.as_ref().map(image_to_json), |
| logo: s.logo.as_ref().map(image_to_json), |
| } |
| } |
|
|
| pub fn image_to_json(i: &Image) -> bex_types::media::Image { |
| bex_types::media::Image { |
| url: i.url.clone(), |
| layout: format!("{:?}", i.layout).to_lowercase(), |
| width: i.width, |
| height: i.height, |
| blurhash: i.blurhash.clone(), |
| } |
| } |
|
|
| pub fn paged_result_to_json(r: &PagedResult) -> bex_types::media::PagedResult { |
| bex_types::media::PagedResult { |
| items: r.items.iter().map(media_card_to_json).collect(), |
| categories: r |
| .categories |
| .iter() |
| .map(|c| bex_types::media::CategoryLink { |
| id: c.id.clone(), |
| title: c.title.clone(), |
| subtitle: c.subtitle.clone(), |
| image: c.image.as_ref().map(image_to_json), |
| }) |
| .collect(), |
| next_page: r.next_page.clone(), |
| } |
| } |
|
|
| #[allow(clippy::too_many_lines)] |
| pub fn media_info_to_json(m: &MediaInfo) -> bex_types::media::MediaInfo { |
| bex_types::media::MediaInfo { |
| id: m.id.clone(), |
| title: m.title.clone(), |
| kind: format!("{:?}", m.kind).to_lowercase(), |
| images: m.images.as_ref().map(image_set_to_json), |
| original_title: m.original_title.clone(), |
| description: m.description.clone(), |
| score: m.score, |
| scored_by: m.scored_by, |
| year: m.year.clone(), |
| release_date: m.release_date.clone(), |
| genres: m.genres.clone(), |
| tags: m.tags.clone(), |
| status: m.status.map(|s| format!("{:?}", s).to_lowercase()), |
| content_rating: m.content_rating.clone(), |
| seasons: m |
| .seasons |
| .iter() |
| .map(|s| bex_types::media::Season { |
| id: s.id.clone(), |
| title: s.title.clone(), |
| number: s.number, |
| year: s.year, |
| episodes: s |
| .episodes |
| .iter() |
| .map(|e| bex_types::media::Episode { |
| id: e.id.clone(), |
| title: e.title.clone(), |
| number: e.number, |
| season: e.season, |
| images: e.images.as_ref().map(image_set_to_json), |
| description: e.description.clone(), |
| released: e.released.clone(), |
| score: e.score, |
| url: e.url.clone(), |
| tags: e.tags.clone(), |
| extra: e |
| .extra |
| .iter() |
| .map(|a| (a.key.clone(), a.value.clone())) |
| .collect(), |
| }) |
| .collect(), |
| }) |
| .collect(), |
| cast: m |
| .cast |
| .iter() |
| .map(|p| bex_types::media::Person { |
| id: p.id.clone(), |
| name: p.name.clone(), |
| image: p.image.as_ref().map(image_set_to_json), |
| role: p.role.clone(), |
| url: p.url.clone(), |
| }) |
| .collect(), |
| crew: m |
| .crew |
| .iter() |
| .map(|p| bex_types::media::Person { |
| id: p.id.clone(), |
| name: p.name.clone(), |
| image: p.image.as_ref().map(image_set_to_json), |
| role: p.role.clone(), |
| url: p.url.clone(), |
| }) |
| .collect(), |
| runtime_minutes: m.runtime_minutes, |
| trailer_url: m.trailer_url.clone(), |
| ids: m |
| .ids |
| .iter() |
| .map(|id| bex_types::media::LinkedId { |
| source: id.source.clone(), |
| id: id.id.clone(), |
| }) |
| .collect(), |
| studio: m.studio.clone(), |
| country: m.country.clone(), |
| language: m.language.clone(), |
| url: m.url.clone(), |
| extra: m |
| .extra |
| .iter() |
| .map(|a| (a.key.clone(), a.value.clone())) |
| .collect(), |
| } |
| } |
|
|
| pub fn servers_to_json(servers: &[Server]) -> Vec<bex_types::stream::Server> { |
| servers |
| .iter() |
| .map(|s| bex_types::stream::Server { |
| id: s.id.clone(), |
| label: s.label.clone(), |
| url: s.url.clone(), |
| priority: s.priority, |
| extra: s |
| .extra |
| .iter() |
| .map(|a| (a.key.clone(), a.value.clone())) |
| .collect(), |
| }) |
| .collect() |
| } |
|
|
| pub fn json_to_server(s: &bex_types::stream::Server) -> Server { |
| Server { |
| id: s.id.clone(), |
| label: s.label.clone(), |
| url: s.url.clone(), |
| priority: s.priority, |
| extra: s |
| .extra |
| .iter() |
| .map(|(k, v)| Attr { |
| key: k.clone(), |
| value: v.clone(), |
| }) |
| .collect(), |
| } |
| } |
|
|
| pub fn stream_source_to_json(s: &StreamSource) -> bex_types::stream::StreamSource { |
| bex_types::stream::StreamSource { |
| id: s.id.clone(), |
| label: s.label.clone(), |
| format: format!("{:?}", s.format).to_lowercase(), |
| manifest_url: s.manifest_url.clone(), |
| videos: s |
| .videos |
| .iter() |
| .map(|v| bex_types::stream::VideoTrack { |
| resolution: bex_types::stream::VideoResolution { |
| width: v.resolution.width, |
| height: v.resolution.height, |
| hdr: v.resolution.hdr, |
| label: v.resolution.label.clone(), |
| }, |
| url: v.url.clone(), |
| mime_type: v.mime_type.clone(), |
| bitrate: v.bitrate, |
| codecs: v.codecs.clone(), |
| }) |
| .collect(), |
| subtitles: s |
| .subtitles |
| .iter() |
| .map(|st| bex_types::stream::SubtitleTrack { |
| label: st.label.clone(), |
| url: st.url.clone(), |
| language: st.language.clone(), |
| format: st.format.clone(), |
| }) |
| .collect(), |
| headers: s |
| .headers |
| .iter() |
| .map(|a| (a.key.clone(), a.value.clone())) |
| .collect(), |
| extra: s |
| .extra |
| .iter() |
| .map(|a| (a.key.clone(), a.value.clone())) |
| .collect(), |
| } |
| } |
| } |
|
|