Server-Sent Events (SSE)
Stream real-time data to clients using Server-Sent Events
Server-Sent Events
Learn how to implement real-time data streaming in your Dwex applications using Server-Sent Events (SSE). SSE provides a simple, efficient way to push updates from the server to the client over HTTP.
Overview
Dwex provides first-class support for Server-Sent Events through the @Sse() decorator. SSE allows servers to push data to web clients over a single HTTP connection, making it perfect for:
- Live notifications and updates
- Real-time dashboards
- Progress indicators
- Live feeds and activity streams
- Server monitoring
Unlike WebSockets, SSE is unidirectional (server-to-client only) and works over standard HTTP, making it simpler to implement and deploy.
Quick Start
Dwex offers two approaches for implementing SSE endpoints:
Approach 1: Async Generators (Recommended)
The simplest way to create an SSE endpoint is using async generators:
import { Controller, Sse } from "@dwex/core";
@Controller("events")
export class EventsController {
@Sse("updates")
async *streamUpdates() {
// Send initial connection message
yield {
data: { message: "Connected" },
event: "connected",
};
// Stream data
for (let i = 0; i < 10; i++) {
yield {
data: { count: i, timestamp: new Date().toISOString() },
event: "update",
id: String(i),
};
await new Promise((resolve) => setTimeout(resolve, 1000));
}
// Send completion message
yield {
data: { message: "Stream complete" },
event: "complete",
};
}
}Approach 2: SseStream (Imperative)
For more control over the stream lifecycle, use the SseStream class:
import { Controller, Sse } from "@dwex/core";
import { SseStream } from "@dwex/common";
@Controller("events")
export class EventsController {
@Sse("time")
streamTime() {
const stream = new SseStream();
let count = 0;
const interval = setInterval(() => {
stream.send({
data: { time: new Date().toISOString(), count: count++ },
event: "time",
});
if (count >= 10) {
clearInterval(interval);
stream.close();
}
}, 1000);
// Clean up when client disconnects
stream.onClose(() => {
clearInterval(interval);
});
return stream;
}
}SSE Event Format
Each SSE event can include the following fields:
interface SseEvent {
// The data payload (required)
data: string | object | number | boolean | null;
// Event type (optional) - clients can filter by this
event?: string;
// Event ID (optional) - used for reconnection
id?: string;
// Reconnection time in milliseconds (optional)
retry?: number;
// Comment for debugging/keep-alive (optional)
comment?: string;
}Example with All Fields
@Sse("notifications")
async *streamNotifications() {
yield {
data: {
title: "New Message",
body: "You have a new message",
timestamp: Date.now(),
},
event: "notification", // Event type
id: "msg-123", // Unique ID
retry: 3000, // Retry after 3 seconds on disconnect
comment: "User notification event", // For debugging
};
}Using with Dependency Injection
SSE endpoints work seamlessly with Dwex's dependency injection:
import { Controller, Sse, Query } from "@dwex/core";
import { Injectable } from "@dwex/core";
@Injectable()
export class NotificationService {
async *subscribe(userId: string) {
// Your notification logic
while (true) {
const notification = await this.getNextNotification(userId);
if (notification) yield notification;
await new Promise((r) => setTimeout(r, 1000));
}
}
}
@Controller("notifications")
export class NotificationsController {
constructor(private notificationService: NotificationService) {}
@Sse("stream")
async *streamNotifications(@Query("userId") userId: string) {
yield { data: { message: "Connected" }, event: "connected" };
for await (const notification of this.notificationService.subscribe(
userId,
)) {
yield {
data: notification,
event: "notification",
id: notification.id,
};
}
}
}SseStream API Reference
The SseStream class provides an imperative API for SSE:
Methods
send(event: SseEvent): void
Send an SSE event to the client:
stream.send({
data: { message: "Hello" },
event: "greeting",
id: "1",
});sendData(data: any): void
Send just the data without other SSE fields:
stream.sendData({ status: "ok", count: 42 });sendComment(comment: string): void
Send a keep-alive comment:
stream.sendComment("keep-alive");close(): void
Close the stream:
stream.close();onClose(callback: () => void): void
Register a callback for when the stream closes:
const interval = setInterval(() => {
stream.send({ data: "ping" });
}, 1000);
stream.onClose(() => {
clearInterval(interval);
console.log("Client disconnected");
});isClosed(): boolean
Check if the stream is closed:
if (!stream.isClosed()) {
stream.send({ data: "Still open" });
}Examples
Countdown Timer
Stream a countdown from 10 to 1:
@Controller("events")
export class EventsController {
@Sse("countdown")
async *countdown() {
for (let i = 10; i >= 1; i--) {
yield {
data: { count: i },
event: "countdown",
id: String(10 - i + 1),
};
await new Promise((resolve) => setTimeout(resolve, 1000));
}
yield {
data: { message: "Countdown complete!" },
event: "complete",
};
}
}Live Server Metrics
Stream server metrics every second:
import { SseStream } from "@dwex/common";
@Controller("monitoring")
export class MonitoringController {
@Sse("metrics")
streamMetrics() {
const stream = new SseStream();
const interval = setInterval(async () => {
const metrics = {
cpu: process.cpuUsage(),
memory: process.memoryUsage(),
uptime: process.uptime(),
};
stream.send({
data: metrics,
event: "metrics",
id: String(Date.now()),
});
// Send keep-alive every 30 seconds
if (Date.now() % 30000 < 1000) {
stream.sendComment("keep-alive");
}
}, 1000);
stream.onClose(() => clearInterval(interval));
return stream;
}
}Event-Driven Updates
Stream updates from an event emitter:
import { EventEmitter } from "node:events";
@Injectable()
export class ActivityService extends EventEmitter {
trackActivity(userId: string, activity: string) {
this.emit("activity", { userId, activity, timestamp: Date.now() });
}
}
@Controller("activity")
export class ActivityController {
constructor(private activityService: ActivityService) {}
@Sse("stream")
streamActivity() {
const stream = new SseStream();
const handler = (activity: any) => {
stream.send({
data: activity,
event: "activity",
id: String(activity.timestamp),
});
};
this.activityService.on("activity", handler);
stream.onClose(() => {
this.activityService.off("activity", handler);
});
return stream;
}
}Progress Tracking
Stream job progress updates:
@Controller("jobs")
export class JobsController {
@Sse("progress/:jobId")
async *trackProgress(@Param("jobId") jobId: string) {
yield { data: { status: "started", progress: 0 }, event: "start" };
for (let progress = 0; progress <= 100; progress += 10) {
yield {
data: { jobId, progress, status: "running" },
event: "progress",
id: `${jobId}-${progress}`,
};
await new Promise((resolve) => setTimeout(resolve, 500));
}
yield {
data: { jobId, progress: 100, status: "completed" },
event: "complete",
};
}
}Client-Side Usage
JavaScript/TypeScript
const eventSource = new EventSource("http://localhost:9929/events/updates");
// Listen to default messages
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("Message:", data);
};
// Listen to specific event types
eventSource.addEventListener("notification", (event) => {
const notification = JSON.parse(event.data);
console.log("Notification:", notification);
});
// Handle connection open
eventSource.onopen = () => {
console.log("Connection opened");
};
// Handle errors
eventSource.onerror = (error) => {
console.error("EventSource error:", error);
eventSource.close();
};
// Close connection
// eventSource.close();React Hook Example
import { useEffect, useState } from "react";
function useSSE<T>(url: string) {
const [data, setData] = useState<T | null>(null);
const [error, setError] = useState<Error | null>(null);
useEffect(() => {
const eventSource = new EventSource(url);
eventSource.onmessage = (event) => {
setData(JSON.parse(event.data));
};
eventSource.onerror = (err) => {
setError(new Error("SSE connection error"));
eventSource.close();
};
return () => {
eventSource.close();
};
}, [url]);
return { data, error };
}
// Usage
function Dashboard() {
const { data, error } = useSSE("http://localhost:9929/events/metrics");
if (error) return <div>Error: {error.message}</div>;
if (!data) return <div>Connecting...</div>;
return <div>Metrics: {JSON.stringify(data)}</div>;
}cURL Testing
Test SSE endpoints with curl:
# Basic SSE stream
curl -N http://localhost:9929/events/countdown
# With headers
curl -N -H "Authorization: Bearer token" http://localhost:9929/events/streamBest Practices
Keep-Alive Comments
Send periodic comments to prevent connection timeouts:
@Sse("updates")
streamUpdates() {
const stream = new SseStream();
// Send keep-alive every 15 seconds
const keepAlive = setInterval(() => {
stream.sendComment("keep-alive");
}, 15000);
stream.onClose(() => clearInterval(keepAlive));
return stream;
}Error Handling
Handle errors gracefully in async generators:
@Sse("data")
async *streamData() {
try {
yield { data: { status: "started" }, event: "start" };
for await (const item of this.dataService.stream()) {
yield { data: item, event: "data" };
}
} catch (error) {
yield {
data: { error: error.message },
event: "error",
};
} finally {
yield { data: { status: "ended" }, event: "end" };
}
}Resource Cleanup
Always clean up resources when the stream closes:
@Sse("events")
streamEvents() {
const stream = new SseStream();
const subscription = this.eventService.subscribe();
stream.onClose(() => {
subscription.unsubscribe();
console.log("Stream closed, resources cleaned up");
});
subscription.on("event", (event) => {
stream.send({ data: event });
});
return stream;
}Event IDs and Reconnection
Provide event IDs to enable automatic reconnection:
@Sse("updates")
async *streamUpdates(@Headers("last-event-id") lastEventId?: string) {
let startId = lastEventId ? parseInt(lastEventId) : 0;
for (let i = startId; i < 100; i++) {
yield {
data: { count: i },
id: String(i), // Client can reconnect from this point
retry: 3000, // Retry after 3 seconds
};
await new Promise((r) => setTimeout(r, 1000));
}
}Working with Guards and Interceptors
SSE endpoints work with Guards and Interceptors:
import { UseGuards } from "@dwex/core";
import { AuthGuard } from "./auth.guard";
@Controller("private")
export class PrivateEventsController {
@Sse("notifications")
@UseGuards(AuthGuard) // Protected SSE endpoint
async *streamNotifications(@Req() req: any) {
const userId = req.user.id;
for await (const notification of this.getNotifications(userId)) {
yield { data: notification, event: "notification" };
}
}
}Protocol Details
SSE uses a simple text-based protocol over HTTP:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache, no-transform
Connection: keep-alive
X-Accel-Buffering: no
event: greeting
id: 1
data: {"message":"Hello"}
event: update
id: 2
retry: 3000
data: {"count":42}
: This is a comment
data: Simple message without event typeDwex automatically sets the required headers:
Content-Type: text/event-streamCache-Control: no-cache, no-transformConnection: keep-aliveX-Accel-Buffering: no(disables buffering in nginx)
Troubleshooting
Connection Closes Immediately
If connections close immediately, check:
- Buffering: Ensure nginx/proxies don't buffer SSE responses
- Timeouts: Configure appropriate timeout values
- Keep-alive: Send periodic comments to keep the connection alive
Events Not Received
If clients don't receive events:
- Format: Ensure events end with
\n\n(handled automatically by Dwex) - Flush: Data is sent immediately (Bun handles this automatically)
- CORS: Configure CORS if accessing from different domains
Memory Leaks
Prevent memory leaks by:
- Always using
stream.onClose()to clean up resources - Clearing intervals/timers when streams close
- Unsubscribing from event emitters
- Limiting the number of concurrent connections if needed
Comparison with WebSockets
| Feature | SSE | WebSockets |
|---|---|---|
| Direction | Unidirectional (server → client) | Bidirectional |
| Protocol | HTTP | WebSocket protocol |
| Reconnection | Automatic | Manual |
| Browser Support | Excellent | Excellent |
| Proxies/Firewalls | Works through HTTP proxies | May be blocked |
| Complexity | Simple | More complex |
| Use Case | Server pushes, live feeds | Real-time chat, gaming |
Choose SSE when you only need server-to-client communication. Use WebSockets when you need bidirectional communication.