krystv's picture
Upload 107 files
3374e90 verified
use crate::config::EngineConfig;
use crate::host_state::HostState;
use crate::http_service::HttpHostService;
use crate::registry::PluginRegistry;
use bex_db::BexDb;
use bex_types::plugin_info::PluginInfo;
use bex_types::{BexError, Manifest};
use parking_lot::RwLock;
use std::path::Path;
use std::sync::atomic::{AtomicU8, AtomicUsize, Ordering};
use std::sync::Arc;
use wasmtime::component::{Component, Linker};
use wasmtime::{Engine as WasmtimeEngine, ResourceLimiter, Store};
// ── Bindgen: generate typed bindings from WIT ──────────────────────
// Expands into Rust bindings for the `plugin` world found under `../../wit`.
// The `Plugin` type and the `bex::plugin::*` modules re-exported below are
// produced by this expansion.
wasmtime::component::bindgen!({
path: "../../wit",
world: "plugin",
});
// Re-export bindgen types for convenience
pub use crate::engine::bex::plugin::common::*;
pub use crate::engine::bex::plugin::http;
pub use crate::engine::bex::plugin::kv;
pub use crate::engine::bex::plugin::secrets;
pub use crate::engine::bex::plugin::log;
pub use crate::engine::bex::plugin::clock;
pub use crate::engine::bex::plugin::rng;
pub use crate::engine::bex::plugin::js;
// ── Engine State Machine ────────────────────────────────────────────
// Lifecycle values stored in `EngineInner::state` (an `AtomicU8`).
/// Initial state, set by `Engine::new` until installed plugins have been reloaded.
const STATE_NOT_READY: u8 = 0;
/// Normal operating state; the only value for which `check_ready` returns `Ok`.
const STATE_READY: u8 = 1;
/// Set at the start of `Engine::shutdown` while in-flight calls are given time to finish.
const STATE_DRAINING: u8 = 2;
/// Final state, set at the end of `Engine::shutdown`.
const STATE_STOPPED: u8 = 3;
// ── Compile Cache ───────────────────────────────────────────────────
/// HMAC-authenticated compile cache. Stores compiled `.cwasm` files
/// on disk with an HMAC-SHA256 tag to detect tampering or stale cache.
///
/// Each cache file is a 32-byte tag followed by the serialized component, and
/// is named after the hex-encoded SHA-256 of the original WASM bytes.
///
/// NOTE(review): the key installed by `new` is a constant byte string compiled
/// into the binary, so the tag detects corruption and truncation but does not
/// protect against a party who knows that constant — confirm this is intended.
struct CompileCache {
/// Directory that holds the `<hex-sha256>.cwasm` entries.
cache_dir: std::path::PathBuf,
/// Key bytes fed to HMAC-SHA256 when tagging and verifying entries.
key_material: Vec<u8>,
}
impl CompileCache {
    /// Number of bytes occupied by the HMAC-SHA256 tag at the start of a cache file.
    const TAG_LEN: usize = 32;

    /// Creates a cache rooted at `cache_dir`, creating the directory tree if needed.
    ///
    /// # Errors
    /// Returns `BexError::Internal` when the directory cannot be created.
    fn new(cache_dir: std::path::PathBuf) -> Result<Self, BexError> {
        if let Err(e) = std::fs::create_dir_all(&cache_dir) {
            return Err(BexError::Internal(format!("cache dir: {e}")));
        }
        // The key is a fixed, versioned identifier rather than a per-install secret.
        let key_material = Vec::from(&b"bex-engine-compile-cache-v2-key"[..]);
        Ok(Self {
            cache_dir,
            key_material,
        })
    }

    /// Returns the on-disk location for the entry keyed by `wasm_hash`
    /// (hex-encoded hash plus a `.cwasm` extension).
    fn cache_path(&self, wasm_hash: &[u8]) -> std::path::PathBuf {
        let file_name = format!("{}.cwasm", hex::encode(wasm_hash));
        self.cache_dir.join(file_name)
    }

    /// Computes the HMAC-SHA256 tag of `data` under this cache's key.
    fn compute_tag(&self, data: &[u8]) -> Vec<u8> {
        use hmac::{Hmac, Mac};
        type TagMac = Hmac<sha2::Sha256>;

        let mut mac = TagMac::new_from_slice(&self.key_material)
            .expect("HMAC can take key of any size");
        mac.update(data);
        let tag = mac.finalize().into_bytes();
        tag.to_vec()
    }

    /// Looks up the cached compiled component for `wasm_bytes`.
    ///
    /// Returns `None` when the file is missing or unreadable, shorter than the
    /// tag, or when its tag does not match the payload.
    fn load(&self, wasm_bytes: &[u8]) -> Option<Vec<u8>> {
        let entry_path = self.cache_path(&sha2_hash(wasm_bytes));
        let file_bytes = std::fs::read(&entry_path).ok()?;

        // Layout: [32-byte HMAC tag][serialized component].
        if file_bytes.len() < Self::TAG_LEN {
            return None;
        }
        let (stored_tag, payload) = file_bytes.split_at(Self::TAG_LEN);
        let expected_tag = self.compute_tag(payload);
        if expected_tag.len() != stored_tag.len() {
            return None;
        }

        // Accumulate every byte difference instead of returning at the first
        // mismatch, so the comparison time does not depend on where they differ.
        let mismatch = stored_tag
            .iter()
            .zip(&expected_tag)
            .fold(0u8, |acc, (a, b)| acc | (a ^ b));

        if mismatch == 0 {
            Some(payload.to_vec())
        } else {
            tracing::warn!("Compile cache HMAC mismatch, ignoring cached file");
            None
        }
    }

