Streaming
Stream data from server to client with streaming responses and Server-Sent Events (SSE)
Creating a Streaming Response
Use the stream() function to create streaming responses:
// app/api/stream/route.ts
import { route, stream, type StreamingResponse } from "@ademattos/bunbox";
export const streamText = route.get().handle((): StreamingResponse<string> => {
return stream(async function* () {
const words = ["Hello", " ", "from", " ", "Bunbox"];
for (const word of words) {
yield word;
await Bun.sleep(100);
}
});
});Server-Sent Events (SSE)
For structured data streaming, use sse() which automatically formats messages:
// app/api/updates/route.ts
import { route, sse, type SSEResponse } from "@ademattos/bunbox";
interface Update {
message: string;
progress: number;
}
export const getUpdates = route.get().handle((): SSEResponse<Update> => {
return sse(async function* () {
for (let i = 0; i <= 100; i += 10) {
yield {
message: `Processing...`,
progress: i,
};
await Bun.sleep(500);
}
});
});Client-Side Usage with useStream
Use the useStream hook from the auto-generated API client:
import { api } from "@/.bunbox/api-client";
export default function StreamPage() {
const { data, latest, loading, error, start, abort } =
api.stream.streamText.useStream({
enabled: false, // Don't start automatically
onMessage: (msg) => console.log("Received:", msg),
onFinish: () => console.log("Stream finished"),
onError: (err) => console.error("Stream error:", err),
});
return (
<div>
<button onClick={start} disabled={loading}>
{loading ? "Streaming..." : "Start Stream"}
</button>
{loading && <button onClick={abort}>Abort</button>}
<div>
{data.map((msg, i) => (
<span key={i}>{msg.token}</span>
))}
</div>
{error && <div>Error: {error.message}</div>}
</div>
);
}Hook Options
The useStream hook accepts these options (all flattened):
const stream = api.updates.getUpdates.useStream({
// Auto-start the stream (default: true)
enabled: true,
// Callback when each message arrives
onMessage: (data) => console.log(data),
// Callback when stream completes
onFinish: () => console.log("Done"),
// Callback on error
onError: (error) => console.error(error),
// Optional headers
headers: { "Authorization": "Bearer token" },
});Hook Return Values
The hook returns:
{
// All received messages
data: TResponse[],
// Most recent message
latest: TResponse | null,
// Loading state
loading: boolean,
// Error if any
error: Error | undefined,
// Manually start the stream
start: () => Promise<void>,
// Abort ongoing stream
abort: () => void,
}Use Cases
Real-time Progress Updates
// Server
export const trackProgress = route.get().handle((): SSEResponse<{ progress: number; status: string }> => {
return sse(async function* () {
const steps = ["Initializing", "Processing", "Finalizing", "Complete"];
for (let i = 0; i < steps.length; i++) {
yield {
progress: (i + 1) * 25,
status: steps[i],
};
await Bun.sleep(1000);
}
});
});// Client
const { data, latest } = api.progress.trackProgress.useStream();
return (
<div>
<div>Progress: {latest?.progress}%</div>
<div>Status: {latest?.status}</div>
</div>
);AI Text Generation
// Server
export const generateText = route
.post()
.body(z.object({ prompt: z.string() }))
.handle(({ body }): SSEResponse<{ token: string }> => {
return sse(async function* () {
const response = await generateAIResponse(body.prompt);
for await (const chunk of response) {
yield { token: chunk };
}
});
});// Client - body fields are flattened
const { data, loading, start } = api.ai.generateText.useStream({
enabled: false,
prompt: "Write a story", // Flattened, not { body: { prompt } }
});
return (
<div>
<button onClick={start}>Generate</button>
<p>{data.map(d => d.token).join("")}</p>
</div>
);Log Streaming
// Server
export const streamLogs = route.get().handle((): StreamingResponse<string> => {
return stream(async function* () {
const logs = await fetchLogs();
for (const log of logs) {
yield `[${log.timestamp}] ${log.message}\n`;
}
});
});Type Safety
Both stream() and sse() return phantom types that the API client generator recognizes:
// Server
export const countStream = route.get().handle((): SSEResponse<{ count: number }> => {
return sse(async function* () {
for (let i = 0; i < 10; i++) {
yield { count: i };
}
});
});
// Client - automatically typed!
const { data } = api.counter.countStream.useStream();
// data is typed as Array<{ count: number }>Manual Streaming (Without Hook)
You can also consume streams manually:
const response = await api.stream.streamText();
for await (const message of response) {
console.log(message);
}Aborting Streams
Streams can be aborted using the abort() function:
const { abort, loading } = api.stream.streamText.useStream();
if (loading) {
return <button onClick={abort}>Cancel</button>;
}Error Handling
Handle stream errors gracefully:
const { error } = api.stream.streamText.useStream({
onError: (err) => {
// Log to error service
console.error(err);
},
});
if (error) {
return <div>Failed to load stream: {error.message}</div>;
}