If you are building a product where AI agents run multi-step tasks — querying databases, searching code, reading documents — and you want users to watch the reasoning happen in real time, you will hit a set of problems that are not obvious from the outset. Naive architectures break when multiple users need to watch the same stream, when browsers disconnect and reconnect, when your server process restarts mid-stream, or when a user wants to cancel a 30-second agent run.
This post describes an architecture that solves these problems. It spans Python and TypeScript, uses SSE for streaming, and mediates between producers and consumers through the database. The code examples are drawn from a production system that operates both multi-tenant SaaS and single-tenant private cloud clusters from the same codebase.
Limitations of Python
Python is the lingua franca of the LLM ecosystem. The SDKs, model providers, and third-party tooling all ship Python first. When Anthropic patches a streaming bug or adds a new content block type, the fix lands in the Python SDK within hours. Writing your own language wrapper, in TypeScript or elsewhere, condemns you to maintaining a fork of a fast-moving target.
But Python has real costs as a production language. The type system is bolted on and only partially enforced. Runtime errors that TypeScript would catch at compile time slip through. Refactoring is riskier. The web framework ecosystem, while competent, lacks the deployment maturity and feature set of Next.js: server components, server actions, middleware, edge functions, and ISR.
The solution is to draw a clean line: treat Python as a stateless compute kernel. It receives a request (conversation history plus a new message), runs the agent, streams events, and forgets everything. All persistence, authentication, authorization, and UI logic lives in TypeScript. This gives you upstream LLM compatibility maintained for free, and product logic in a more robust production language.
The key discipline is that the Python side holds no state between requests. Every call builds a fresh agent, streams its output, and exits. If you let session state leak into Python — cached conversations, user preferences, connection pools to your product database — you end up with two sources of truth and an ever-growing surface area for bugs.
But You Can't Pipe Directly From Python to the Browser
The first architecture most people reach for is straightforward: pipe the Python SSE stream straight through to the browser. The server acts as a transparent proxy.
This breaks in three ways.
Multiple viewers, one pipe. Two browser tabs watching the same agent run cannot share a single HTTP response stream. You need either a fan-out mechanism or a durable buffer. A direct pipe serves exactly one client.
Disconnects lose events. A laptop goes to sleep, a mobile browser loses signal, a user navigates away and comes back. With a direct pipe, all events emitted during the disconnection are gone. The client has no way to catch up.
Server restarts lose everything. If your Node.js process recycles mid-stream (a deploy, a crash, an OOM kill), the entire stream is lost. The agent may still be running on the Python side, emitting events into the void.
The solution is to decouple the producer (Python → Node.js) from the consumers (Node.js → browsers) by persisting events to the database as they arrive. This turns the problem from "pipe bytes to a socket" into "write rows to a table and let clients poll."
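Concretely, the durable buffer is just a table of encrypted event rows keyed by agent run. A minimal sketch of the row shape the rest of this post assumes (field names are illustrative, not the exact schema):

// Hypothetical shape of a persisted stream event row.
interface RawStreamEventRow {
  id: string;              // primary key
  investigationId: string; // which agent run the event belongs to
  data: string;            // encrypted JSON payload of the StreamEvent
  createdAt: Date;         // used as the polling cursor
}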
The Full Data Flow
Here is Kitewing's current architecture, addressing all three problems:
sequenceDiagram
participant B as Browser
participant N as Next.js Server
participant DB as PostgreSQL
participant A as Python Agents
B->>N: POST /api/investigation (server action)
N->>N: Validate, authorize, mark DB streaming
N->>A: POST /investigation/stream (localhost)
activate A
A->>A: Build agent with history + tools
loop SSE stream
A-->>N: data: {event_type, data}\n\n
N->>N: Parse SSE, buffer events
N->>DB: Flush encrypted events (every 100ms)
end
A-->>N: data: {event_type: "complete", ...}\n\n
deactivate A
N->>DB: Persist final messages, delete raw events
B->>N: POST /api/investigation/subscribe
activate N
loop Poll DB (every 100ms)
N->>DB: Read new encrypted events
DB-->>N: RawStreamEvent rows
N->>N: Decrypt events
N-->>B: data: {event_type, data}\n\n (SSE)
end
N-->>B: complete event, close stream
deactivate N
There are two separate SSE connections: one between the Next.js server and Python (server-to-server), and one between the browser and the Next.js server (client-to-server). The database decouples them. This is the central design decision; everything else follows from it.
Consuming the Agent Stream Server-Side
When a user sends a message, a server action validates the input, marks the investigation as streaming in the database, and fires off the stream consumption as a background task. The server action returns immediately. The user does not wait for the agent to finish before the page updates. This matters because agent runs can take 30 seconds or more.
const investigation = await markAsStreaming(investigationId);
runInvestigationStream({ investigation, investigationRequest, workspaceId, memberId }).catch(
(error) => logger.error("Stream consumption failed", { error }),
);
The runner calls the Python service and pipes the response through a TransformStream. We use TransformStream because it lets us intercept events in flight without buffering the full response. As bytes flow through, the transform callback parses SSE events and forwards them to the stream manager for database persistence.
async function consumeAndPersistStream({ investigation, investigationRequest, workspaceId, memberId }) {
  const investigationId = investigation.id;
  const manager = getRawStreamManager();
  const abortController = new AbortController();
  const unregisterWriter = manager.registerWriter(investigationId, abortController);
  try {
    const response = await sendInvestigationRequest(investigationRequest, {
      signal: abortController.signal,
    });
    const { transformStream, resultPromise } = createStreamProcessor({
      workspaceId,
      memberId,
      investigationId,
      onEvent: (event) => manager.pushEvent(investigationId, event),
    });
    // Drain the transformed stream; persistence happens in the onEvent callback
    const reader = response.body!.pipeThrough(transformStream).getReader();
    while (!(await reader.read()).done) {}
    const result = await resultPromise;
    await manager.completeStream({ investigationId, workspaceId, memberId, result });
  } finally {
    unregisterWriter();
  }
}
The resultPromise resolves when the stream ends. It collects the complete event's payload (the new messages to persist) so the caller does not need to track terminal events separately. Note the AbortController. It is registered with the stream manager so that cancellation requests (discussed later) can abort the fetch.
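createStreamProcessor itself is not shown above. A simplified sketch of what it might look like, assuming the SSEParser described in the next section and a terminal complete event; the real implementation also threads through workspaceId, memberId, and investigationId for logging:

// Sketch only: parses SSE events in flight and resolves resultPromise on "complete".
function createStreamProcessor({ onEvent }: { onEvent: (event: StreamEvent) => void }) {
  const parser = new SSEParser();
  let resolveResult!: (result: StreamEvent | null) => void;
  const resultPromise = new Promise<StreamEvent | null>((resolve) => (resolveResult = resolve));
  const handle = (events: StreamEvent[]) => {
    for (const event of events) {
      onEvent(event);
      if (event.event_type === "complete") resolveResult(event);
    }
  };
  const transformStream = new TransformStream<Uint8Array, Uint8Array>({
    transform(chunk, controller) {
      handle(parser.parseChunk(chunk)); // intercept events in flight
      controller.enqueue(chunk);        // pass bytes through untouched
    },
    flush() {
      handle(parser.flush());           // capture a final event with no trailing blank line
      resolveResult(null);              // no-op if "complete" already resolved the promise
    },
  });
  return { transformStream, resultPromise };
}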
Writing Our Own SSE Parser
Do not use EventSource. It only supports GET requests and does not give access to the raw byte stream. Instead, write a parser that implements the SSE specification incrementally: accumulate bytes across chunk boundaries, split on double newlines, and hold the last incomplete block in the buffer until the next chunk arrives. TCP can split a message at any byte boundary.
export class SSEParser {
  private buffer = "";
  private readonly decoder = new TextDecoder();
  parseChunk(chunk: Uint8Array): StreamEvent[] {
    this.buffer += this.decoder.decode(chunk, { stream: true });
    const events: StreamEvent[] = [];
    const blocks = this.buffer.split(/\r?\n\r?\n/);
    this.buffer = blocks.pop() ?? "";
    for (const block of blocks) {
      const event = parseSSEEventBlock(block);
      if (event) events.push(event);
    }
    return events;
  }
  flush(): StreamEvent[] {
    this.buffer += this.decoder.decode();
    if (!this.buffer) return [];
    const tail = this.buffer;
    this.buffer = "";
    const event = parseSSEEventBlock(tail);
    return event ? [event] : [];
  }
}
The flush() method handles the edge case where the stream ends without a trailing double newline. Call it when the response completes to capture the final event.
The block parser extracts data: lines, joins them, and validates against a Zod schema. Invalid events are logged and dropped rather than crashing the stream:
export function parseSSEEventBlock(block: string): StreamEvent | null {
  const dataLines: string[] = [];
  for (const rawLine of block.split(/\r?\n/)) {
    if (!rawLine || rawLine.startsWith(":")) continue; // Skip empty lines and SSE comments
    if (rawLine.startsWith("data:")) {
      const value = rawLine.startsWith("data: ") ? rawLine.slice(6) : rawLine.slice(5);
      dataLines.push(value);
    }
  }
  if (dataLines.length === 0) return null;
  const jsonStr = dataLines.join("\n");
  try {
    const parsed = JSON.parse(jsonStr);
    const result = StreamEventSchema.safeParse(parsed);
    if (result.success) return result.data;
    // Log but don't crash on unknown event types — forward compatibility
    logger.warn("Received unknown SSE event type", { block });
    return null;
  } catch {
    logger.error("Failed to parse SSE event JSON", { block });
    return null;
  }
}
Reusing this same SSEParser class on both the server (consuming the Python stream) and the browser (consuming the Next.js stream) means we have one implementation, tested once.
Making the Python Side Stateless
On the Python side, a FastAPI endpoint builds a fresh agent for each request, streams events, and exits. There is no shared state between requests. The agent receives the full conversation history, the new user message, and a list of available integrations, which makes the service a pure function from (history, message) to a stream of events: easy to reason about, easy to scale horizontally, and easy to restart.
@app.post("/investigation/stream")
async def investigation_stream(
    request: InvestigationRequest,
    agent_service: AgentService = Depends(get_agent_service),
):
    cancellation_token = CancellationToken()
    cancellation_registry.register(request.investigation_id, cancellation_token)
    async def event_generator():
        try:
            async for event in agent_service.process_investigation(
                history=request.history,
                new_user_message=request.new_user_message,
                integrations=request.integrations,
                cancellation_token=cancellation_token,
            ):
                if cancellation_token.is_cancelled():
                    return
                yield event.to_sse()
        except Exception as e:
            error_event = StreamEvent(
                event_type=StreamEventType.ERROR,
                data={"error": str(e), "error_type": type(e).__name__},
            )
            yield error_event.to_sse()
        finally:
            cancellation_registry.unregister(request.investigation_id)
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
The to_sse() method serialises each event as a JSON line prefixed with data: and terminated by a double newline, per the SSE specification. Pydantic's model_dump_json() gives you schema validation on the way out. Note the error handling: exceptions are caught and emitted as typed error events on the stream, rather than causing a 500 response that kills the connection. The cancellation machinery is covered in a later section.
Normalizing Raw SDK Events with Translator Functions
Raw events from LLM SDKs arrive as untyped dicts with inconsistent key shapes. We normalize them through a chain of translator functions: each one inspects a single key and returns a typed tuple or None. The first match wins.
def _translate_reasoning_text(event: dict[str, Any]) -> tuple[StreamEventType, dict] | None:
    if "reasoningText" not in event:
        return None
    return StreamEventType.REASONING_TEXT, {
        "text": str(event["reasoningText"]),
        "reasoning": bool(event.get("reasoning", True)),
    }

def _translate_tool_use(event: dict[str, Any]) -> tuple[StreamEventType, dict] | None:
    if "current_tool_use" not in event:
        return None
    tool = event["current_tool_use"]
    return StreamEventType.TOOL_USE, {
        "name": str(tool.get("name", "")),
        "tool_use_id": str(tool.get("toolUseId", "")),
        "input": tool.get("input", {}),
    }

TOP_LEVEL_TRANSLATORS = (_translate_reasoning_text, _translate_tool_use, ...)
This pattern avoids a monolithic if/elif chain that grows unwieldy as event types multiply. Each translator is independently testable. Adding a new event type means writing one function and appending it to the tuple. Unrecognized events are silently dropped, so the SDK can ship new event shapes without breaking our service.
Buffer Writes, Poll for Reads
The stream manager is the bridge between the producer (consuming the Python stream) and the consumers (serving browser clients). On the write side, it buffers events in memory and batch-flushes to the database every 100ms. Batching is critical: LLM streams can produce dozens of events per second (individual token deltas, reasoning chunks, tool progress), and a database write per event would be too expensive.
Encrypt events before storage. Raw LLM output may contain sensitive user data and should not be readable at rest.
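The encryption helper can be as simple as AES-256-GCM with a random IV per event. A sketch, assuming a 32-byte key supplied via an environment variable (the variable name and key management here are illustrative):

import { createCipheriv, randomBytes } from "node:crypto";
// Sketch: encrypt a StreamEvent for storage. Assumes STREAM_ENCRYPTION_KEY holds
// a 32-byte key as hex; the real key handling may differ.
const key = Buffer.from(process.env.STREAM_ENCRYPTION_KEY!, "hex");
export function encryptObject(obj: unknown): string {
  const iv = randomBytes(12);
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const ciphertext = Buffer.concat([cipher.update(JSON.stringify(obj), "utf8"), cipher.final()]);
  // Store iv + auth tag + ciphertext together so decryption needs only this one string
  return Buffer.concat([iv, cipher.getAuthTag(), ciphertext]).toString("base64");
}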
class RawStreamManager {
  private writeBuffers = new Map<string, StreamEvent[]>();
  pushEvent(investigationId: string, event: StreamEvent) {
    const buffer = this.writeBuffers.get(investigationId) ?? [];
    buffer.push(event);
    this.writeBuffers.set(investigationId, buffer);
  }
  async flush() {
    const toFlush = this.writeBuffers;
    this.writeBuffers = new Map();
    for (const [investigationId, events] of toFlush) {
      const encrypted = events.map((e) => ({
        investigationId,
        data: encryptObject(e),
      }));
      await db.rawStreamEvent.createMany({ data: encrypted });
    }
  }
}
Note the swap-and-flush pattern: this.writeBuffers is replaced with a fresh Map before the async database write begins. This prevents events that arrive during the flush from being lost or double-written.
On the read side, the manager polls the database for new rows and dispatches them to subscribers. Each subscriber tracks a cursor (a timestamp) so it only receives events newer than its last read.
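The read side is not shown above. A simplified sketch of the subscribe-and-poll loop, assuming Prisma-style access to the rawStreamEvent table, a createdAt column as the cursor, and a decryptObject counterpart to encryptObject:

// Sketch of the read side: poll for rows newer than the subscriber's cursor.
type Subscriber = {
  investigationId: string;
  cursor: Date;
  onEvents: (events: StreamEvent[]) => void;
  onError: (error: Error) => void;
};
function subscribe({ investigationId, cursor, onEvents, onError }: Subscriber): () => void {
  let lastSeen = cursor;
  const interval = setInterval(async () => {
    try {
      const rows = await db.rawStreamEvent.findMany({
        where: { investigationId, createdAt: { gt: lastSeen } },
        orderBy: { createdAt: "asc" },
      });
      if (rows.length === 0) return;
      lastSeen = rows[rows.length - 1].createdAt; // advance the cursor
      onEvents(rows.map((row) => decryptObject(row.data) as StreamEvent));
    } catch (error) {
      onError(error as Error);
    }
  }, 100);
  return () => clearInterval(interval); // caller invokes this to unsubscribe
}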
Serve Clients with Cursor-Based SSE
The subscribe endpoint creates a ReadableStream, registers a subscriber with the stream manager, and pushes SSE-formatted events as they arrive. If the browser reconnects after a network interruption, it passes its last-seen timestamp as the cursor and picks up without gaps.
export async function POST(request: NextRequest) {
  // ... validate, authorize, look up investigation
  const encoder = new TextEncoder();
  const cursor = investigation.streamingStartedAt ?? investigation.streamingAt;
  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      const manager = getRawStreamManager();
      const unsubscribe = manager.subscribe({
        investigationId,
        cursor,
        onEvents: (events) => {
          for (const event of events) {
            controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`));
            if (isCompleteEvent(event) || isErrorEvent(event)) {
              setTimeout(() => {
                unsubscribe();
                controller.close();
              }, 200);
              return;
            }
          }
        },
        onError: (error) => {
          controller.enqueue(encodeErrorSSE(error.message));
          controller.close();
        },
      });
      // Clean up when client disconnects
      request.signal.addEventListener("abort", () => {
        unsubscribe();
        controller.close();
      });
    },
  });
  return new NextResponse(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache, no-transform",
      "X-Accel-Buffering": "no",
    },
  });
}
The setTimeout before closing on a terminal event gives the browser a moment to read the enqueued data. Without it, the stream can close before the client has processed the final event.
This design means page reloads work naturally: the client reconnects, the subscribe endpoint reads all events from the database starting at the stream's beginning, and the UI reconstructs the full state. No special "replay" logic is needed. The same code path handles both live streaming and catch-up.
Consuming Events on the Client with a Reducer
On the browser, subscribe to the server's SSE endpoint and feed events through a reducer. Use useReducer rather than useState because the stream produces many rapid updates and a reducer consolidates the transition logic in one place, making it easier to test and debug.
The readSSEStream async generator reuses the same SSEParser class from the server side. Each event is validated against the Zod schema before being yielded. Invalid events are dropped rather than crashing the UI.
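readSSEStream itself is small. A sketch, assuming the SSEParser class shown earlier (which already applies the Zod validation inside parseSSEEventBlock):

// Sketch of the client-side async generator wrapping SSEParser.
async function* readSSEStream(response: Response): AsyncGenerator<StreamEvent> {
  const reader = response.body!.getReader();
  const parser = new SSEParser();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    for (const event of parser.parseChunk(value)) yield event;
  }
  // Capture a final event that arrived without a trailing blank line
  for (const event of parser.flush()) yield event;
}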
function useInvestigationStream(investigationId: string) {
  const [state, dispatch] = useReducer(reduceStreamEvent, initialState);
  async function subscribe() {
    const response = await fetch("/api/investigation/subscribe", {
      method: "POST",
      body: JSON.stringify({ investigationId }),
    });
    // Each validated event is dispatched straight into the reducer
    for await (const event of readSSEStream(response)) {
      dispatch(event);
    }
  }
}
The reducer is a pure function that maps event types to state updates. Keeping it pure makes the streaming UI deterministic: given the same sequence of events, you always get the same rendered state. This is what makes page-reload recovery work: replaying the events from the database through the same reducer yields the same UI.
function reduceStreamEvent(state: StreamState, event: StreamEvent): StreamState {
  switch (event.event_type) {
    case "reasoning_text":
      return { ...state, reasoning: state.reasoning + event.data.text };
    case "tool_use":
      return { ...state, activeTools: [...state.activeTools, event.data] };
    case "complete":
      return { ...state, status: "complete", messages: event.data.new_messages };
    default:
      return state;
  }
}
Keeping Types in Sync Across Languages
A discriminated union of event types is defined on both sides and must stay aligned. In Python, an enum is the source of truth for event emission:
class StreamEventType(str, Enum):
    REASONING_TEXT = "reasoning_text"
    TOOL_USE = "tool_use"
    TOOL_STREAM = "tool_stream"
    MESSAGE = "message"
    COMPLETE = "complete"
    ERROR = "error"
    INTERRUPT = "interrupt"
    # ... 10 more event types
In TypeScript, a Zod discriminated union is the source of truth for event consumption:
const StreamEventSchema = z.discriminatedUnion("event_type", [
  ReasoningTextEventSchema,
  ToolUseEventSchema,
  ToolStreamEventSchema,
  CompleteEventSchema,
  ErrorEventSchema,
  InterruptEventSchema,
  // ... 11 more schemas
]);
Parity is enforced with cross-language tests. The Python test suite reads the TypeScript schema definitions. The TypeScript test suite validates that every Python event type has a corresponding Zod schema. When someone adds an event type to one side and forgets the other, CI fails.
Unknown event types are handled gracefully on the client. They're parsed through a catch-all schema and logged rather than crashing the stream parser. This provides forward compatibility when the Python side ships a new event type before the client is updated.
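The TypeScript side of the parity check can be as simple as pulling the string values out of the Python enum source and asserting each one is a known discriminator. A sketch; the file path, regex, and Vitest setup are assumptions rather than the real test suite:

import { readFileSync } from "node:fs";
import { describe, expect, it } from "vitest";
import { StreamEventSchema } from "./stream-events";
// Sketch of the parity test: extract enum member values from the Python source
// and check each one against the Zod discriminated union's known discriminators.
describe("stream event parity", () => {
  it("has a Zod schema for every Python event type", () => {
    const pythonSource = readFileSync("../agents/stream_events.py", "utf8");
    const pythonTypes = [...pythonSource.matchAll(/^\s+\w+ = "([a-z_]+)"$/gm)].map((m) => m[1]);
    const zodTypes = StreamEventSchema.options.map(
      (option) => option.shape.event_type.value as string,
    );
    for (const eventType of pythonTypes) {
      expect(zodTypes, `missing Zod schema for ${eventType}`).toContain(eventType);
    }
  });
});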
Implementing Cooperative Cancellation
Cancellation spans all three layers and requires a cooperative protocol:
- Browser → a server action writes streamAbortedById to the database.
- Next.js server → polls for abort flags every 100ms and calls abortController.abort() on the fetch to Python (see the sketch after this list).
- Python → a CancellationToken is checked at tool-call boundaries. A CancellationHook registered with the agent framework intercepts every tool call and cancels it if the token is set.
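A sketch of the middle layer, assuming the stream manager keeps the AbortControllers registered by registerWriter in a writers map and that the investigation row has a streamAbortedById column (names illustrative):

// Sketch: on the same 100ms tick used for flushing, check abort flags and
// abort the server-to-server fetch to Python.
async function checkForAborts(writers: Map<string, AbortController>): Promise<void> {
  for (const [investigationId, abortController] of writers) {
    const investigation = await db.investigation.findUnique({
      where: { id: investigationId },
      select: { streamAbortedById: true },
    });
    if (investigation?.streamAbortedById) {
      abortController.abort(); // the fetch rejects; the finally block unregisters the writer
      writers.delete(investigationId);
    }
  }
}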
class CancellationHook(HookProvider):
    def __init__(self, token: CancellationToken):
        self._token = token

    def register_hooks(self, registry: HookRegistry, **kwargs):
        registry.add_callback(BeforeToolCallEvent, self._check_cancellation)

    def _check_cancellation(self, event: BeforeToolCallEvent):
        if self._token.is_cancelled():
            event.cancel_tool = "Operation cancelled by user"
            # Stop the agent event loop entirely
            request_state = event.invocation_state.get("request_state")
            if request_state is not None:
                request_state["stop_event_loop"] = True
The CancellationToken is thread-safe (backed by threading.Event) because the cancel signal may arrive from a different async task than the one running the agent. The two-phase check — abort flag in the database, cancellation token in memory — means the user gets immediate feedback even though the Python process may take a moment to reach a safe stopping point between tool calls.
Routing Callbacks from Python Back to TypeScript
The data flow is not one-directional. The Python agent needs to call back to the Next.js server to execute integration operations — SQL queries, GitHub searches, Notion reads — because the web server holds the encrypted credentials and manages authorization.
class WebClient:
    def __init__(self, *, ctx: LoggingContext):
        web_port = os.getenv("WEB_PORT", "3000")
        local_secret = os.getenv("LOCAL_SECRET", "")
        self._client = httpx.AsyncClient(
            base_url=f"http://localhost:{web_port}",
            headers={"x-local-secret": local_secret},
            timeout=httpx.Timeout(60.0),
        )

    async def query(self, integration_id: str, sql: str) -> QueryResponse:
        response = await self._client.post(
            f"/api/agents/integrations/{integration_id}/query",
            json={"sql": sql},
        )
        return QueryResponse.model_validate(response.json())
The Python process should never see a database password or an API key for a third-party integration.
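On the Next.js side, the callback route only has to verify the shared secret before touching credentials. A sketch of the shape; the route structure is from the URL above, but getIntegrationCredentials and runIntegrationQuery are hypothetical helpers:

import { NextRequest, NextResponse } from "next/server";
// Sketch: the integration query route that the Python WebClient calls back into.
export async function POST(
  request: NextRequest,
  { params }: { params: { integrationId: string } },
) {
  if (request.headers.get("x-local-secret") !== process.env.LOCAL_SECRET) {
    return NextResponse.json({ error: "unauthorized" }, { status: 401 });
  }
  const { sql } = await request.json();
  // Credentials are decrypted here and never leave the Next.js process
  const credentials = await getIntegrationCredentials(params.integrationId);
  const rows = await runIntegrationQuery(credentials, sql);
  return NextResponse.json({ rows });
}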
Trade-Offs
Latency. The database-mediated pub/sub adds roughly 100-200ms of latency compared to a direct pipe. For a streaming LLM response that runs for 10-60 seconds, this is acceptable. For a latency-sensitive chat application where you need sub-50ms time-to-first-token, it might not be.
Cross-language type drift. Despite the parity tests, the type contract between Python and TypeScript is maintained by convention, not by a shared schema language like Protocol Buffers. This works because the SSE protocol is simple — flat JSON objects with an event_type discriminator — and the parity tests catch most regressions. A more complex protocol might warrant code generation from a shared definition.
No backpressure. The Python side streams events as fast as the model produces them. If the database flush cannot keep up (which would require the database to be severely degraded), events buffer in memory. In practice this has not been a problem because LLM token generation is slower than database writes, but it is worth noting.
Each of these is an area where we may add complexity as we scale, to address problems we can see coming.
Summary
The naive architecture of piping a stream straight from the LLM to the browser fails as soon as you need multiple viewers, disconnect recovery, or crash resilience. Interposing the database as a durable buffer solves all three, at the cost of a small latency penalty that is negligible for the timescales of LLM agent execution.
The language split reinforces this. Keep Python stateless: a pure function from (history, message) to a stream of events. Keep TypeScript stateful: persistence, auth, encryption, UI. Draw the boundary in your SSE protocol, enforce it with cross-language parity tests, and let each language do what it does best.