Server-Sent Events (SSE)

Stream real-time data to clients using Server-Sent Events

Learn how to implement real-time data streaming in your Dwex applications using Server-Sent Events (SSE). SSE provides a simple, efficient way to push updates from the server to the client over HTTP.

Overview

Dwex provides first-class support for Server-Sent Events through the @Sse() decorator. SSE lets a server push data to web clients over a single long-lived HTTP connection, which makes it a good fit for:

  • Live notifications and updates
  • Real-time dashboards
  • Progress indicators
  • Live feeds and activity streams
  • Server monitoring

Unlike WebSockets, SSE is unidirectional (server-to-client only) and works over standard HTTP, making it simpler to implement and deploy.

Quick Start

Dwex offers two approaches for implementing SSE endpoints: async generators and the SseStream class.

Approach 1: Async Generators

The simplest way to create an SSE endpoint is to use an async generator:

import { Controller, Sse } from "@dwex/core";

@Controller("events")
export class EventsController {
	@Sse("updates")
	async *streamUpdates() {
		// Send initial connection message
		yield {
			data: { message: "Connected" },
			event: "connected",
		};

		// Stream data
		for (let i = 0; i < 10; i++) {
			yield {
				data: { count: i, timestamp: new Date().toISOString() },
				event: "update",
				id: String(i),
			};
			await new Promise((resolve) => setTimeout(resolve, 1000));
		}

		// Send completion message
		yield {
			data: { message: "Stream complete" },
			event: "complete",
		};
	}
}

Approach 2: SseStream (Imperative)

For more control over the stream lifecycle, use the SseStream class:

import { Controller, Sse } from "@dwex/core";
import { SseStream } from "@dwex/common";

@Controller("events")
export class EventsController {
	@Sse("time")
	streamTime() {
		const stream = new SseStream();

		let count = 0;
		const interval = setInterval(() => {
			stream.send({
				data: { time: new Date().toISOString(), count: count++ },
				event: "time",
			});

			if (count >= 10) {
				clearInterval(interval);
				stream.close();
			}
		}, 1000);

		// Clean up when client disconnects
		stream.onClose(() => {
			clearInterval(interval);
		});

		return stream;
	}
}

SSE Event Format

Each SSE event can include the following fields:

interface SseEvent {
	// The data payload (required)
	data: string | object | number | boolean | null;

	// Event type (optional) - clients can filter by this
	event?: string;

	// Event ID (optional) - used for reconnection
	id?: string;

	// Reconnection time in milliseconds (optional)
	retry?: number;

	// Comment for debugging/keep-alive (optional)
	comment?: string;
}

Example with All Fields

@Sse("notifications")
async *streamNotifications() {
	yield {
		data: {
			title: "New Message",
			body: "You have a new message",
			timestamp: Date.now(),
		},
		event: "notification", // Event type
		id: "msg-123", // Unique ID
		retry: 3000, // Retry after 3 seconds on disconnect
		comment: "User notification event", // For debugging
	};
}
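
Because clients dispatch on the event field, it can help to keep event names and payload shapes in one place. The following is a minimal sketch of that idea in plain TypeScript. AppEvents, TypedSseEvent, and makeEvent are hypothetical names introduced for illustration; they are not part of Dwex.

```typescript
// Hypothetical map from event name to payload shape (not a Dwex type).
interface AppEvents {
	connected: { message: string };
	notification: { title: string; body: string; timestamp: number };
}

// Mirrors the SseEvent fields used here, with `event` and `data` tied together.
interface TypedSseEvent<K extends keyof AppEvents> {
	event: K;
	data: AppEvents[K];
	id?: string;
	retry?: number;
}

// Builds an event whose payload is checked against its event name at compile time.
function makeEvent<K extends keyof AppEvents>(
	event: K,
	data: AppEvents[K],
	extra: { id?: string; retry?: number } = {},
): TypedSseEvent<K> {
	return { event, data, ...extra };
}

const sampleNotification = makeEvent(
	"notification",
	{ title: "New Message", body: "You have a new message", timestamp: 0 },
	{ id: "msg-123", retry: 3000 },
);
```

An object built this way has the same shape as the events yielded above, so it could be yielded from an @Sse() generator directly, assuming the handler accepts any object matching SseEvent.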

Using with Dependency Injection

SSE endpoints work seamlessly with Dwex's dependency injection:

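import { Controller, Injectable, Query, Sse } from "@dwex/core";

// Illustrative shape; replace with your own notification type
interface Notification {
	id: string;
	message: string;
}

@Injectable()
export class NotificationService {
	async *subscribe(userId: string) {
		// Poll for new notifications once per second
		while (true) {
			const notification = await this.getNextNotification(userId);
			if (notification) yield notification;
			await new Promise((r) => setTimeout(r, 1000));
		}
	}

	// Placeholder: look up the next pending notification for this user
	private async getNextNotification(userId: string): Promise<Notification | null> {
		return null;
	}
}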

@Controller("notifications")
export class NotificationsController {
	constructor(private notificationService: NotificationService) {}

	@Sse("stream")
	async *streamNotifications(@Query("userId") userId: string) {
		yield { data: { message: "Connected" }, event: "connected" };

		for await (const notification of this.notificationService.subscribe(
			userId,
		)) {
			yield {
				data: notification,
				event: "notification",
				id: notification.id,
			};
		}
	}
}

SseStream API Reference

The SseStream class provides an imperative API for SSE:

Methods

send(event: SseEvent): void

Send an SSE event to the client:

stream.send({
	data: { message: "Hello" },
	event: "greeting",
	id: "1",
});

sendData(data: any): void

Send just the data without other SSE fields:

stream.sendData({ status: "ok", count: 42 });

sendComment(comment: string): void

Send a keep-alive comment:

stream.sendComment("keep-alive");

close(): void

Close the stream:

stream.close();

onClose(callback: () => void): void

Register a callback for when the stream closes:

const interval = setInterval(() => {
	stream.send({ data: "ping" });
}, 1000);

stream.onClose(() => {
	clearInterval(interval);
	console.log("Client disconnected");
});

isClosed(): boolean

Check if the stream is closed:

if (!stream.isClosed()) {
	stream.send({ data: "Still open" });
}

Examples

Countdown Timer

Stream a countdown from 10 to 1:

import { Controller, Sse } from "@dwex/core";

@Controller("events")
export class EventsController {
	@Sse("countdown")
	async *countdown() {
		for (let i = 10; i >= 1; i--) {
			yield {
				data: { count: i },
				event: "countdown",
				id: String(10 - i + 1),
			};
			await new Promise((resolve) => setTimeout(resolve, 1000));
		}

		yield {
			data: { message: "Countdown complete!" },
			event: "complete",
		};
	}
}

Live Server Metrics

Stream server metrics every second:

import { Controller, Sse } from "@dwex/core";
import { SseStream } from "@dwex/common";

@Controller("monitoring")
export class MonitoringController {
	@Sse("metrics")
	streamMetrics() {
		const stream = new SseStream();

		let ticks = 0;
		const interval = setInterval(() => {
			const metrics = {
				cpu: process.cpuUsage(),
				memory: process.memoryUsage(),
				uptime: process.uptime(),
			};

			stream.send({
				data: metrics,
				event: "metrics",
				id: String(Date.now()),
			});

			// Send a keep-alive comment every 30 ticks (about 30 seconds)
			ticks += 1;
			if (ticks % 30 === 0) {
				stream.sendComment("keep-alive");
			}
		}, 1000);

		stream.onClose(() => clearInterval(interval));

		return stream;
	}
}

Event-Driven Updates

Stream updates from an event emitter:

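import { EventEmitter } from "node:events";
import { Controller, Injectable, Sse } from "@dwex/core";
import { SseStream } from "@dwex/common";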

@Injectable()
export class ActivityService extends EventEmitter {
	trackActivity(userId: string, activity: string) {
		this.emit("activity", { userId, activity, timestamp: Date.now() });
	}
}

@Controller("activity")
export class ActivityController {
	constructor(private activityService: ActivityService) {}

	@Sse("stream")
	streamActivity() {
		const stream = new SseStream();

		const handler = (activity: any) => {
			stream.send({
				data: activity,
				event: "activity",
				id: String(activity.timestamp),
			});
		};

		this.activityService.on("activity", handler);

		stream.onClose(() => {
			this.activityService.off("activity", handler);
		});

		return stream;
	}
}
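
The same push-based source can also feed the async generator style if something converts pushed values into an async iterable. The following is a rough sketch of such a bridge in plain TypeScript. AsyncQueue, Activity, and activityEvents are hypothetical names and not Dwex APIs, and the queue supports a single consumer only.

```typescript
// Hypothetical helper (not part of Dwex): buffers pushed values for one async consumer.
class AsyncQueue<T> {
	private items: T[] = [];
	private closed = false;
	private wake: (() => void) | null = null;

	// Buffer a value and wake the consumer if it is waiting.
	push(item: T): void {
		if (this.closed) return;
		this.items.push(item);
		this.notify();
	}

	// Stop accepting values; the consumer finishes after draining the buffer.
	close(): void {
		this.closed = true;
		this.notify();
	}

	private notify(): void {
		const wake = this.wake;
		this.wake = null;
		if (wake) wake();
	}

	// Drain buffered items, then sleep until push() or close() is called.
	async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
		while (true) {
			while (this.items.length > 0) yield this.items.shift() as T;
			if (this.closed) return;
			await new Promise<void>((resolve) => {
				this.wake = resolve;
			});
		}
	}
}

// Assumed payload shape, matching what trackActivity() emits above.
interface Activity {
	userId: string;
	activity: string;
	timestamp: number;
}

// Maps queued activities to objects shaped like SseEvent.
async function* activityEvents(queue: AsyncQueue<Activity>) {
	for await (const activity of queue) {
		yield { data: activity, event: "activity", id: String(activity.timestamp) };
	}
}
```

In a handler, the emitter listener would call queue.push(activity), and queue.close() would run wherever the handler learns that the client has gone. How an async generator endpoint is notified of a disconnect is not covered here, so that wiring is left as an assumption.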

Progress Tracking

Stream job progress updates:

import { Controller, Param, Sse } from "@dwex/core";

@Controller("jobs")
export class JobsController {
	@Sse("progress/:jobId")
	async *trackProgress(@Param("jobId") jobId: string) {
		yield { data: { status: "started", progress: 0 }, event: "start" };

		for (let progress = 0; progress <= 100; progress += 10) {
			yield {
				data: { jobId, progress, status: "running" },
				event: "progress",
				id: `${jobId}-${progress}`,
			};
			await new Promise((resolve) => setTimeout(resolve, 500));
		}

		yield {
			data: { jobId, progress: 100, status: "completed" },
			event: "complete",
		};
	}
}

Client-Side Usage

JavaScript/TypeScript

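const eventSource = new EventSource("http://localhost:9929/events/updates");

// onmessage only receives events sent without an `event` field
eventSource.onmessage = (event) => {
	const data = JSON.parse(event.data);
	console.log("Message:", data);
};

// Named events need their own listener
eventSource.addEventListener("update", (event) => {
	const update = JSON.parse(event.data);
	console.log("Update:", update);
});

// Close when the server signals completion; otherwise the browser
// treats the end of the stream as a dropped connection and reconnects
eventSource.addEventListener("complete", () => {
	eventSource.close();
});

// Handle connection open
eventSource.onopen = () => {
	console.log("Connection opened");
};

// Handle errors
eventSource.onerror = (error) => {
	console.error("EventSource error:", error);
	// Closing here stops the browser's automatic reconnection;
	// remove this call if the client should keep retrying
	eventSource.close();
};

// Close connection
// eventSource.close();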

React Hook Example

import { useEffect, useState } from "react";

function useSSE<T>(url: string, eventName = "message") {
	const [data, setData] = useState<T | null>(null);
	const [error, setError] = useState<Error | null>(null);

	useEffect(() => {
		const eventSource = new EventSource(url);

		// "message" matches events sent without an `event` field;
		// pass the event name to receive named events
		const handleEvent = (event: MessageEvent) => {
			setData(JSON.parse(event.data));
		};
		eventSource.addEventListener(eventName, handleEvent);

		eventSource.onerror = () => {
			setError(new Error("SSE connection error"));
			eventSource.close();
		};

		return () => {
			eventSource.close();
		};
	}, [url, eventName]);

	return { data, error };
}

// Usage: the metrics endpoint above sends events named "metrics"
function Dashboard() {
	const { data, error } = useSSE(
		"http://localhost:9929/monitoring/metrics",
		"metrics",
	);

	if (error) return <div>Error: {error.message}</div>;
	if (!data) return <div>Connecting...</div>;

	return <div>Metrics: {JSON.stringify(data)}</div>;
}

cURL Testing

Test SSE endpoints with curl:

# Basic SSE stream
curl -N http://localhost:9929/events/countdown

# With headers
curl -N -H "Authorization: Bearer token" http://localhost:9929/private/notifications
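
curl prints the raw event stream, which is also what a non-browser client has to parse itself. The following is a minimal sketch of such a parser, following the field rules of the SSE format: blank lines end an event, lines starting with a colon are comments, and an event without an event field has type "message". parseSseText and ParsedSseEvent are hypothetical names, and the sketch ignores retry and unknown fields.

```typescript
interface ParsedSseEvent {
	event: string; // "message" when the stream gave no event field
	data: string; // multiple data lines joined with "\n"
	id?: string; // last ID seen so far on this stream
}

// Parses a complete chunk of SSE text into events.
function parseSseText(text: string): ParsedSseEvent[] {
	const events: ParsedSseEvent[] = [];
	let eventType = "";
	let dataLines: string[] = [];
	let lastId: string | undefined;

	for (const line of text.split(/\r\n|\n|\r/)) {
		if (line === "") {
			// A blank line dispatches the pending event, if it carries any data
			if (dataLines.length > 0) {
				events.push({
					event: eventType || "message",
					data: dataLines.join("\n"),
					id: lastId,
				});
			}
			eventType = "";
			dataLines = [];
			continue;
		}
		if (line.startsWith(":")) continue; // comment, e.g. a keep-alive

		const colon = line.indexOf(":");
		const field = colon === -1 ? line : line.slice(0, colon);
		let value = colon === -1 ? "" : line.slice(colon + 1);
		if (value.startsWith(" ")) value = value.slice(1); // strip one leading space

		if (field === "event") eventType = value;
		else if (field === "data") dataLines.push(value);
		else if (field === "id" && !value.includes("\0")) lastId = value;
		// retry and unknown fields are ignored in this sketch
	}
	return events;
}
```

The default type of "message" is why onmessage handlers only see events that omit the event field. A real client would also need to buffer partial chunks until a blank line arrives, which this sketch leaves out.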

Best Practices

Keep-Alive Comments

Send periodic comments to prevent connection timeouts:

@Sse("updates")
streamUpdates() {
	const stream = new SseStream();

	// Send keep-alive every 15 seconds
	const keepAlive = setInterval(() => {
		stream.sendComment("keep-alive");
	}, 15000);

	stream.onClose(() => clearInterval(keepAlive));

	return stream;
}
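
For the async generator style, one possible way to get the same effect is to wrap the source so that a heartbeat is emitted whenever it stays idle for too long. The following is a sketch under stated assumptions: withHeartbeat is a hypothetical helper and not a Dwex API, and what a heartbeat event should look like for @Sse() generators is left to the caller through makeHeartbeat.

```typescript
// Hypothetical helper: yields makeHeartbeat() whenever `source` is idle for intervalMs.
async function* withHeartbeat<T>(
	source: AsyncIterable<T>,
	intervalMs: number,
	makeHeartbeat: () => T,
): AsyncGenerator<T, void, undefined> {
	const iterator = source[Symbol.asyncIterator]();
	let pending = iterator.next();
	try {
		while (true) {
			let timer: ReturnType<typeof setTimeout> | undefined;
			const idle = new Promise<null>((resolve) => {
				timer = setTimeout(() => resolve(null), intervalMs);
			});

			// Whichever settles first wins: the next source item or the idle timer
			const winner = await Promise.race([pending, idle]);
			clearTimeout(timer);

			if (winner === null) {
				yield makeHeartbeat(); // source was idle; keep waiting on the same item
				continue;
			}
			if (winner.done) return;
			yield winner.value;
			pending = iterator.next();
		}
	} finally {
		// Ask the source to clean up without waiting on it
		if (iterator.return) void iterator.return();
	}
}
```

A handler could then iterate over withHeartbeat(source, 15000, () => heartbeatEvent) and yield each item, where heartbeatEvent is whatever keep-alive event the endpoint accepts.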

Error Handling

Handle errors gracefully in async generators:

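@Sse("data")
async *streamData() {
	try {
		yield { data: { status: "started" }, event: "start" };

		for await (const item of this.dataService.stream()) {
			yield { data: item, event: "data" };
		}
	} catch (error) {
		// Report the failure to the client before the stream ends
		const message = error instanceof Error ? error.message : String(error);
		yield {
			data: { error: message },
			event: "error",
		};
	}

	// Sent after both success and failure. Yielding from a finally block is
	// avoided because finally also runs when the generator is stopped early.
	yield { data: { status: "ended" }, event: "end" };
}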

Resource Cleanup

Always clean up resources when the stream closes:

@Sse("events")
streamEvents() {
	const stream = new SseStream();
	const subscription = this.eventService.subscribe();

	stream.onClose(() => {
		subscription.unsubscribe();
		console.log("Stream closed, resources cleaned up");
	});

	subscription.on("event", (event) => {
		stream.send({ data: event });
	});

	return stream;
}

Event IDs and Reconnection

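Reconnection is automatic, but the client can only resume where it left off if events carry IDs. On reconnect, the browser sends the last ID it received in the Last-Event-ID header:

@Sse("updates")
async *streamUpdates(@Headers("last-event-id") lastEventId?: string) {
	// Resume after the last event the client already received
	const lastId = lastEventId ? Number.parseInt(lastEventId, 10) : Number.NaN;
	const startId = Number.isNaN(lastId) ? 0 : lastId + 1;

	for (let i = startId; i < 100; i++) {
		yield {
			data: { count: i },
			id: String(i), // Client can reconnect from this point
			retry: 3000, // Retry after 3 seconds
		};
		await new Promise((r) => setTimeout(r, 1000));
	}
}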
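
A counter can be recomputed from any ID, but most streams cannot. One option in that case is a bounded buffer of recent events that is replayed from the Last-Event-ID onward. The following is a minimal sketch; ReplayBuffer and StoredEvent are hypothetical names, not Dwex APIs, and numeric IDs are an assumption of the example.

```typescript
interface StoredEvent<T> {
	id: number;
	data: T;
}

// Hypothetical helper: keeps the most recent `capacity` events for replay.
class ReplayBuffer<T> {
	private events: StoredEvent<T>[] = [];
	private nextId = 1;
	private readonly capacity: number;

	constructor(capacity: number) {
		this.capacity = capacity;
	}

	// Assign the next ID, store the event, and drop the oldest if over capacity.
	add(data: T): StoredEvent<T> {
		const stored: StoredEvent<T> = { id: this.nextId++, data };
		this.events.push(stored);
		if (this.events.length > this.capacity) this.events.shift();
		return stored;
	}

	// Events strictly newer than lastEventId; everything buffered if the
	// header is missing or not a number.
	since(lastEventId: string | undefined): StoredEvent<T>[] {
		const parsed =
			lastEventId === undefined ? Number.NaN : Number.parseInt(lastEventId, 10);
		if (Number.isNaN(parsed)) return [...this.events];
		return this.events.filter((event) => event.id > parsed);
	}
}
```

A handler would yield everything from since(lastEventId) before switching to live events. If the client's last ID is older than the oldest buffered event, some events are gone for good, and the sketch simply returns what is still available.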

Working with Guards and Interceptors

SSE endpoints work with Guards and Interceptors:

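import { Controller, Req, Sse, UseGuards } from "@dwex/core";
import { AuthGuard } from "./auth.guard";
// Illustrative path: the NotificationService from the dependency injection example
import { NotificationService } from "./notification.service";

@Controller("private")
export class PrivateEventsController {
	constructor(private notificationService: NotificationService) {}

	@Sse("notifications")
	@UseGuards(AuthGuard) // Protected SSE endpoint
	async *streamNotifications(@Req() req: any) {
		// Assumes AuthGuard attaches the authenticated user to the request
		const userId = req.user.id;

		for await (const notification of this.notificationService.subscribe(userId)) {
			yield { data: notification, event: "notification" };
		}
	}
}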

Protocol Details

SSE uses a simple text-based protocol over HTTP:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache, no-transform
Connection: keep-alive
X-Accel-Buffering: no

event: greeting
id: 1
data: {"message":"Hello"}

event: update
id: 2
retry: 3000
data: {"count":42}

: This is a comment

data: Simple message without event type

Dwex automatically sets the required headers:

  • Content-Type: text/event-stream
  • Cache-Control: no-cache, no-transform
  • Connection: keep-alive
  • X-Accel-Buffering: no (disables buffering in nginx)
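
To make the wire format concrete, here is a rough sketch of how an event object could be turned into the text shown above. It is not Dwex's implementation: formatSseEvent and WireEvent are hypothetical names, and the choice to send strings as-is and JSON-encode everything else is an assumption based on the sample output.

```typescript
// Same fields as the SseEvent interface described earlier.
interface WireEvent {
	data: string | object | number | boolean | null;
	event?: string;
	id?: string;
	retry?: number;
	comment?: string;
}

const LINE_BREAK = /\r\n|\n|\r/;

// Serializes one event into SSE text, terminated by a blank line.
function formatSseEvent(e: WireEvent): string {
	const lines: string[] = [];

	// Comment lines start with a colon and are ignored by clients
	if (e.comment !== undefined) {
		for (const part of e.comment.split(LINE_BREAK)) lines.push(`: ${part}`);
	}
	if (e.event !== undefined) lines.push(`event: ${e.event}`);
	if (e.id !== undefined) lines.push(`id: ${e.id}`);
	if (e.retry !== undefined) lines.push(`retry: ${e.retry}`);

	// Assumption: strings go out unchanged, other values are JSON-encoded
	const payload = typeof e.data === "string" ? e.data : JSON.stringify(e.data);

	// A payload with line breaks must be split into one data line per row
	for (const part of payload.split(LINE_BREAK)) lines.push(`data: ${part}`);

	return lines.join("\n") + "\n\n";
}
```

Splitting multi-line payloads matters because a raw line break inside a data value would otherwise be read as the start of a new field.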

Troubleshooting

Connection Closes Immediately

If connections close immediately, check:

  1. Buffering: Ensure nginx/proxies don't buffer SSE responses
  2. Timeouts: Configure appropriate timeout values
  3. Keep-alive: Send periodic comments to keep the connection alive

Events Not Received

If clients don't receive events:

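  1. Format: Ensure events end with \n\n (handled automatically by Dwex)
  2. Flush: Data is sent immediately (Bun handles this automatically), so check for buffering in proxies between server and client
  3. CORS: Configure CORS if accessing from different domains
  4. Event names: onmessage only receives events sent without an event field; use addEventListener with the event name for named events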

Memory Leaks

Prevent memory leaks by:

  1. Always using stream.onClose() to clean up resources
  2. Clearing intervals/timers when streams close
  3. Unsubscribing from event emitters
  4. Limiting the number of concurrent connections if needed
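
The last point can be sketched as a small counter that hands out a release function per connection. ConnectionLimiter is a hypothetical helper and not a Dwex API. How to reject a request once the limit is reached depends on the application and is left out.

```typescript
// Hypothetical helper: caps the number of streams that are open at once.
class ConnectionLimiter {
	private active = 0;
	private readonly max: number;

	constructor(max: number) {
		this.max = max;
	}

	// Returns a release function, or null when the limit is already reached.
	tryAcquire(): (() => void) | null {
		if (this.active >= this.max) return null;
		this.active += 1;

		let released = false;
		return () => {
			if (released) return; // releasing twice must not free a second slot
			released = true;
			this.active -= 1;
		};
	}

	get count(): number {
		return this.active;
	}
}
```

In a handler, the release function returned by tryAcquire() would typically be passed to stream.onClose() so that the slot is freed when the client disconnects.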

Comparison with WebSockets

Feature           | SSE                              | WebSockets
Direction         | Unidirectional (server → client) | Bidirectional
Protocol          | HTTP                             | WebSocket protocol
Reconnection      | Automatic                        | Manual
Browser Support   | Excellent                        | Excellent
Proxies/Firewalls | Works through HTTP proxies       | May be blocked
Complexity        | Simple                           | More complex
Use Case          | Server pushes, live feeds        | Real-time chat, gaming

Choose SSE when you only need server-to-client communication. Use WebSockets when you need bidirectional communication.