    /// Writes `component_data`, prefixed with its tag, to the entry for `wasm_bytes`.
    ///
    /// A write failure is logged and otherwise ignored.
    fn store(&self, wasm_bytes: &[u8], component_data: &[u8]) {
        let entry_path = self.cache_path(&sha2_hash(wasm_bytes));
        let tag = self.compute_tag(component_data);
        let file_bytes = [tag.as_slice(), component_data].concat();
        match std::fs::write(&entry_path, &file_bytes) {
            Ok(()) => {}
            Err(e) => {
                tracing::warn!("Failed to write compile cache: {}", e);
            }
        }
    }
}
/// Returns the SHA-256 digest of `data` as an owned byte vector.
fn sha2_hash(data: &[u8]) -> Vec<u8> {
    use sha2::{Digest, Sha256};
    Sha256::digest(data).to_vec()
}
/// Compile a WASM component, using the compile cache if available.
///
/// On a cache hit the serialized component is deserialized directly. If that
/// fails, or on a miss, the component is compiled from `wasm_bytes` and the
/// result is written back to the cache on a best-effort basis.
///
/// # Errors
/// Returns `BexError::Internal` when compilation from `wasm_bytes` fails.
/// Cache read, deserialize and write problems never fail the call; they are
/// logged and the function falls back to compiling.
fn compile_or_cache(
    wasmtime: &WasmtimeEngine,
    wasm_bytes: &[u8],
    cache: &CompileCache,
) -> Result<Component, BexError> {
    // Try cache first
    if let Some(cached_data) = cache.load(wasm_bytes) {
        // SAFETY: `Component::deserialize` trusts its input to be genuine
        // serialized output. `cache.load` only hands back a payload whose
        // HMAC tag verified against the cache key.
        // NOTE(review): that key is a constant in `CompileCache::new`, so this
        // relies on the cache directory not being writable by untrusted parties.
        match unsafe { Component::deserialize(wasmtime, &cached_data) } {
            Ok(component) => {
                tracing::debug!("Compile cache hit");
                return Ok(component);
            }
            Err(e) => {
                // Fall through to a fresh compile, but keep the reason visible.
                tracing::warn!("Compile cache deserialize failed, recompiling: {}", e);
            }
        }
    }

    // Compile from source
    let component = Component::new(wasmtime, wasm_bytes)
        .map_err(|e| BexError::Internal(format!("compile wasm: {e}")))?;

    // Store in cache for future use; failing to serialize only costs a
    // recompile next time, so it is logged rather than propagated.
    match component.serialize() {
        Ok(serialized) => cache.store(wasm_bytes, &serialized),
        Err(e) => {
            tracing::warn!("Failed to serialize component for compile cache: {}", e);
        }
    }
    Ok(component)
}
// ── Epoch Ticker ────────────────────────────────────────────────────
/// Background thread that increments the Wasmtime epoch at regular intervals.
/// This enables wall-clock timeout enforcement via `store.set_epoch_deadline()`.
///
/// Dropping the ticker sets `shutdown`; the thread itself is not joined.
struct EpochTicker {
/// Handle of the spawned ticker thread; stored but never joined in this file.
_handle: Option<std::thread::JoinHandle<()>>,
/// Stop flag polled by the ticker loop before each sleep; set to `true` by `Drop`.
shutdown: Arc<std::sync::atomic::AtomicBool>,
}
impl EpochTicker {
    /// Spawns the `bex-epoch-ticker` thread, which sleeps `interval_ms`
    /// milliseconds and then bumps `engine`'s epoch, repeating until the
    /// shutdown flag is observed.
    ///
    /// # Panics
    /// Panics if the OS refuses to spawn the thread.
    fn start(engine: WasmtimeEngine, interval_ms: u64) -> Self {
        use std::sync::atomic::AtomicBool;

        let shutdown = Arc::new(AtomicBool::new(false));
        let stop_flag = Arc::clone(&shutdown);
        let tick = std::time::Duration::from_millis(interval_ms);

        let ticker_loop = move || loop {
            if stop_flag.load(Ordering::Relaxed) {
                break;
            }
            std::thread::sleep(tick);
            engine.increment_epoch();
        };

        let handle = std::thread::Builder::new()
            .name(String::from("bex-epoch-ticker"))
            .spawn(ticker_loop)
            .expect("failed to spawn epoch ticker thread");

        Self {
            _handle: Some(handle),
            shutdown,
        }
    }
}
/// Signals the ticker thread to stop when the ticker is dropped.
impl Drop for EpochTicker {
fn drop(&mut self) {
// Only raises the flag; the thread is not joined here, so it notices the
// flag the next time its loop condition is evaluated.
self.shutdown.store(true, Ordering::Relaxed);
}
}
// ── Engine ─────────────────────────────────────────────────────────
/// Shared state behind every `Engine` handle; `Engine` holds it in an `Arc`.
struct EngineInner {
/// Kept alive via Arc — cloned into each HostState for block_on() access.
/// Field is read via `self.inner.runtime.clone()` in instantiate().
#[allow(dead_code)] // Suppress false positive: read through Arc clone in HostState::new()
runtime: Arc<tokio::runtime::Runtime>,
/// Wasmtime engine used to compile components and create stores.
wasmtime: WasmtimeEngine,
/// Linker built once by `build_linker` and reused for every instantiation.
linker: Linker<HostState>,
/// Database handle used for plugin blobs, manifests, plugin info and secrets.
db: Arc<BexDb>,
/// HTTP service handed to each `HostState`.
http: Arc<HttpHostService>,
/// JS worker pool handed to each `HostState`; plugin contexts are evicted on uninstall.
js_pool: Arc<bex_js::JsPool>,
/// Engine configuration captured at construction time.
config: Arc<EngineConfig>,
/// In-memory registry of loaded plugins.
registry: RwLock<PluginRegistry>,
/// On-disk cache of compiled components.
compile_cache: CompileCache,
/// Current lifecycle state; one of the `STATE_*` constants.
state: AtomicU8,
/// Count of plugin calls in flight: incremented in `instantiate`,
/// decremented in `call_plugin_inner`.
active_calls: AtomicUsize,
/// Construction time, used to report uptime in `stats`.
start_time: std::time::Instant,
/// Owns the epoch ticker so it is told to stop when the engine is dropped.
_epoch_ticker: EpochTicker,
/// Open handle to `.bex.lock`, on which the exclusive advisory lock was taken.
/// Presumably held only to keep the lock for the engine's lifetime — confirm.
_lock_file: std::fs::File,
}
/// Handle to the plugin engine.
///
/// Cloning copies only the `Arc`, so all clones share the same `EngineInner`.
#[derive(Clone)]
pub struct Engine {
/// Shared engine state.
inner: Arc<EngineInner>,
}
impl Engine {
/// Builds a fully initialised engine from `config`.
///
/// In order: creates the data directory, takes an exclusive advisory lock on
/// `<data_dir>/.bex.lock`, configures Wasmtime, starts the shared tokio
/// runtime, opens the database, builds the HTTP service and the linker, starts
/// the epoch ticker, prepares the compile cache and JS pool, reloads installed
/// plugins from the database, and finally switches the state to `STATE_READY`.
///
/// # Errors
/// Returns `BexError::Internal` when a directory, the lock, Wasmtime, the
/// tokio runtime, the linker, the compile cache or the JS pool cannot be set
/// up, and `BexError::Storage` when the database cannot be opened or its
/// plugin list cannot be read.
pub fn new(config: EngineConfig) -> Result<Self, BexError> {
// Create data directory
std::fs::create_dir_all(&config.data_dir)
.map_err(|e| BexError::Internal(format!("data dir: {e}")))?;
// Acquire advisory file lock on data directory (fixes Issue #20)
let lock_path = config.data_dir.join(".bex.lock");
let lock_file = std::fs::OpenOptions::new()
.create(true)
.read(true)
.write(true)
.open(&lock_path)
.map_err(|e| BexError::Internal(format!("lock file: {e}")))?;
use fs4::fs_std::FileExt;
// Non-blocking attempt: a lock already held elsewhere surfaces as an error
// instead of making construction wait.
lock_file
.try_lock_exclusive()
.map_err(|e| {
BexError::Internal(format!(
"Another BEX engine instance is using this data directory: {e}"
))
})?;
// Configure Wasmtime with epoch interruption (fixes Problem #3)
let mut wt_config = wasmtime::Config::new();
wt_config.wasm_component_model(true);
wt_config.wasm_backtrace_details(wasmtime::WasmBacktraceDetails::Enable);
// Fuel and epoch interruption are both enabled; `instantiate` sets the
// per-call fuel budget and epoch deadline on each store.
wt_config.consume_fuel(true);
wt_config.epoch_interruption(true);
wt_config.max_wasm_stack(4 * 1024 * 1024);
let wasmtime = WasmtimeEngine::new(&wt_config)
.map_err(|e| BexError::Internal(format!("wasmtime init: {e}")))?;
// Create ONE shared tokio runtime (fixes Problem #1)
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4)
.enable_all()
.build()
.map_err(|e| BexError::Internal(format!("tokio runtime: {e}")))?;
let runtime = Arc::new(runtime);
// Open database
let db = Arc::new(
BexDb::open(&config.data_dir).map_err(|e| BexError::Storage(e.to_string()))?,
);
// Create HTTP service with proper pool config (fixes Issues #16, #17)
let http = Arc::new(HttpHostService::new(
&config.user_agent,
config.http_timeout_ms,
config.http_pool_idle_timeout_ms,
config.http_pool_max_idle_per_host,
));
// Build linker ONCE (fixes Problem #2)
let linker = Self::build_linker(&wasmtime)?;
// Start epoch ticker thread
let epoch_ticker = EpochTicker::start(wasmtime.clone(), config.epoch_interval_ms);
// Compile cache directory
let cache_dir = config.data_dir.join("cache/wasm");
let compile_cache = CompileCache::new(cache_dir)?;
// Create JS worker pool (QuickJS integration)
// Wire JsPoolConfig from EngineConfig so users can control JS pool settings
let js_pool_config = bex_js::JsPoolConfig {
initial_workers: config.js_initial_workers,
max_workers: config.js_max_workers,
// Configured in MiB, converted to bytes for the pool.
memory_limit_bytes: config.js_memory_limit_mb as usize * 1024 * 1024,
default_timeout_ms: config.js_timeout_ms,
context_idle_ttl_secs: config.js_context_idle_ttl_secs,
worker_idle_ttl_secs: config.js_worker_idle_ttl_secs,
max_stack_bytes: config.js_max_stack_bytes,
};
let js_pool = Arc::new(
bex_js::JsPool::new(js_pool_config)
.map_err(|e| BexError::Internal(format!("JS pool: {e}")))?,
);
// Plugins directory for on-disk WASM storage (fixes Problem #4)
let plugins_dir = config.data_dir.join("plugins/installed");
std::fs::create_dir_all(&plugins_dir)
.map_err(|e| BexError::Internal(format!("plugins dir: {e}")))?;
let config_arc = Arc::new(config);
let inner = Arc::new(EngineInner {
runtime,
wasmtime,
linker,
db,
http,
js_pool,
config: config_arc,
registry: RwLock::new(PluginRegistry::new()),
compile_cache,
// Public entry points that call `check_ready` are rejected until the
// reload below has finished.
state: AtomicU8::new(STATE_NOT_READY),
active_calls: AtomicUsize::new(0),
start_time: std::time::Instant::now(),
_epoch_ticker: epoch_ticker,
_lock_file: lock_file,
});
let engine = Self { inner };
// Reload previously installed plugins from database
engine.reload_from_db()?;
// Mark engine as ready
engine.inner.state.store(STATE_READY, Ordering::Release);
Ok(engine)
}
/// Constructs the linker shared by every plugin instantiation.
///
/// Registers the synchronous WASI imports first, then the BEX host interfaces
/// generated by `bindgen!`. The WASI context itself is created by `HostState`.
///
/// # Errors
/// Returns `BexError::Internal` if either registration step fails.
fn build_linker(wasmtime: &WasmtimeEngine) -> Result<Linker<HostState>, BexError> {
    let mut host_linker: Linker<HostState> = Linker::new(wasmtime);

    // WASI imports (fixes Problem #6). HostState supplies a WasiCtx with
    // no inherited handles, no filesystem, no env, no sockets.
    if let Err(e) = wasmtime_wasi::add_to_linker_sync(&mut host_linker) {
        return Err(BexError::Internal(format!("wasi linker: {e}")));
    }

    // BEX host interfaces; the host state is its own import implementation.
    if let Err(e) = Plugin::add_to_linker(&mut host_linker, |state: &mut HostState| state) {
        return Err(BexError::Internal(format!("linker setup: {e}")));
    }

    Ok(host_linker)
}
/// Gate used by public entry points: succeeds only while the engine is in
/// `STATE_READY`.
///
/// # Errors
/// `BexError::NotReady` before initialisation completes; `BexError::Internal`
/// while draining, after stop, or for an unrecognised state value.
fn check_ready(&self) -> Result<(), BexError> {
    let current = self.inner.state.load(Ordering::Acquire);
    if current == STATE_READY {
        return Ok(());
    }
    let rejection = match current {
        STATE_NOT_READY => BexError::NotReady,
        STATE_DRAINING => BexError::Internal("engine is draining".into()),
        STATE_STOPPED => BexError::Internal("engine is stopped".into()),
        _ => BexError::Internal("invalid engine state".into()),
    };
    Err(rejection)
}
/// Reload all previously installed plugins from the database.
///
/// For every plugin listed in the database this loads its WASM blob and
/// manifest, compiles the component (through the compile cache), refreshes the
/// on-disk WASM copy and registers the plugin with its persisted `enabled` flag.
///
/// A plugin that cannot be loaded is skipped so one broken entry does not stop
/// the engine from starting; each skip is logged with its reason.
///
/// # Errors
/// Returns `BexError::Storage` only when the plugin list itself cannot be read.
fn reload_from_db(&self) -> Result<(), BexError> {
    let plugins = self
        .inner
        .db
        .list_plugins()
        .map_err(|e| BexError::Storage(e.to_string()))?;

    for info in plugins {
        // Get WASM blob
        let wasm_bytes = match self.inner.db.get_wasm_blob(&info.id) {
            Ok(Some(bytes)) => bytes,
            Ok(None) => {
                tracing::warn!(plugin = %info.id, "Skipping plugin reload: no WASM blob stored");
                continue;
            }
            Err(e) => {
                tracing::warn!(plugin = %info.id, "Skipping plugin reload: cannot read WASM blob: {}", e);
                continue;
            }
        };

        // Get manifest
        let manifest_yaml = match self.inner.db.get_manifest(&info.id) {
            Ok(Some(yaml)) => yaml,
            Ok(None) => {
                tracing::warn!(plugin = %info.id, "Skipping plugin reload: no manifest stored");
                continue;
            }
            Err(e) => {
                tracing::warn!(plugin = %info.id, "Skipping plugin reload: cannot read manifest: {}", e);
                continue;
            }
        };
        let manifest = match serde_yaml::from_str::<Manifest>(&manifest_yaml) {
            Ok(m) => m,
            Err(e) => {
                tracing::warn!(plugin = %info.id, "Skipping plugin reload: invalid manifest: {}", e);
                continue;
            }
        };

        // Compile with cache (fixes Problem #5)
        let component = match compile_or_cache(
            &self.inner.wasmtime,
            &wasm_bytes,
            &self.inner.compile_cache,
        ) {
            Ok(c) => Arc::new(c),
            Err(e) => {
                tracing::warn!(plugin = %info.id, "Skipping plugin reload: compile failed: {:?}", e);
                continue;
            }
        };

        // Store WASM on disk for future recompilation (fixes Problem #4).
        // Best-effort: the database stays the source of truth, so a failure
        // here is logged and the plugin is still registered.
        let wasm_path = self.inner.config.data_dir.join(format!(
            "plugins/installed/{}/plugin.wasm",
            info.id.replace('.', "_")
        ));
        if let Some(parent) = wasm_path.parent() {
            if let Err(e) = std::fs::create_dir_all(parent) {
                tracing::warn!(plugin = %info.id, "Failed to create plugin directory: {}", e);
            }
        }
        if let Err(e) = std::fs::write(&wasm_path, &wasm_bytes) {
            tracing::warn!(plugin = %info.id, "Failed to write plugin WASM to disk: {}", e);
        }

        let mut record = crate::registry::PluginRecord::new(
            manifest,
            component,
            self.inner.config.circuit_breaker_threshold,
            self.inner.config.circuit_breaker_cooldown_ms,
        );
        // Restore the persisted enabled/disabled flag.
        record.enabled = info.enabled;
        self.inner.registry.write().insert(Arc::new(record));
    }
    Ok(())
}
// ── Plugin Management ───────────────────────────────────────────
/// Installs a plugin package read from the file at `path`.
///
/// # Errors
/// Fails if the engine is not ready, with `BexError::Internal` if the file
/// cannot be read, or with whatever `install_bytes` reports.
pub fn install_plugin(&self, path: &Path) -> Result<PluginInfo, BexError> {
    self.check_ready()?;
    match std::fs::read(path) {
        Ok(package_bytes) => self.install_bytes(&package_bytes),
        Err(e) => Err(BexError::Internal(format!("read: {e}"))),
    }
}
/// Install a plugin from raw bytes (fixes Issue #13).
///
/// Unpacks the package, validates its manifest against the configured host
/// version, compiles the component (through the compile cache), writes a copy
/// of the WASM to disk, persists the blob, manifest and plugin info to the
/// database, and finally registers the plugin.
///
/// The on-disk copy is best-effort: a failure there is logged and does not
/// abort the install, because the database copy is what `reload_from_db` reads.
///
/// # Errors
/// Fails if the engine is not ready, if unpacking, validation or compilation
/// fails, with `BexError::Internal` if the manifest cannot be serialized, or
/// with `BexError::Storage` on any database error.
pub fn install_bytes(&self, data: &[u8]) -> Result<PluginInfo, BexError> {
    self.check_ready()?;
    let package = bex_pkg::unpack(data)?;
    package
        .manifest
        .validate(&self.inner.config.host_version)?;

    // Compile with cache
    let component = Arc::new(compile_or_cache(
        &self.inner.wasmtime,
        &package.wasm,
        &self.inner.compile_cache,
    )?);
    let id = package.manifest.id.clone();

    // Store WASM on disk (fixes Problem #4: don't keep WASM in memory)
    let wasm_path = self.inner.config.data_dir.join(format!(
        "plugins/installed/{}/plugin.wasm",
        id.replace('.', "_")
    ));
    if let Some(parent) = wasm_path.parent() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            tracing::warn!(plugin = %id, "Failed to create plugin directory: {}", e);
        }
    }
    if let Err(e) = std::fs::write(&wasm_path, &package.wasm) {
        tracing::warn!(plugin = %id, "Failed to write plugin WASM to disk: {}", e);
    }

    // Persist WASM blob and manifest to database
    self.inner
        .db
        .save_wasm_blob(&id, &package.wasm)
        .map_err(|e| BexError::Storage(e.to_string()))?;
    let manifest_yaml = serde_yaml::to_string(&package.manifest)
        .map_err(|e| BexError::Internal(e.to_string()))?;
    self.inner
        .db
        .save_manifest(&id, &manifest_yaml)
        .map_err(|e| BexError::Storage(e.to_string()))?;

    let record = Arc::new(crate::registry::PluginRecord::new(
        package.manifest,
        component,
        self.inner.config.circuit_breaker_threshold,
        self.inner.config.circuit_breaker_cooldown_ms,
    ));
    let info = record.to_plugin_info();
    self.inner
        .db
        .save_plugin_info(&info)
        .map_err(|e| BexError::Storage(e.to_string()))?;

    // Register only after everything has been persisted.
    self.inner.registry.write().insert(record);
    Ok(info)
}
/// Install a plugin directly from WASM bytes and a manifest (no package format).
///
/// Validates the manifest against the configured host version, compiles the
/// component (through the compile cache), writes a copy of the WASM to disk,
/// persists the blob, manifest and plugin info to the database, and finally
/// registers the plugin.
///
/// The on-disk copy is best-effort: a failure there is logged and does not
/// abort the install, because the database copy is what `reload_from_db` reads.
///
/// # Errors
/// Fails if the engine is not ready, if validation or compilation fails, with
/// `BexError::Internal` if the manifest cannot be serialized, or with
/// `BexError::Storage` on any database error.
pub fn install_plugin_raw(
    &self,
    manifest: Manifest,
    wasm_bytes: Vec<u8>,
) -> Result<PluginInfo, BexError> {
    self.check_ready()?;
    manifest.validate(&self.inner.config.host_version)?;

    let component = Arc::new(compile_or_cache(
        &self.inner.wasmtime,
        &wasm_bytes,
        &self.inner.compile_cache,
    )?);
    let id = manifest.id.clone();

    // Store WASM on disk
    let wasm_path = self.inner.config.data_dir.join(format!(
        "plugins/installed/{}/plugin.wasm",
        id.replace('.', "_")
    ));
    if let Some(parent) = wasm_path.parent() {
        if let Err(e) = std::fs::create_dir_all(parent) {
            tracing::warn!(plugin = %id, "Failed to create plugin directory: {}", e);
        }
    }
    if let Err(e) = std::fs::write(&wasm_path, &wasm_bytes) {
        tracing::warn!(plugin = %id, "Failed to write plugin WASM to disk: {}", e);
    }

    // Persist
    self.inner
        .db
        .save_wasm_blob(&id, &wasm_bytes)
        .map_err(|e| BexError::Storage(e.to_string()))?;
    let manifest_yaml = serde_yaml::to_string(&manifest)
        .map_err(|e| BexError::Internal(e.to_string()))?;
    self.inner
        .db
        .save_manifest(&id, &manifest_yaml)
        .map_err(|e| BexError::Storage(e.to_string()))?;

    let record = Arc::new(crate::registry::PluginRecord::new(
        manifest,
        component,
        self.inner.config.circuit_breaker_threshold,
        self.inner.config.circuit_breaker_cooldown_ms,
    ));
    let info = record.to_plugin_info();
    self.inner
        .db
        .save_plugin_info(&info)
        .map_err(|e| BexError::Storage(e.to_string()))?;

    // Register only after everything has been persisted.
    self.inner.registry.write().insert(record);
    Ok(info)
}
/// Removes plugin `id` from disk, the JS pool, the registry and the database.
///
/// Waits up to five seconds, polling every 50 ms, for the engine-wide active
/// call counter to reach zero before removing anything.
///
/// # Errors
/// Fails if the engine is not ready, with `BexError::Internal` if calls are
/// still active after the wait, or with `BexError::Storage` if the database
/// removal fails.
pub fn uninstall_plugin(&self, id: &str) -> Result<(), BexError> {
    use std::time::{Duration, Instant};

    self.check_ready()?;

    // Wait for active calls to drain (fixes Issue #9)
    let drain_deadline = Instant::now() + Duration::from_secs(5);
    loop {
        if self.inner.active_calls.load(Ordering::Acquire) == 0 {
            break;
        }
        if Instant::now() > drain_deadline {
            return Err(BexError::Internal(
                "timeout waiting for active calls to drain".into(),
            ));
        }
        std::thread::sleep(Duration::from_millis(50));
    }

    // Remove the WASM file, then its directory if that leaves it empty.
    // Both removals are best-effort.
    let relative = format!("plugins/installed/{}/plugin.wasm", id.replace('.', "_"));
    let wasm_file = self.inner.config.data_dir.join(relative);
    let _ = std::fs::remove_file(&wasm_file);
    if let Some(plugin_dir) = wasm_file.parent() {
        let _ = std::fs::remove_dir(plugin_dir);
    }

    // Evict plugin's JS context from the pool
    self.inner.js_pool.evict_plugin(id);
    self.inner.registry.write().remove(id);

    match self.inner.db.remove_plugin(id) {
        Ok(_) => Ok(()),
        Err(e) => Err(BexError::Storage(e.to_string())),
    }
}
/// Returns the registry's current plugin list.
pub fn list_plugins(&self) -> Vec<PluginInfo> {
    let registry = self.inner.registry.read();
    registry.list()
}
/// Returns the info for plugin `id`, or `None` if it is not in the registry.
pub fn get_plugin_info(&self, id: &str) -> Option<PluginInfo> {
    let registry = self.inner.registry.read();
    let record = registry.get(id)?;
    Some(record.to_plugin_info())
}
/// Marks plugin `id` as enabled and persists the change.
///
/// The registry entry is replaced by a copy with `enabled = true` and a fresh
/// `PluginHealth`, so previous circuit-breaker state is discarded.
///
/// # Errors
/// Fails if the engine is not ready, with `BexError::PluginNotFound` if the
/// plugin is unknown, or with `BexError::Storage` if saving to the database fails.
pub fn enable_plugin(&self, id: &str) -> Result<(), BexError> {
    self.check_ready()?;

    let info = {
        // The write lock is held only for the swap, not for the database write.
        let mut registry = self.inner.registry.write();
        let previous = match registry.remove(id) {
            Some(record) => record,
            None => return Err(BexError::PluginNotFound(id.into())),
        };
        let fresh_health = crate::registry::PluginHealth::new(
            self.inner.config.circuit_breaker_threshold,
            self.inner.config.circuit_breaker_cooldown_ms,
        );
        let replacement = Arc::new(crate::registry::PluginRecord {
            id: previous.id.clone(),
            manifest: previous.manifest.clone(),
            capabilities: previous.capabilities,
            enabled: true,
            installed_at: previous.installed_at,
            component: previous.component.clone(),
            health: fresh_health,
        });
        let info = replacement.to_plugin_info();
        registry.insert(replacement);
        info
    };

    self.inner
        .db
        .save_plugin_info(&info)
        .map_err(|e| BexError::Storage(e.to_string()))?;
    Ok(())
}
/// Marks plugin `id` as disabled and persists the change.
///
/// The registry entry is replaced by a copy with `enabled = false` and a fresh
/// `PluginHealth`.
///
/// # Errors
/// Fails if the engine is not ready, with `BexError::PluginNotFound` if the
/// plugin is unknown, or with `BexError::Storage` if saving to the database fails.
pub fn disable_plugin(&self, id: &str) -> Result<(), BexError> {
    self.check_ready()?;

    let info = {
        // The write lock is held only for the swap, not for the database write.
        let mut registry = self.inner.registry.write();
        let previous = match registry.remove(id) {
            Some(record) => record,
            None => return Err(BexError::PluginNotFound(id.into())),
        };
        let fresh_health = crate::registry::PluginHealth::new(
            self.inner.config.circuit_breaker_threshold,
            self.inner.config.circuit_breaker_cooldown_ms,
        );
        let replacement = Arc::new(crate::registry::PluginRecord {
            id: previous.id.clone(),
            manifest: previous.manifest.clone(),
            capabilities: previous.capabilities,
            enabled: false,
            installed_at: previous.installed_at,
            component: previous.component.clone(),
            health: fresh_health,
        });
        let info = replacement.to_plugin_info();
        registry.insert(replacement);
        info
    };

    self.inner
        .db
        .save_plugin_info(&info)
        .map_err(|e| BexError::Storage(e.to_string()))?;
    Ok(())
}
// ── Secret / API Key Management ────────────────────────────────
/// Stores `value` under `key` in the secret store of plugin `plugin_id`.
///
/// # Errors
/// Returns `BexError::Storage` if the database write fails.
pub fn secret_set(&self, plugin_id: &str, key: &str, value: &str) -> Result<(), BexError> {
    let db = &self.inner.db;
    let outcome = db.secret_set(plugin_id, key, value);
    outcome.map_err(|e| BexError::Storage(e.to_string()))
}
/// Reads the secret stored under `key` for plugin `plugin_id`, if any.
///
/// # Errors
/// Returns `BexError::Storage` if the database read fails.
pub fn secret_get(&self, plugin_id: &str, key: &str) -> Result<Option<String>, BexError> {
    let db = &self.inner.db;
    let outcome = db.secret_get(plugin_id, key);
    outcome.map_err(|e| BexError::Storage(e.to_string()))
}
/// Deletes the secret stored under `key` for plugin `plugin_id` and returns
/// the boolean reported by the database layer.
///
/// # Errors
/// Returns `BexError::Storage` if the database operation fails.
pub fn secret_remove(&self, plugin_id: &str, key: &str) -> Result<bool, BexError> {
    let db = &self.inner.db;
    let outcome = db.secret_remove(plugin_id, key);
    outcome.map_err(|e| BexError::Storage(e.to_string()))
}
/// Lists the secret keys stored for plugin `plugin_id`.
///
/// The database is queried with an empty second argument.
///
/// # Errors
/// Returns `BexError::Storage` if the database read fails.
pub fn secret_keys(&self, plugin_id: &str) -> Result<Vec<String>, BexError> {
    let db = &self.inner.db;
    let outcome = db.secret_keys(plugin_id, "");
    outcome.map_err(|e| BexError::Storage(e.to_string()))
}
// ── Instantiation with epoch deadline ───────────────────────────
/// Creates a fresh store and plugin instance for one call to `plugin_id`.
///
/// Checks engine readiness, the plugin's enabled flag and its circuit breaker,
/// then builds a `Store<HostState>` with the configured fuel budget, epoch
/// deadline and resource limiter, and instantiates the component with the
/// shared linker.
///
/// On success the engine-wide `active_calls` counter has been incremented and
/// the caller (`call_plugin_inner`) must decrement it once the call is done.
/// On failure the counter is left exactly as it was found.
///
/// # Errors
/// Fails if the engine is not ready, with `BexError::PluginNotFound`,
/// `BexError::PluginDisabled` or `BexError::PluginFault` (circuit open or bind
/// failure), with `BexError::FuelExhausted` if fuel cannot be set, or with the
/// classified trap if instantiation fails.
fn instantiate(&self, plugin_id: &str) -> Result<(Plugin, Store<HostState>), BexError> {
    self.check_ready()?;
    let record = self
        .inner
        .registry
        .read()
        .get(plugin_id)
        .cloned()
        .ok_or_else(|| BexError::PluginNotFound(plugin_id.into()))?;
    if !record.enabled {
        return Err(BexError::PluginDisabled(plugin_id.into()));
    }

    // Circuit breaker check
    if !record.health.is_available() {
        return Err(BexError::PluginFault(format!(
            "plugin '{}' is circuit-broken (too many consecutive failures)",
            plugin_id
        )));
    }
    let component = record.component.clone();

    // Create store
    let mut store = Store::new(
        &self.inner.wasmtime,
        HostState::new(
            self.inner.http.clone(),
            self.inner.db.clone(),
            self.inner.js_pool.clone(),
            self.inner.runtime.clone(),
            plugin_id.to_string(),
            record.manifest.clone(),
            Arc::from(self.inner.config.user_agent.as_str()),
            self.inner.config.http_timeout_ms,
            self.inner.config.max_response_bytes,
            self.inner.config.memory_limit_mb,
        ),
    );

    // Set fuel budget
    store
        .set_fuel(self.inner.config.fuel_per_call)
        .map_err(|_| BexError::FuelExhausted)?;

    // Set epoch deadline for wall-clock timeout (fixes Problem #3).
    // The deadline is counted in ticker intervals; both the divisor and the
    // result are clamped to at least 1.
    let epoch_deadline = (self.inner.config.call_timeout_ms as u64
        / self.inner.config.epoch_interval_ms.max(1))
    .max(1);
    store.set_epoch_deadline(epoch_deadline);

    // Set resource limiter from HostState (fixes Problem #6 - memory limiter)
    store.limiter(|state: &mut HostState| -> &mut dyn ResourceLimiter {
        &mut state.limiter
    });

    // Increment active calls counter (fixes Issue #9). From here on, every
    // error path must undo the increment: the caller only decrements after a
    // successful return, so a leaked count would make `uninstall_plugin` and
    // `shutdown` wait for calls that do not exist.
    self.inner.active_calls.fetch_add(1, Ordering::AcqRel);

    // Instantiate using the pre-built linker (fixes Problem #2)
    let instance = match self.inner.linker.instantiate(&mut store, &component) {
        Ok(instance) => instance,
        Err(e) => {
            self.inner.active_calls.fetch_sub(1, Ordering::AcqRel);
            return Err(classify_trap(e, plugin_id));
        }
    };
    let plugin = match Plugin::new(&mut store, &instance) {
        Ok(plugin) => plugin,
        Err(e) => {
            self.inner.active_calls.fetch_sub(1, Ordering::AcqRel);
            return Err(BexError::PluginFault(format!("bind: {e}")));
        }
    };
    Ok((plugin, store))
}
/// Runs `call` against a fresh instance of `plugin_id`, with panic isolation,
/// active-call accounting and circuit-breaker bookkeeping.
///
/// * A successful call records a success on the plugin's health.
/// * A structured plugin error or a Wasmtime trap records a failure and is
///   mapped to a `BexError`.
/// * A Rust panic raised during the call is caught, the plugin is disabled
///   (best-effort), and `BexError::PluginFault` is returned.
fn call_plugin_inner<R>(
    &self,
    plugin_id: &str,
    call: impl FnOnce(Plugin, &mut Store<HostState>) -> wasmtime::Result<Result<R, PluginError>>,
) -> Result<R, BexError>
where
    R: Send + 'static,
{
    let (plugin, mut store) = self.instantiate(plugin_id)?;

    // Execute the call with panic catching (fixes stability issue)
    let guarded = std::panic::AssertUnwindSafe(|| call(plugin, &mut store));
    let outcome = std::panic::catch_unwind(guarded);

    // The call is over, whatever its outcome: balance the increment done by `instantiate`.
    self.inner.active_calls.fetch_sub(1, Ordering::AcqRel);

    // Look the record up again for the circuit breaker; the plugin may have
    // been removed while the call was running.
    let record = self.inner.registry.read().get(plugin_id).cloned();

    let call_result = match outcome {
        Ok(result) => result,
        Err(_panic_payload) => {
            // Rust panic inside Wasmtime — auto-disable plugin
            tracing::error!(plugin = %plugin_id, "WASM call panicked");
            let _ = self.disable_plugin(plugin_id);
            return Err(BexError::PluginFault(
                "panic — plugin auto-disabled".into(),
            ));
        }
    };

    let (succeeded, mapped) = match call_result {
        Ok(Ok(val)) => (true, Ok(val)),
        // Plugin returned a structured error (fixes Issue #14)
        Ok(Err(plugin_err)) => (false, Err(classify_plugin_error(plugin_err))),
        // Wasmtime trap — convert to BexError
        Err(trap) => (false, Err(classify_trap(trap, plugin_id))),
    };

    if let Some(r) = record {
        if succeeded {
            r.health.record_success();
        } else {
            r.health.record_failure();
        }
    }
    mapped
}
// ── Typed plugin API calls ────────────────────────────────────────
/// Calls the plugin's `call_get_home` API binding through `call_plugin_inner`.
pub fn call_get_home(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
) -> Result<Vec<HomeSection>, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_get_home(wasm_store, ctx)
    })
}
/// Calls the plugin's `call_search` API binding with `query` and `filters`
/// through `call_plugin_inner`.
pub fn call_search(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
    query: &str,
    filters: &SearchFilters,
) -> Result<PagedResult, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_search(wasm_store, ctx, query, filters)
    })
}
/// Calls the plugin's `call_get_info` API binding for `id` through `call_plugin_inner`.
pub fn call_get_info(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
    id: &str,
) -> Result<MediaInfo, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_get_info(wasm_store, ctx, id)
    })
}
/// Calls the plugin's `call_get_servers` API binding for `id` through `call_plugin_inner`.
pub fn call_get_servers(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
    id: &str,
) -> Result<Vec<Server>, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_get_servers(wasm_store, ctx, id)
    })
}
/// Calls the plugin's `call_resolve_stream` API binding for `server` through
/// `call_plugin_inner`.
pub fn call_resolve_stream(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
    server: &Server,
) -> Result<StreamSource, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_resolve_stream(wasm_store, ctx, server)
    })
}
/// Calls the plugin's `call_search_subtitles` API binding with `query` through
/// `call_plugin_inner`.
pub fn call_search_subtitles(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
    query: &SubtitleQuery,
) -> Result<Vec<SubtitleEntry>, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_search_subtitles(wasm_store, ctx, query)
    })
}
/// Calls the plugin's `call_download_subtitle` API binding for `id` through
/// `call_plugin_inner`.
pub fn call_download_subtitle(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
    id: &str,
) -> Result<SubtitleFile, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_download_subtitle(wasm_store, ctx, id)
    })
}
/// Calls the plugin's `call_get_articles` API binding through `call_plugin_inner`.
pub fn call_get_articles(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
) -> Result<Vec<ArticleSection>, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_get_articles(wasm_store, ctx)
    })
}
/// Calls the plugin's `call_search_articles` API binding with `query` through
/// `call_plugin_inner`.
pub fn call_search_articles(
    &self,
    plugin_id: &str,
    ctx: &RequestContext,
    query: &str,
) -> Result<Vec<Article>, BexError> {
    self.call_plugin_inner(plugin_id, |plugin, wasm_store| {
        let api = plugin.api();
        api.call_search_articles(wasm_store, ctx, query)
    })
}
// ── Convenience: default context ────────────────────────────────
/// Builds a request context with no locale, region or hints and safe mode off.
///
/// The request id is the current Unix time in milliseconds, rendered as
/// lowercase hex; a clock before the Unix epoch yields `0`.
pub fn default_ctx() -> RequestContext {
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default();
    let request_id = format!("{:x}", since_epoch.as_millis());

    RequestContext {
        request_id,
        locale: None,
        region: None,
        safe_mode: false,
        hints: Vec::new(),
    }
}
// ── JSON-based calls for the Pure C ABI FFI layer ───────────────
/// JSON variant of `call_get_home`, using `default_ctx()`.
///
/// # Errors
/// Propagates the call error, or returns `BexError::Internal` if JSON
/// serialization fails.
pub fn call_get_home_json(&self, plugin_id: &str) -> Result<String, BexError> {
    let ctx = Self::default_ctx();
    let sections = self.call_get_home(plugin_id, &ctx)?;
    let payload = convert::home_sections_to_json(&sections);
    serde_json::to_string(&payload).map_err(|e| BexError::Internal(e.to_string()))
}
/// JSON variant of `call_search`, using `default_ctx()` and empty filters
/// (no kind, no page token or limit, `fast_match` off).
///
/// # Errors
/// Propagates the call error, or returns `BexError::Internal` if JSON
/// serialization fails.
pub fn call_search_json(&self, plugin_id: &str, query: &str) -> Result<String, BexError> {
    let first_page = PageCursor {
        token: None,
        limit: None,
    };
    let filters = SearchFilters {
        kind: None,
        page: first_page,
        fast_match: false,
    };
    let ctx = Self::default_ctx();
    let page = self.call_search(plugin_id, &ctx, query, &filters)?;
    let payload = convert::paged_result_to_json(&page);
    serde_json::to_string(&payload).map_err(|e| BexError::Internal(e.to_string()))
}
/// JSON variant of `call_get_info`, using `default_ctx()`.
///
/// # Errors
/// Propagates the call error, or returns `BexError::Internal` if JSON
/// serialization fails.
pub fn call_get_info_json(
    &self,
    plugin_id: &str,
    media_id: &str,
) -> Result<String, BexError> {
    let ctx = Self::default_ctx();
    let media = self.call_get_info(plugin_id, &ctx, media_id)?;
    let payload = convert::media_info_to_json(&media);
    serde_json::to_string(&payload).map_err(|e| BexError::Internal(e.to_string()))
}
/// JSON variant of `call_get_servers`, using `default_ctx()`.
///
/// # Errors
/// Propagates the call error, or returns `BexError::Internal` if JSON
/// serialization fails.
pub fn call_get_servers_json(
    &self,
    plugin_id: &str,
    id: &str,
) -> Result<String, BexError> {
    let ctx = Self::default_ctx();
    let servers = self.call_get_servers(plugin_id, &ctx, id)?;
    let payload = convert::servers_to_json(&servers);
    serde_json::to_string(&payload).map_err(|e| BexError::Internal(e.to_string()))
}
/// JSON variant of `call_resolve_stream`: parses `server_json` into a server
/// description, resolves it with `default_ctx()`, and serializes the result.
///
/// # Errors
/// Returns `BexError::Internal` if `server_json` cannot be parsed or the
/// result cannot be serialized; otherwise propagates the call error.
pub fn call_resolve_stream_json(
    &self,
    plugin_id: &str,
    server_json: &str,
) -> Result<String, BexError> {
    let parsed = match serde_json::from_str::<bex_types::stream::Server>(server_json) {
        Ok(server) => server,
        Err(e) => return Err(BexError::Internal(format!("parse server: {e}"))),
    };
    let server = convert::json_to_server(&parsed);
    let ctx = Self::default_ctx();
    let stream = self.call_resolve_stream(plugin_id, &ctx, &server)?;
    let payload = convert::stream_source_to_json(&stream);
    serde_json::to_string(&payload).map_err(|e| BexError::Internal(e.to_string()))
}
// ── Stats and Shutdown ───────────────────────────────────────────
/// Returns a snapshot of engine statistics: uptime since construction, total
/// and enabled plugin counts, and the number of calls currently in flight.
pub fn stats(&self) -> bex_types::engine_types::EngineStats {
    let reg = self.inner.registry.read();
    // Build the plugin list once and derive both counts from that snapshot.
    let plugins = reg.list();
    let total = plugins.len();
    let enabled = plugins.iter().filter(|p| p.enabled).count();
    let active = self.inner.active_calls.load(Ordering::Acquire);
    bex_types::engine_types::EngineStats {
        uptime_ms: self.inner.start_time.elapsed().as_millis() as u64,
        total_plugins: total,
        enabled_plugins: enabled,
        active_calls: active,
    }
}
/// Stops the engine: moves to `STATE_DRAINING`, waits up to two seconds
/// (polling every 50 ms) for in-flight calls to finish, then moves to
/// `STATE_STOPPED` whether or not they did.
pub fn shutdown(&self) {
    use std::time::{Duration, Instant};

    tracing::info!("BEX Engine shutting down...");
    let inner = &self.inner;

    // Transition to draining state (fixes Issue #8)
    inner.state.store(STATE_DRAINING, Ordering::Release);

    // Brief drain window so a CLI caller is not kept waiting for long.
    let give_up_at = Instant::now() + Duration::from_secs(2);
    loop {
        if inner.active_calls.load(Ordering::Acquire) == 0 {
            break;
        }
        if Instant::now() > give_up_at {
            tracing::warn!(
                "Shutdown: {} active calls still running after 2s timeout",
                inner.active_calls.load(Ordering::Acquire)
            );
            break;
        }
        std::thread::sleep(Duration::from_millis(50));
    }

    // Mark as stopped
    inner.state.store(STATE_STOPPED, Ordering::Release);
    tracing::info!("BEX Engine shut down complete");
}
}
// ── Trap classification ─────────────────────────────────────────────
/// Classify a Wasmtime trap into a structured BexError.
///
/// The typed trap code is checked first: `Trap::Interrupt` (epoch deadline
/// reached) maps to `BexError::Timeout` and `Trap::OutOfFuel` maps to
/// `BexError::FuelExhausted`. This does not depend on how the error is
/// rendered, which matters because the outermost message may be backtrace
/// context rather than the trap text.
///
/// Errors that carry no such trap code fall back to the message heuristics:
/// "epoch"/"timeout" → timeout, "fuel" → fuel exhausted, "memory" together
/// with "grow"/"limit" → out-of-memory fault. Anything else becomes a
/// `BexError::PluginFault` carrying the error message.
///
/// `BexError::Timeout::ms` is always 0 here because the configured timeout is
/// not available to this function.
fn classify_trap(error: wasmtime::Error, plugin_id: &str) -> BexError {
    // Prefer the structured trap code when Wasmtime attached one.
    match error.downcast_ref::<wasmtime::Trap>() {
        Some(wasmtime::Trap::Interrupt) => return BexError::Timeout { ms: 0 },
        Some(wasmtime::Trap::OutOfFuel) => return BexError::FuelExhausted,
        _ => {}
    }

    let msg = error.to_string();
    // Check for epoch interruption (timeout)
    if msg.contains("epoch") || msg.contains("timeout") {
        return BexError::Timeout { ms: 0 };
    }
    // Check for fuel exhaustion
    if msg.contains("fuel") {
        return BexError::FuelExhausted;
    }
    // Check for out-of-memory
    if msg.contains("memory") && (msg.contains("grow") || msg.contains("limit")) {
        return BexError::PluginFault(format!("out of memory: {}", plugin_id));
    }
    // Default: generic plugin fault
    BexError::PluginFault(msg)
}
/// Classify a plugin error from the WIT bindgen into a structured BexError.
///
/// Network errors keep their own variant. Parse, invalid-input, internal,
/// not-found, unauthorized and forbidden errors become `BexError::PluginError`.
/// Rate limiting and timeouts become `BexError::Timeout`, where a rate limit
/// with a retry hint carries that hint converted from seconds to milliseconds.
fn classify_plugin_error(err: PluginError) -> BexError {
    match err {
        PluginError::Network(msg) => BexError::Network(msg),
        PluginError::Parse(msg) => BexError::PluginError(msg),
        PluginError::NotFound => BexError::PluginError("not found".into()),
        PluginError::Unauthorized => BexError::PluginError("unauthorized".into()),
        PluginError::Forbidden => BexError::PluginError("forbidden".into()),
        PluginError::RateLimited(Some(secs)) => BexError::Timeout {
            // The hint comes from the plugin, so clamp instead of overflowing
            // (which would panic in debug builds and wrap in release builds).
            ms: secs.saturating_mul(1000),
        },
        PluginError::RateLimited(None) => BexError::Timeout { ms: 0 },
        PluginError::Timeout => BexError::Timeout { ms: 0 },
        PluginError::Cancelled => BexError::Cancelled,
        PluginError::Unsupported => {
            BexError::Unsupported("plugin does not support this operation".into())
        }
        PluginError::InvalidInput(msg) => BexError::PluginError(msg),
        PluginError::Internal(msg) => BexError::PluginError(msg),
    }
}
// ── Conversion between bindgen types and JSON-serializable types ─────
pub mod convert {
use super::*;
/// Converts each home section to a `serde_json::Value`.
///
/// A section whose conversion to a value fails becomes the default value
/// (`null`) rather than aborting the whole list.
pub fn home_sections_to_json(sections: &[HomeSection]) -> Vec<serde_json::Value> {
    let mut values = Vec::with_capacity(sections.len());
    for section in sections {
        let converted = home_section_to_json(section);
        values.push(serde_json::to_value(converted).unwrap_or_default());
    }
    values
}
/// Converts a bindgen `HomeSection` into its serializable `bex_types` form.
///
/// The layout is written as the lowercase of its `Debug` rendering.
pub fn home_section_to_json(s: &HomeSection) -> bex_types::media::HomeSection {
    let items = s.items.iter().map(media_card_to_json).collect();
    let categories = s
        .categories
        .iter()
        .map(|link| bex_types::media::CategoryLink {
            id: link.id.clone(),
            title: link.title.clone(),
            subtitle: link.subtitle.clone(),
            image: link.image.as_ref().map(image_to_json),
        })
        .collect();
    let extra = s
        .extra
        .iter()
        .map(|attr| (attr.key.clone(), attr.value.clone()))
        .collect();
    let layout = format!("{:?}", s.layout).to_lowercase();

    bex_types::media::HomeSection {
        id: s.id.clone(),
        title: s.title.clone(),
        subtitle: s.subtitle.clone(),
        items,
        next_page: s.next_page.clone(),
        layout,
        show_rank: s.show_rank,
        categories,
        extra,
    }
}
/// Converts a bindgen `MediaCard` into its serializable `bex_types` form.
///
/// `kind` and `status`, when present, are written as the lowercase of their
/// `Debug` rendering.
pub fn media_card_to_json(c: &MediaCard) -> bex_types::media::MediaCard {
    let kind = c.kind.map(|k| format!("{:?}", k).to_lowercase());
    let status = c.status.map(|s| format!("{:?}", s).to_lowercase());
    let ids = c
        .ids
        .iter()
        .map(|linked| bex_types::media::LinkedId {
            source: linked.source.clone(),
            id: linked.id.clone(),
        })
        .collect();
    let extra = c
        .extra
        .iter()
        .map(|attr| (attr.key.clone(), attr.value.clone()))
        .collect();

    bex_types::media::MediaCard {
        id: c.id.clone(),
        title: c.title.clone(),
        kind,
        images: c.images.as_ref().map(image_set_to_json),
        original_title: c.original_title.clone(),
        tagline: c.tagline.clone(),
        year: c.year.clone(),
        score: c.score,
        genres: c.genres.clone(),
        status,
        content_rating: c.content_rating.clone(),
        url: c.url.clone(),
        ids,
        extra,
    }
}
/// Converts a bindgen `ImageSet` into its serializable form, converting each
/// variant that is present.
pub fn image_set_to_json(s: &ImageSet) -> bex_types::media::ImageSet {
    let convert_slot = |slot: &Option<Image>| slot.as_ref().map(image_to_json);
    bex_types::media::ImageSet {
        low: convert_slot(&s.low),
        medium: convert_slot(&s.medium),
        high: convert_slot(&s.high),
        backdrop: convert_slot(&s.backdrop),
        logo: convert_slot(&s.logo),
    }
}
/// Converts a bindgen `Image` into its serializable form.
///
/// The layout is written as the lowercase of its `Debug` rendering.
pub fn image_to_json(i: &Image) -> bex_types::media::Image {
    let layout = format!("{:?}", i.layout).to_lowercase();
    bex_types::media::Image {
        url: i.url.clone(),
        layout,
        width: i.width,
        height: i.height,
        blurhash: i.blurhash.clone(),
    }
}
/// Converts a bindgen `PagedResult` (items, category links and next-page
/// value) into its serializable `bex_types` form.
pub fn paged_result_to_json(r: &PagedResult) -> bex_types::media::PagedResult {
    let items = r.items.iter().map(media_card_to_json).collect();
    let categories = r
        .categories
        .iter()
        .map(|link| bex_types::media::CategoryLink {
            id: link.id.clone(),
            title: link.title.clone(),
            subtitle: link.subtitle.clone(),
            image: link.image.as_ref().map(image_to_json),
        })
        .collect();

    bex_types::media::PagedResult {
        items,
        categories,
        next_page: r.next_page.clone(),
    }
}
/// Converts a bindgen `MediaInfo`, including its seasons, episodes, cast, crew
/// and linked ids, into its serializable `bex_types` form.
///
/// `kind` and (when present) `status` are written as the lowercase of their
/// `Debug` rendering.
#[allow(clippy::too_many_lines)]
pub fn media_info_to_json(m: &MediaInfo) -> bex_types::media::MediaInfo {
    let seasons = m
        .seasons
        .iter()
        .map(|season| {
            let episodes = season
                .episodes
                .iter()
                .map(|ep| bex_types::media::Episode {
                    id: ep.id.clone(),
                    title: ep.title.clone(),
                    number: ep.number,
                    season: ep.season,
                    images: ep.images.as_ref().map(image_set_to_json),
                    description: ep.description.clone(),
                    released: ep.released.clone(),
                    score: ep.score,
                    url: ep.url.clone(),
                    tags: ep.tags.clone(),
                    extra: ep
                        .extra
                        .iter()
                        .map(|attr| (attr.key.clone(), attr.value.clone()))
                        .collect(),
                })
                .collect();
            bex_types::media::Season {
                id: season.id.clone(),
                title: season.title.clone(),
                number: season.number,
                year: season.year,
                episodes,
            }
        })
        .collect();

    let cast = m
        .cast
        .iter()
        .map(|member| bex_types::media::Person {
            id: member.id.clone(),
            name: member.name.clone(),
            image: member.image.as_ref().map(image_set_to_json),
            role: member.role.clone(),
            url: member.url.clone(),
        })
        .collect();

    let crew = m
        .crew
        .iter()
        .map(|member| bex_types::media::Person {
            id: member.id.clone(),
            name: member.name.clone(),
            image: member.image.as_ref().map(image_set_to_json),
            role: member.role.clone(),
            url: member.url.clone(),
        })
        .collect();

    let ids = m
        .ids
        .iter()
        .map(|linked| bex_types::media::LinkedId {
            source: linked.source.clone(),
            id: linked.id.clone(),
        })
        .collect();

    let extra = m
        .extra
        .iter()
        .map(|attr| (attr.key.clone(), attr.value.clone()))
        .collect();

    let kind = format!("{:?}", m.kind).to_lowercase();
    let status = m.status.map(|s| format!("{:?}", s).to_lowercase());

    bex_types::media::MediaInfo {
        id: m.id.clone(),
        title: m.title.clone(),
        kind,
        images: m.images.as_ref().map(image_set_to_json),
        original_title: m.original_title.clone(),
        description: m.description.clone(),
        score: m.score,
        scored_by: m.scored_by,
        year: m.year.clone(),
        release_date: m.release_date.clone(),
        genres: m.genres.clone(),
        tags: m.tags.clone(),
        status,
        content_rating: m.content_rating.clone(),
        seasons,
        cast,
        crew,
        runtime_minutes: m.runtime_minutes,
        trailer_url: m.trailer_url.clone(),
        ids,
        studio: m.studio.clone(),
        country: m.country.clone(),
        language: m.language.clone(),
        url: m.url.clone(),
        extra,
    }
}
/// Converts bindgen `Server` values into their serializable `bex_types` form,
/// preserving order.
pub fn servers_to_json(servers: &[Server]) -> Vec<bex_types::stream::Server> {
    let mut converted = Vec::with_capacity(servers.len());
    for server in servers {
        let extra = server
            .extra
            .iter()
            .map(|attr| (attr.key.clone(), attr.value.clone()))
            .collect();
        converted.push(bex_types::stream::Server {
            id: server.id.clone(),
            label: server.label.clone(),
            url: server.url.clone(),
            priority: server.priority,
            extra,
        });
    }
    converted
}
/// Converts a serializable `bex_types` server back into the bindgen `Server`,
/// turning each `(key, value)` entry of `extra` into an `Attr`.
pub fn json_to_server(s: &bex_types::stream::Server) -> Server {
    let extra = s
        .extra
        .iter()
        .map(|(key, value)| Attr {
            key: key.clone(),
            value: value.clone(),
        })
        .collect();

    Server {
        id: s.id.clone(),
        label: s.label.clone(),
        url: s.url.clone(),
        priority: s.priority,
        extra,
    }
}
/// Converts a bindgen `StreamSource`, with its video tracks, subtitle tracks,
/// headers and extra attributes, into its serializable `bex_types` form.
///
/// The format is written as the lowercase of its `Debug` rendering.
pub fn stream_source_to_json(s: &StreamSource) -> bex_types::stream::StreamSource {
    let videos = s
        .videos
        .iter()
        .map(|track| {
            let resolution = bex_types::stream::VideoResolution {
                width: track.resolution.width,
                height: track.resolution.height,
                hdr: track.resolution.hdr,
                label: track.resolution.label.clone(),
            };
            bex_types::stream::VideoTrack {
                resolution,
                url: track.url.clone(),
                mime_type: track.mime_type.clone(),
                bitrate: track.bitrate,
                codecs: track.codecs.clone(),
            }
        })
        .collect();

    let subtitles = s
        .subtitles
        .iter()
        .map(|track| bex_types::stream::SubtitleTrack {
            label: track.label.clone(),
            url: track.url.clone(),
            language: track.language.clone(),
            format: track.format.clone(),
        })
        .collect();

    let headers = s
        .headers
        .iter()
        .map(|attr| (attr.key.clone(), attr.value.clone()))
        .collect();

    let extra = s
        .extra
        .iter()
        .map(|attr| (attr.key.clone(), attr.value.clone()))
        .collect();

    bex_types::stream::StreamSource {
        id: s.id.clone(),
        label: s.label.clone(),
        format: format!("{:?}", s.format).to_lowercase(),
        manifest_url: s.manifest_url.clone(),
        videos,
        subtitles,
        headers,
        extra,
    }
}
}