Event Loop
This document describes how the V8 runtime communicates with the Rust event loop to handle asynchronous operations.
Quick Summary
| Direction | Channel | Purpose |
|---|---|---|
| JS → Rust | SchedulerMessage | Request async operations |
| Rust → JS | CallbackMessage | Return results to callbacks |
Overview
The event loop bridges JavaScript’s async world with Rust’s tokio runtime:
┌─────────────────────────────────────────────────────────────────┐
│ V8 Runtime (sync) │
│ │
│ fetch('https://...') │
│ │ │
│ ▼ │
│ __nativeFetchStreaming(request, callback) │
│ │ │
│ ├─► Store callback in fetch_callbacks[callback_id] │
│ │ │
│ └─► scheduler_tx.send(FetchStreaming(callback_id, req)) │
└─────────────────────────────────────────────────────────────────┘
│
mpsc::UnboundedChannel
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Event Loop (async tokio) │
│ │
│ while let Some(msg) = scheduler_rx.recv().await { │
│ match msg { │
│ FetchStreaming(id, req) => { │
│ tokio::spawn(async { │
│ let result = ops.handle(Fetch(req)).await; │
│ callback_tx.send(FetchSuccess(id, result)); │
│ }); │
│ } │
│ } │
│ } │
└─────────────────────────────────────────────────────────────────┘
│
mpsc::UnboundedChannel
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ V8 Runtime (process_callbacks) │
│ │
│ while let Ok(msg) = callback_rx.try_recv() { │
│ match msg { │
│ FetchSuccess(id, result) => { │
│ let callback = fetch_callbacks.remove(id); │
│ callback.call(result); │
│ } │
│ } │
│ } │
└─────────────────────────────────────────────────────────────────┘
Message Types
SchedulerMessage (JS → Rust)
Sent from V8 to request async operations:
pub enum SchedulerMessage {
// Timers
ScheduleTimeout(CallbackId, u64), // setTimeout
ScheduleInterval(CallbackId, u64), // setInterval
ClearTimer(CallbackId), // clearTimeout/clearInterval
// Fetch
FetchStreaming(CallbackId, HttpRequest),
// Bindings
BindingFetch(CallbackId, String, HttpRequest), // Assets binding
BindingStorage(CallbackId, String, StorageOp), // Storage binding
BindingKv(CallbackId, String, KvOp), // KV binding
// Streams
StreamRead(CallbackId, StreamId), // Read next chunk
StreamCancel(StreamId), // Cancel stream
// Logging
Log(LogLevel, String), // Fire-and-forget
Shutdown, // Cleanup
}
CallbackMessage (Rust → JS)
Sent from event loop back to V8 with results:
pub enum CallbackMessage {
// Timers
ExecuteTimeout(CallbackId),
ExecuteInterval(CallbackId),
// Fetch
FetchError(CallbackId, String),
FetchStreamingSuccess(CallbackId, HttpResponseMeta, StreamId),
// Streams
StreamChunk(CallbackId, StreamChunk),
// Bindings
StorageResult(CallbackId, StorageResult),
KvResult(CallbackId, KvResult),
}
Callback Pattern
All async operations follow the same pattern:
1. JS calls native function
// User code
const response = await fetch('https://api.example.com');
// Internally becomes:
__nativeFetchStreaming(request, (result) => {
// This callback is stored and invoked later
resolve(result);
});
2. Native function stores callback
fn native_fetch_streaming(/* ... */) {
// Generate unique ID
let callback_id = next_callback_id();
// Store JS callback for later
fetch_callbacks.insert(callback_id, callback);
// Send request to event loop
scheduler_tx.send(FetchStreaming(callback_id, request));
}
3. Event loop processes request
// In run_event_loop()
SchedulerMessage::FetchStreaming(callback_id, request) => {
tokio::spawn(async move {
// Execute via OperationsHandler
let result = ops.handle(Operation::Fetch(request)).await;
// Send result back
callback_tx.send(FetchStreamingSuccess(callback_id, result));
});
}
4. Runtime invokes callback
// In process_callbacks()
CallbackMessage::FetchStreamingSuccess(callback_id, meta, stream_id) => {
// Retrieve stored callback
let callback = fetch_callbacks.remove(&callback_id);
// Call JavaScript function with result
callback.call(scope, &[meta_obj.into()]);
}
Timer Implementation
setTimeout
setTimeout(fn, 100)
│
▼
ScheduleTimeout(id, 100) ──► Event Loop
│
▼
tokio::spawn(async {
sleep(100ms).await;
callback_tx.send(ExecuteTimeout(id));
})
│
▼
ExecuteTimeout(id) ──► __executeTimer(id)
│
▼
Calls stored fn
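As a rough sketch (not the exact implementation), the scheduler side of setTimeout can spawn a sleeping task and remember its JoinHandle so it can be cancelled later; running_tasks is the same map used in the clearTimeout snippet below:

SchedulerMessage::ScheduleTimeout(callback_id, delay_ms) => {
    let callback_tx = callback_tx.clone();
    let handle = tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
        // The worker may already be gone; a failed send is not an error here.
        let _ = callback_tx.send(CallbackMessage::ExecuteTimeout(callback_id));
    });
    // Remember the task so ClearTimer(callback_id) can abort it.
    running_tasks.insert(callback_id, handle);
}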
setInterval
Similar, but the spawned task loops:
tokio::spawn(async move {
let mut interval = tokio::time::interval(duration);
interval.tick().await; // Skip first immediate tick
loop {
interval.tick().await;
if callback_tx.send(ExecuteInterval(id)).is_err() {
break; // Channel closed, stop
}
}
});
clearTimeout / clearInterval
SchedulerMessage::ClearTimer(callback_id) => {
if let Some(handle) = running_tasks.remove(&callback_id) {
handle.abort(); // Cancel the tokio task
}
}
Binding Operations
Bindings (Storage, KV) follow the same pattern:
env.KV.get('key')
│
▼
__nativeBindingKv('KV', 'get', {key}, callback)
│
▼
BindingKv(id, 'KV', KvOp::Get{key}) ──► Event Loop
│
▼
ops.handle(BindingKv{...})
│
▼
Runner executes with credentials
│
▼
KvResult(id, result) ──► callback(result)
The binding name ('KV') is passed to the Runner, which looks up the actual credentials and executes the operation.
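On the event-loop side, handling BindingKv plausibly mirrors FetchStreaming; a hedged sketch (names taken from the enums above, type conversions elided):

SchedulerMessage::BindingKv(callback_id, binding, op) => {
    let ops = ops.clone();
    let callback_tx = callback_tx.clone();
    tokio::spawn(async move {
        // The binding name rides along so the Runner can resolve credentials for it.
        let result = ops.handle(Operation::BindingKv { binding, op }).await;
        // (conversion from OperationResult to KvResult elided in this sketch)
        let _ = callback_tx.send(CallbackMessage::KvResult(callback_id, result));
    });
}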
Stream Handling
Streams require multiple round-trips:
fetch(...).then(r => r.body.getReader())
│
▼
FetchStreaming ──► Event Loop
│
▼
Create StreamId, start background task
│
▼
FetchStreamingSuccess(id, meta, stream_id)
│
▼
reader.read() ──► StreamRead(callback_id, stream_id)
│
▼
Read from mpsc channel
│
▼
StreamChunk(callback_id, Data(bytes))
│
▼
... repeat until Done ...
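A simplified sketch of the StreamRead handling, assuming the StreamManager exposes a shared receiver per StreamId (the receiver() accessor is hypothetical; the real API in stream_manager.rs may differ):

SchedulerMessage::StreamRead(callback_id, stream_id) => {
    // Arc<tokio::sync::Mutex<UnboundedReceiver<StreamChunk>>>, filled by the
    // background task that reads the HTTP response body.
    let rx = stream_manager.receiver(stream_id);
    let callback_tx = callback_tx.clone();
    tokio::spawn(async move {
        if let Some(chunk) = rx.lock().await.recv().await {
            // Data(bytes) or Done; JS keeps issuing StreamRead until Done.
            let _ = callback_tx.send(CallbackMessage::StreamChunk(callback_id, chunk));
        }
    });
}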
Two Loops Architecture
Two separate loops work together:
1. Scheduler Loop (background task)
Runs continuously in the background, spawning async tasks:
// runtime/mod.rs - run_event_loop()
async fn run_event_loop(scheduler_rx, callback_tx, ops) {
loop {
select! {
msg = scheduler_rx.recv() => {
match msg {
FetchStreaming(id, req) => {
tokio::spawn(async {
let result = ops.handle(Fetch(req)).await;
callback_tx.send(FetchSuccess(id, result));
});
}
ScheduleTimeout(id, ms) => { /* spawn timer */ }
}
}
}
}
}
2. Execution Loop (in exec())
Polls for results and checks termination conditions:
// worker.rs - exec()
async fn exec(&mut self, task: Task) {
self.trigger_fetch_event(request); // Start JS execution
for iteration in 0..5000 {
// Check termination (timeout, CPU limit)
if wall_guard.was_triggered() {
return Err(WallClockTimeout);
}
// Process available callbacks (NON-BLOCKING)
self.runtime.process_callbacks();
// Check if response is ready
if response_ready { break; }
// Wait for scheduler to signal callback ready (event-driven)
tokio::select! {
_ = callback_notify.notified() => {} // Wake immediately
_ = tokio::time::sleep(100ms) => {} // Periodic guard check
}
}
}
Why Two Loops?
┌──────────────────────────────────────────────────────────────────┐
│ exec() loop │
│ │
│ ┌─────────┐ ┌───────────────────┐ ┌─────────┐ │
│ │ Check │───►│ process_callbacks │───►│ Check │───► select!│
│ │ timeout │ │ (try_recv) │ │ ready │ │ │
│ └─────────┘ └───────────────────┘ └─────────┘ │ │
│ │ ▲ │ │
│ │ │ results + notify ◄─────────────┘ │
│ │ │ │
│ ▼ │ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ Scheduler (background task) │ │
│ │ │ │
│ │ recv() ──► spawn fetch ──► await reqwest ──► send() │ │
│ │ + notify │ │
│ └──────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
- Scheduler: Handles async I/O, notifies exec() when results are ready
- exec() loop: Waits for notifications, enforces timeouts, controls V8
The select! in exec() is event-driven: when the scheduler sends a callback, it also calls notify_one() to wake up the exec() loop immediately. No polling, no wasted CPU.
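Assuming callback_notify is an Arc of tokio::sync::Notify shared between the scheduler task and exec(), the wiring looks roughly like this:

// Scheduler side: deliver the result, then wake the exec() loop.
let _ = callback_tx.send(CallbackMessage::ExecuteTimeout(callback_id));
callback_notify.notify_one();

// exec() side: sleep until notified, but never longer than the guard interval,
// so timeouts are still checked even when no callbacks arrive.
tokio::select! {
    _ = callback_notify.notified() => {}
    _ = tokio::time::sleep(std::time::Duration::from_millis(100)) => {}
}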
Integration Points
process_callbacks()
Called periodically during execution to:
- Pump V8 message loop - Process V8 internal tasks
- Process CallbackMessages - Invoke JS callbacks with results
- Run microtasks - Execute Promise continuations
pub fn process_callbacks(&mut self) {
// 1. V8 internal tasks (Atomics, WebAssembly, etc.)
while v8::Platform::pump_message_loop(platform, scope, false) {}
// 2. Our custom callbacks (NON-BLOCKING: try_recv returns immediately)
while let Ok(msg) = self.callback_rx.try_recv() {
match msg { /* ... */ }
}
// 3. Microtasks (Promises, async/await)
scope.perform_microtask_checkpoint();
}
Important: try_recv() is non-blocking - it returns Err(Empty) immediately if no messages are available. This allows the exec() loop to check for timeouts and response readiness without waiting.
OperationsHandler
All I/O goes through the Runner’s OperationsHandler:
pub enum Operation {
Fetch(HttpRequest),
BindingFetch { binding: String, request: HttpRequest },
BindingStorage { binding: String, op: StorageOp },
BindingKv { binding: String, op: KvOp },
Log { level: LogLevel, message: String },
}
This allows the Runner to (see the sketch after this list):
- Inject credentials for bindings
- Apply rate limits
- Log operations
- Enforce security policies
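A hedged sketch of what the Runner-side handler could look like; the field and method names (bindings, kv_client) and the OperationResult variants are illustrative, not the actual API:

impl OperationsHandler {
    pub async fn handle(&self, op: Operation) -> OperationResult {
        match op {
            Operation::BindingKv { binding, op } => {
                // Resolve the credentials registered for this binding name,
                // then execute the operation on the worker's behalf.
                match self.bindings.get(&binding) {
                    Some(creds) => self.kv_client.execute(creds, op).await,
                    None => OperationResult::Error(format!("unknown binding: {binding}")),
                }
            }
            // ... Fetch, BindingFetch, BindingStorage, Log handled similarly
            _ => todo!(),
        }
    }
}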
Key Files
| File | Purpose |
|---|---|
| openworkers-runtime-v8/src/worker.rs | Worker struct, exec() loop |
| openworkers-runtime-v8/src/runtime/mod.rs | Runtime struct, scheduler, process_callbacks |
| openworkers-runtime-v8/src/runtime/bindings/ | Native V8 functions |
| openworkers-runtime-v8/src/runtime/stream_manager.rs | Stream coordination |
| openworkers-core/src/ops.rs | Operation, OperationResult enums |
Design Decisions
Unbounded Channels
We use mpsc::unbounded_channel for simplicity. Backpressure is handled at higher levels (stream buffer, semaphore for worker pool).
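For illustration, the wiring amounts to two unbounded channel pairs, one per direction (names follow the diagrams above):

use tokio::sync::mpsc;

// JS → Rust: V8 keeps the sender, the event loop owns the receiver.
let (scheduler_tx, scheduler_rx) = mpsc::unbounded_channel::<SchedulerMessage>();
// Rust → JS: the event loop keeps the sender, process_callbacks() drains the receiver.
let (callback_tx, callback_rx) = mpsc::unbounded_channel::<CallbackMessage>();

// The scheduler loop runs as a background task for the lifetime of the worker.
tokio::spawn(run_event_loop(scheduler_rx, callback_tx, ops));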
Fire-and-Forget Logging
Log messages don’t return a result - they’re sent and forgotten. The Runner handles delivery to NATS.
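In the scheduler this plausibly reduces to a spawn with no CallbackId and no reply, for example:

SchedulerMessage::Log(level, message) => {
    let ops = ops.clone();
    tokio::spawn(async move {
        // Delivery (e.g. to NATS) is the Runner's job; failures are dropped.
        let _ = ops.handle(Operation::Log { level, message }).await;
    });
}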
Callback Storage
Callbacks are stored in HashMap<CallbackId, v8::Global<v8::Function>>. The v8::Global handle keeps the JavaScript function alive (it cannot be garbage-collected) while the operation is pending.
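A minimal sketch of the pattern with the rusty_v8 API (error handling omitted):

// Storing: upgrading the Local to a Global keeps the function alive.
// callback_fn: the v8::Local<v8::Function> received from JS.
let global_cb = v8::Global::new(scope, callback_fn);
fetch_callbacks.insert(callback_id, global_cb);

// Later, inside process_callbacks(): downgrade back to a Local and call it.
// result_value: the v8::Local<v8::Value> built from the CallbackMessage.
if let Some(global_cb) = fetch_callbacks.remove(&callback_id) {
    let cb = v8::Local::new(scope, global_cb);
    let recv = v8::undefined(scope).into();
    cb.call(scope, recv, &[result_value]);
}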