Documentation
Framework
Version
Class References
Function References
Interface References
Type Alias References
Variable References

StreamProcessor

Class: StreamProcessor

Defined in: activities/chat/stream/processor.ts:117

StreamProcessor - State machine for processing AI response streams

Manages the full UIMessage[] conversation and emits events on changes.

State tracking:

  • Full message array
  • Current assistant message being streamed
  • Text content accumulation
  • Multiple parallel tool calls
  • Tool call completion detection

Tool call completion is detected when:

  1. A new tool call starts at a different index
  2. Text content arrives
  3. Stream ends

Constructors

Constructor

ts
new StreamProcessor(options): StreamProcessor;
new StreamProcessor(options): StreamProcessor;

Defined in: activities/chat/stream/processor.ts:145

Parameters

options

StreamProcessorOptions = {}

Returns

StreamProcessor

Methods

addToolApprovalResponse()

ts
addToolApprovalResponse(approvalId, approved): void;
addToolApprovalResponse(approvalId, approved): void;

Defined in: activities/chat/stream/processor.ts:287

Add an approval response (called by client after handling onApprovalRequest)

Parameters

approvalId

string

approved

boolean

Returns

void


addToolResult()

ts
addToolResult(
   toolCallId, 
   output, 
   error?): void;
addToolResult(
   toolCallId, 
   output, 
   error?): void;

Defined in: activities/chat/stream/processor.ts:243

Add a tool result (called by client after handling onToolCall)

Parameters

toolCallId

string

output

any

error?

string

Returns

void


addUserMessage()

ts
addUserMessage(content, id?): UIMessage;
addUserMessage(content, id?): UIMessage;

Defined in: activities/chat/stream/processor.ts:192

Add a user message to the conversation. Supports both simple string content and multimodal content arrays.

Parameters

content

The message content (string or array of content parts)

string | ContentPart[]

id?

string

Optional custom message ID (generated if not provided)

Returns

UIMessage

The created UIMessage

Example

ts
// Simple text message
processor.addUserMessage('Hello!')

// Multimodal message with image
processor.addUserMessage([
  { type: 'text', content: 'What is in this image?' },
  { type: 'image', source: { type: 'url', value: 'https://example.com/photo.jpg' } }
])

// With custom ID
processor.addUserMessage('Hello!', 'custom-id-123')
// Simple text message
processor.addUserMessage('Hello!')

// Multimodal message with image
processor.addUserMessage([
  { type: 'text', content: 'What is in this image?' },
  { type: 'image', source: { type: 'url', value: 'https://example.com/photo.jpg' } }
])

// With custom ID
processor.addUserMessage('Hello!', 'custom-id-123')

areAllToolsComplete()

ts
areAllToolsComplete(): boolean;
areAllToolsComplete(): boolean;

Defined in: activities/chat/stream/processor.ts:318

Check if all tool calls in the last assistant message are complete Useful for auto-continue logic

Returns

boolean


clearMessages()

ts
clearMessages(): void;
clearMessages(): void;

Defined in: activities/chat/stream/processor.ts:362

Clear all messages

Returns

void


finalizeStream()

ts
finalizeStream(): void;
finalizeStream(): void;

Defined in: activities/chat/stream/processor.ts:859

Finalize the stream - complete all pending operations

Returns

void


getMessages()

ts
getMessages(): UIMessage[];
getMessages(): UIMessage[];

Defined in: activities/chat/stream/processor.ts:310

Get current messages

Returns

UIMessage[]


getRecording()

ts
getRecording(): ChunkRecording | null;
getRecording(): ChunkRecording | null;

Defined in: activities/chat/stream/processor.ts:938

Get the current recording

Returns

ChunkRecording | null


getState()

ts
getState(): ProcessorState;
getState(): ProcessorState;

Defined in: activities/chat/stream/processor.ts:911

Get current processor state

Returns

ProcessorState


process()

ts
process(stream): Promise<ProcessorResult>;
process(stream): Promise<ProcessorResult>;

Defined in: activities/chat/stream/processor.ts:375

Process a stream and emit events through handlers

Parameters

stream

AsyncIterable<any>

Returns

Promise<ProcessorResult>


processChunk()

ts
processChunk(chunk): void;
processChunk(chunk): void;

Defined in: activities/chat/stream/processor.ts:403

Process a single chunk from the stream

Parameters

chunk

AGUIEvent

Returns

void


removeMessagesAfter()

ts
removeMessagesAfter(index): void;
removeMessagesAfter(index): void;

Defined in: activities/chat/stream/processor.ts:354

Remove messages after a certain index (for reload/retry)

Parameters

index

number

Returns

void


reset()

ts
reset(): void;
reset(): void;

Defined in: activities/chat/stream/processor.ts:961

Full reset (including messages)

Returns

void


setMessages()

ts
setMessages(messages): void;
setMessages(messages): void;

Defined in: activities/chat/stream/processor.ts:164

Set the messages array (e.g., from persisted state)

Parameters

messages

UIMessage[]

Returns

void


startAssistantMessage()

ts
startAssistantMessage(): string;
startAssistantMessage(): string;

Defined in: activities/chat/stream/processor.ts:219

Start streaming a new assistant message Returns the message ID

Returns

string


startRecording()

ts
startRecording(): void;
startRecording(): void;

Defined in: activities/chat/stream/processor.ts:925

Start recording chunks

Returns

void


toModelMessages()

ts
toModelMessages(): ModelMessage<
  | string
  | ContentPart<unknown, unknown, unknown, unknown, unknown>[]
  | null>[];
toModelMessages(): ModelMessage<
  | string
  | ContentPart<unknown, unknown, unknown, unknown, unknown>[]
  | null>[];

Defined in: activities/chat/stream/processor.ts:299

Get the conversation as ModelMessages (for sending to LLM)

Returns

ModelMessage< | string | ContentPart<unknown, unknown, unknown, unknown, unknown>[] | null>[]


replay()

ts
static replay(recording, options?): Promise<ProcessorResult>;
static replay(recording, options?): Promise<ProcessorResult>;

Defined in: activities/chat/stream/processor.ts:970

Replay a recording through the processor

Parameters

recording

ChunkRecording

options?

StreamProcessorOptions

Returns

Promise<ProcessorResult>