Version: 0.1.0

Flows

Execute server-side workflows and stream their progress.

FlowClient

The FlowClient runs flows over HTTP with SSE streaming. Use it for long-running workflows, background jobs, and human-in-the-loop processes.

Creating a FlowClient

Access via the Database instance:

const db = client.database('myapp');
const flowClient = db.flow;

The db.flow getter returns a lazily-created, cached FlowClient pre-configured with the correct base URL, repository, auth manager, and WebSocket-backed FlowsApi.

Options

interface FlowClientOptions {
  requestTimeout?: number;
  fetch?: typeof fetch;
}

Methods

run()

Start a flow and return immediately.

async run(
  flowPath: string,
  input?: Record<string, unknown>,
  options?: { signal?: AbortSignal }
): Promise<FlowRunResponse>

interface FlowRunResponse {
  instance_id: string;
  job_id: string;
  status: string;
}

runAndWait()

Start a flow and wait until it completes or fails.

async runAndWait(
  flowPath: string,
  input?: Record<string, unknown>,
  options?: { signal?: AbortSignal }
): Promise<FlowRunResult>

interface FlowRunResult {
  instanceId: string;
  status: 'completed' | 'failed';
  output?: unknown;
  error?: string;
}
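
The `signal` option and the `status` field can be combined to bound how long a caller waits. The following is a minimal sketch, not part of the library: `RunAndWaitCapable` and `runWithTimeout` are hypothetical names, and the sketch assumes the client rejects the returned promise when the signal is aborted.

```typescript
// Result shape copied from the FlowRunResult interface above.
interface FlowRunResult {
  instanceId: string;
  status: 'completed' | 'failed';
  output?: unknown;
  error?: string;
}

// Hypothetical structural subset of FlowClient, so the helper can be tested with a stub.
interface RunAndWaitCapable {
  runAndWait(
    flowPath: string,
    input?: Record<string, unknown>,
    options?: { signal?: AbortSignal }
  ): Promise<FlowRunResult>;
}

// Hypothetical helper: aborts the request after `timeoutMs` and turns a
// 'failed' result into a thrown Error; otherwise returns the flow output.
async function runWithTimeout(
  client: RunAndWaitCapable,
  flowPath: string,
  input: Record<string, unknown>,
  timeoutMs: number
): Promise<unknown> {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const result = await client.runAndWait(flowPath, input, {
      signal: controller.signal,
    });
    if (result.status === 'failed') {
      throw new Error(result.error ?? `Flow ${result.instanceId} failed`);
    }
    return result.output;
  } finally {
    // Always clear the timer so it cannot fire after the flow has settled.
    clearTimeout(timer);
  }
}
```

With a real client this might be called as `await runWithTimeout(flowClient, '/flows/process-order', { orderId: '123' }, 30_000)`.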

runAndCollect()

Start a flow and collect all events into an array.

async runAndCollect(
  flowPath: string,
  input?: Record<string, unknown>,
  options?: { signal?: AbortSignal }
): Promise<FlowCollectResult>

interface FlowCollectResult {
  instanceId: string;
  events: FlowExecutionEvent[];
}

streamEvents()

Stream events from a running flow as an async iterable.

async *streamEvents(
  instanceId: string,
  options?: { signal?: AbortSignal }
): AsyncIterable<FlowExecutionEvent>

Example:

for await (const event of flowClient.streamEvents(instanceId)) {
  console.log(event.type, event);
}
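
A common use of the stream is to assemble AI output from `text_chunk` events and stop at the first terminal event. The sketch below assumes a simplified event shape (`TextEvent`) rather than the full `FlowExecutionEvent` union, and `collectText` is a hypothetical helper, not a library function.

```typescript
// Simplified event shape assumed for this sketch; only the fields used here.
interface TextEvent {
  type: string;
  text?: string;
  error?: string;
}

// Hypothetical helper: concatenates text_chunk payloads, stops on
// flow_completed, and throws on flow_failed.
async function collectText(events: AsyncIterable<TextEvent>): Promise<string> {
  let text = '';
  for await (const event of events) {
    if (event.type === 'text_chunk') {
      text += event.text ?? '';
    } else if (event.type === 'flow_failed') {
      throw new Error(event.error ?? 'flow failed');
    } else if (event.type === 'flow_completed') {
      break; // leaving the loop also ends iteration of the underlying stream
    }
  }
  return text;
}
```

Usage would look like `const reply = await collectText(flowClient.streamEvents(instanceId));`.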

createEventStream()

Create an event stream that can be closed explicitly, as an alternative to streamEvents().

async createEventStream(
  instanceId: string,
  options?: { signal?: AbortSignal }
): Promise<{ events: AsyncIterable<FlowExecutionEvent>; close: () => void }>
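
Because the caller owns `close()`, a `try`/`finally` block is a natural way to make sure the stream is released even when the loop exits early. This is a sketch under assumptions: `EventStreamCapable`, `StreamedEvent`, and `waitForEvent` are hypothetical names that mirror the signature above.

```typescript
// Simplified event shape assumed for this sketch.
interface StreamedEvent {
  type: string;
}

// Hypothetical structural subset of FlowClient, mirroring createEventStream().
interface EventStreamCapable {
  createEventStream(
    instanceId: string,
    options?: { signal?: AbortSignal }
  ): Promise<{ events: AsyncIterable<StreamedEvent>; close: () => void }>;
}

// Hypothetical helper: resolves with the first event of the wanted type
// (or undefined if the stream ends first) and always closes the stream.
async function waitForEvent(
  client: EventStreamCapable,
  instanceId: string,
  wantedType: string
): Promise<StreamedEvent | undefined> {
  const { events, close } = await client.createEventStream(instanceId);
  try {
    for await (const event of events) {
      if (event.type === wantedType) {
        return event;
      }
    }
    return undefined;
  } finally {
    close();
  }
}
```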

getInstanceStatus()

Check the current status of a flow instance.

async getInstanceStatus(
  instanceId: string,
  options?: { signal?: AbortSignal }
): Promise<FlowInstanceStatusResponse>

interface FlowInstanceStatusResponse {
  id: string;
  status: FlowInstanceStatus;
  variables: Record<string, unknown>;
  flow_path: string;
  started_at: string;
  error?: string;
}

type FlowInstanceStatus =
  | 'queued'
  | 'pending'
  | 'running'
  | 'completed'
  | 'failed'
  | 'waiting'
  | 'cancelled';
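
Where streaming is not an option, the status endpoint can be polled instead. The sketch below is illustrative only: `StatusCapable` and `pollUntilSettled` are hypothetical, and treating `completed`, `failed`, and `cancelled` as the settled states is an assumption based on the status names.

```typescript
type FlowInstanceStatus =
  | 'queued'
  | 'pending'
  | 'running'
  | 'completed'
  | 'failed'
  | 'waiting'
  | 'cancelled';

// Hypothetical structural subset of FlowClient, mirroring getInstanceStatus().
interface StatusCapable {
  getInstanceStatus(
    instanceId: string,
    options?: { signal?: AbortSignal }
  ): Promise<{ id: string; status: FlowInstanceStatus; error?: string }>;
}

// Assumption: these three statuses mean the instance will not change again.
const SETTLED = new Set<FlowInstanceStatus>(['completed', 'failed', 'cancelled']);

// Hypothetical helper: checks the status every `intervalMs` until it settles,
// giving up after `maxAttempts` checks.
async function pollUntilSettled(
  client: StatusCapable,
  instanceId: string,
  intervalMs = 1000,
  maxAttempts = 60
): Promise<FlowInstanceStatus> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const { status } = await client.getInstanceStatus(instanceId);
    if (SETTLED.has(status)) {
      return status;
    }
    await new Promise<void>((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error(`Flow ${instanceId} did not settle after ${maxAttempts} checks`);
}
```

Note that a flow in the `waiting` state keeps the loop running; a caller that handles human-in-the-loop flows would likely want to return on `waiting` as well.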

resume()

Resume a flow that is waiting for external input.

async resume(
  instanceId: string,
  data: unknown,
  options?: { signal?: AbortSignal }
): Promise<void>

respondToHumanTask()

Respond to a specific human task within a waiting flow.

async respondToHumanTask(
  instanceId: string,
  taskId: string,
  response: unknown,
  options?: { signal?: AbortSignal }
): Promise<void>
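
A human-in-the-loop step typically checks that the instance is actually paused before sending a decision. The following sketch assumes a hypothetical `HumanLoopCapable` subset of the client and a hypothetical `approveIfWaiting` helper; the `{ approved }` payload is only an example shape, since the data a flow expects depends on the flow itself.

```typescript
// Hypothetical structural subset of FlowClient used by this sketch.
interface HumanLoopCapable {
  getInstanceStatus(instanceId: string): Promise<{ status: string }>;
  resume(instanceId: string, data: unknown): Promise<void>;
}

// Hypothetical helper: resumes the flow with an approval decision only if
// the instance is currently waiting. Returns true when resume() was called.
async function approveIfWaiting(
  client: HumanLoopCapable,
  instanceId: string,
  approved: boolean
): Promise<boolean> {
  const { status } = await client.getInstanceStatus(instanceId);
  if (status !== 'waiting') {
    return false; // nothing to resume
  }
  await client.resume(instanceId, { approved });
  return true;
}
```

The same pattern applies to `respondToHumanTask()` when the response must target a specific task ID.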

FlowsApi (WebSocket)

When using a WebSocket client, flows are also available via db.flows():

const flows = db.flows();

const { instance_id } = await flows.run('flows/my-flow', { key: 'value' });
const status = await flows.getInstanceStatus(instance_id);
const resumed = await flows.resume(instance_id, { approved: true });
await flows.cancel(instance_id);

// Stream events
const { events, unsubscribe } = await flows.subscribeEvents(instance_id);
for await (const event of events) {
  console.log(event.type);
}
await unsubscribe();

Flow Execution Events

| Event Type | Key Fields | Description |
| --- | --- | --- |
| step_started | node_id, step_type | A step began executing |
| step_completed | node_id, output, duration_ms | A step finished |
| step_failed | node_id, error, duration_ms | A step failed |
| flow_waiting | node_id, wait_type, reason | Flow is paused waiting for input |
| flow_resumed | node_id, wait_duration_ms | Flow was resumed |
| flow_completed | output, total_duration_ms | Flow finished successfully |
| flow_failed | error, total_duration_ms | Flow failed |
| text_chunk | text | Streaming text from an AI step |
| thought_chunk | text | AI reasoning/thinking text |
| tool_call_started | tool_call_id, function_name, arguments | AI is calling a tool |
| tool_call_completed | tool_call_id, result, error? | Tool call finished |
| conversation_created | conversation_path, workspace | Chat conversation created |
| message_saved | message_path, role | Message persisted |
| log | level, message, node_id? | Log entry |
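
The fields listed above make it straightforward to post-process a collected event array, for example the `events` returned by `runAndCollect()`. The sketch below totals `duration_ms` per `node_id`; `StepEvent` is a simplified shape assumed for illustration and `stepDurations` is a hypothetical helper.

```typescript
// Simplified event shape assumed for this sketch; only the fields used here.
interface StepEvent {
  type: string;
  node_id?: string;
  duration_ms?: number;
}

// Hypothetical helper: sums duration_ms per node across step_completed and
// step_failed events. Other event types are ignored.
function stepDurations(events: StepEvent[]): Map<string, number> {
  const totals = new Map<string, number>();
  for (const event of events) {
    const isStepEnd =
      event.type === 'step_completed' || event.type === 'step_failed';
    if (isStepEnd && event.node_id !== undefined) {
      const previous = totals.get(event.node_id) ?? 0;
      totals.set(event.node_id, previous + (event.duration_ms ?? 0));
    }
  }
  return totals;
}
```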

Helper

import { isTerminalEvent } from '@raisindb/client';

if (isTerminalEvent(event)) {
  // event is FlowCompletedEvent | FlowFailedEvent
}
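
To illustrate how a type guard of this kind narrows the event type, here is a local stand-in. It is not the library's implementation: `isTerminalSketch`, `SketchEvent`, and the three event interfaces are assumptions modelled on the event table, reduced to the fields needed for the example.

```typescript
// Assumed event shapes, reduced to the fields used below.
interface FlowCompletedEvent {
  type: 'flow_completed';
  output?: unknown;
  total_duration_ms: number;
}
interface FlowFailedEvent {
  type: 'flow_failed';
  error: string;
  total_duration_ms: number;
}
interface StepStartedEvent {
  type: 'step_started';
  node_id: string;
  step_type: string;
}
type SketchEvent = FlowCompletedEvent | FlowFailedEvent | StepStartedEvent;

// Stand-in type guard: the `event is ...` return type tells the compiler
// which union members remain when the function returns true.
function isTerminalSketch(
  event: SketchEvent
): event is FlowCompletedEvent | FlowFailedEvent {
  return event.type === 'flow_completed' || event.type === 'flow_failed';
}

function summarize(event: SketchEvent): string {
  if (isTerminalSketch(event)) {
    // total_duration_ms exists on both terminal event types.
    return `finished in ${event.total_duration_ms} ms`;
  }
  // Here the compiler has narrowed the event to StepStartedEvent.
  return `step ${event.node_id} started`;
}
```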

useFlow (React Hook)

React hook for executing and monitoring flows with reactive state.

import React from 'react';
import { useFlow } from '@raisindb/client';

const db = client.database('myapp');

function OrderProcessor() {
  const flow = useFlow(React, { database: db });

  return (
    <div>
      <button onClick={() => flow.run('/flows/process-order', { orderId: '123' })}>
        Run
      </button>
      <p>Status: {flow.status}</p>
      {flow.events.map((e, i) => <div key={i}>{e.type}</div>)}
    </div>
  );
}

Options

interface UseFlowOptions {
  database?: Database;
  clientOptions?: FlowClientOptions;
}

Return Value

interface UseFlowReturn {
  events: FlowExecutionEvent[];
  status: 'idle' | 'running' | 'waiting' | 'completed' | 'failed';
  isRunning: boolean;
  error: string | null;
  output: unknown | null;
  instanceId: string | null;
  run: (flowPath: string, input?: Record<string, unknown>) => Promise<void>;
  resume: (data: unknown) => Promise<void>;
  reset: () => void;
}