Skip to content

Streaming

The GraphRunner supports two execution modes: run() (returns the final state) and stream() (yields typed events as they occur). Streaming enables real-time monitoring, token-by-token output, and reactive UIs without polling.

import { GraphRunner, isTerminalEvent } from '@cycgraph/orchestrator';
const runner = new GraphRunner(graph, initialState, options);
for await (const event of runner.stream()) {
switch (event.type) {
case 'node:start':
console.log(`Starting ${event.node_id}`);
break;
case 'agent:token_delta':
process.stdout.write(event.token);
break;
}
if (isTerminalEvent(event)) {
console.log(`Final status: ${event.state.status}`);
}
}

Events are a discriminated union on the type field. They split into two categories: non-terminal (lightweight, no state copy) and terminal (carry the full WorkflowState).

EventFieldsDescription
workflow:startworkflow_id, run_idWorkflow execution has begun.
workflow:rollbackworkflow_id, run_idCompensation stack is being unwound.
node:startnode_id, typeA node has started executing.
node:completenode_id, type, duration_msA node has finished successfully.
node:failednode_id, type, error, attemptA node execution failed (may retry).
node:retrynode_id, attempt, backoff_msA failed node is being retried after a backoff delay.
action:appliedaction_id, action_type, node_id, memory_diff?A reducer has applied an action to state. Includes memory diff when keys changed.
state:persistedrun_id, iterationState has been persisted (via persistStateFn).
agent:token_deltarun_id, node_id, tokenA single token from an LLM response (real-time streaming).
tool:call_startrun_id, node_id, tool_name, tool_call_id, argsA tool has started executing.
tool:call_finishrun_id, node_id, tool_name, tool_call_id, duration_ms, success, error?A tool has finished executing.
budget:threshold_reachedrun_id, threshold_pct, cost_usd, budget_usdCost has crossed a budget threshold (50%, 75%, 90%, 100%).
model:resolvedrun_id, node_id, agent_id, original_model, resolved_model, reason, preference, remaining_budget_usd?A model identifier has been resolved (e.g., via budget-aware fallback).

All events include a timestamp field (Unix ms).

Terminal events carry the full WorkflowState in their state field. Use the isTerminalEvent() type guard to narrow the union:

EventFieldsDescription
workflow:completestate, duration_msWorkflow finished successfully.
workflow:failedstate, errorWorkflow failed with an unrecoverable error.
workflow:timeoutstate, elapsed_msWorkflow exceeded max_execution_time_ms.
workflow:waitingstate, waiting_forWorkflow paused for human input (HITL).
import { isTerminalEvent } from '@cycgraph/orchestrator';
if (isTerminalEvent(event)) {
// TypeScript knows event.state exists here
console.log(event.state.status);
}

The agent:token_delta event delivers individual tokens as they arrive from the LLM. This enables typewriter-style output in CLIs and real-time display in web UIs:

for await (const event of runner.stream()) {
if (event.type === 'agent:token_delta') {
process.stdout.write(event.token);
}
}

Token deltas are only emitted for agent nodes that use streamText (the default execution mode).

The tool:call_start and tool:call_finish events fire in real-time as tools execute within an agent node. These events are powered by the AI SDK’s experimental_onToolCallStart and experimental_onToolCallFinish callbacks, so they fire during the LLM interaction — not post-hoc.

for await (const event of runner.stream()) {
if (event.type === 'tool:call_start') {
console.log(`Calling ${event.tool_name}...`);
}
if (event.type === 'tool:call_finish') {
const status = event.success ? 'done' : `failed: ${event.error}`;
console.log(` ${event.tool_name} ${status} (${event.duration_ms}ms)`);
}
}

Tool call events are also available via the event listener API (see below).

The action:applied event includes an optional memory_diff field that shows exactly which memory keys were added, changed, or removed by the action. This enables real-time UIs to display state changes without polling or comparing full snapshots.

for await (const event of runner.stream()) {
if (event.type === 'action:applied' && event.memory_diff) {
const { added, changed, removed, values } = event.memory_diff;
if (added.length > 0) console.log(' Added:', added);
if (changed.length > 0) console.log(' Changed:', changed);
if (removed.length > 0) console.log(' Removed:', removed);
}
}

The MemoryDiff type is exported from @cycgraph/orchestrator:

FieldTypeDescription
addedstring[]Keys that were added (not present before).
changedstring[]Keys whose values changed.
removedstring[]Keys that were removed.
valuesRecord<string, unknown>New values for added and changed keys.

When no memory keys changed (e.g., goto_node or set_status actions), memory_diff is undefined — no overhead is incurred.

stream() returns an async iterable, which maps directly to a Server-Sent Events handler. Below is a minimal Express endpoint that streams every workflow event to a connected client:

import express from 'express';
import { GraphRunner, isTerminalEvent } from '@cycgraph/orchestrator';
const app = express();
app.get('/runs/:runId/stream', async (req, res) => {
res.set({
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
const runner = new GraphRunner(graph, state, options);
for await (const event of runner.stream()) {
res.write(`event: ${event.type}\n`);
res.write(`data: ${JSON.stringify(event)}\n\n`);
if (isTerminalEvent(event)) break;
}
res.end();
});

WebSocket transports follow the same shape — replace res.write with socket.send and serialize each event as JSON. Because terminal events carry the full WorkflowState, clients can compute the final result without polling.

When using run() instead of stream(), you can still observe events via the EventEmitter-style API:

const runner = new GraphRunner(graph, state, options);
runner.on('node:start', ({ node_id, type }) => {
console.log(`Node started: ${node_id} (${type})`);
});
runner.on('budget:threshold_reached', ({ threshold_pct }) => {
console.warn(`${threshold_pct}% of budget used`);
});
const finalState = await runner.run();

Both APIs emit the same events. Use stream() when you need to consume events as an async iterable (e.g., forwarding to a client over SSE/WebSocket). Use run() with .on() when you just need side-effect logging.