S2 has official SDKs for several languages. For a language that does not have an SDK yet, the REST API is the easiest way to get started.
## Sessions

### Append Session
An append session is a long-lived connection for high-throughput, ordered appends with client-driven backpressure.
```typescript
const session = await stream.appendSession({
  maxInflightBytes: 10 * 1024 * 1024, // 10 MiB
  maxInflightBatches: 100, // Optional
});

// Submit records (applies backpressure when at capacity)
const ack = await session.submit([
  AppendRecord.make("message-1"),
  AppendRecord.make("message-2"),
]);
console.log("Appended:", ack.start.seq_num, "to", ack.end.seq_num);

// Graceful shutdown
await session.close();
```
#### Append Session Options
| Option | Default | Description |
|---|---|---|
| `maxInflightBytes` | 10 MiB | Maximum total bytes of unacknowledged batches. Controls memory usage and backpressure. |
| `maxInflightBatches` | None | Maximum number of unacknowledged batches. Set explicitly to enforce a limit. |
#### Session Methods
| Method | Description |
|---|---|
| `submit()` | Enqueue a batch of records. Waits when inflight capacity is exhausted. Returns when the server acknowledges. |
| `close()` | Gracefully close the session after all pending batches are acknowledged. |
| `lastAckedPosition()` | Returns the position of the last acknowledged batch, if any. |
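Because `lastAckedPosition()` reflects acknowledged progress, it can drive checkpointing. A minimal sketch under stated assumptions: the `AppendSessionLike` interface below is a hand-written structural type mirroring the methods in the table (the real session's `submit()` takes `AppendRecord` batches, simplified to strings here), and `saveCheckpoint` is a hypothetical persistence hook, not part of the SDK:

```typescript
interface Position {
  seq_num: number;
}

// Minimal structural type matching the session methods described above.
interface AppendSessionLike {
  submit(records: string[]): Promise<unknown>;
  lastAckedPosition(): Position | undefined;
  close(): Promise<void>;
}

// Submit batches in order, persisting progress after each acknowledgement.
async function appendWithCheckpoints(
  session: AppendSessionLike,
  batches: string[][],
  saveCheckpoint: (pos: Position) => void,
): Promise<void> {
  for (const batch of batches) {
    await session.submit(batch); // waits under backpressure, resolves on ack
    const pos = session.lastAckedPosition();
    if (pos) saveCheckpoint(pos); // record durable progress so far
  }
  await session.close();
}
```

On restart, a producer could read the last saved position and skip work that was already acknowledged.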
### Batching

Append sessions do not batch records automatically. For batching based on time, size, or record count, use the opt-in batching helpers.
`BatchTransform` is a `TransformStream` that collects records and flushes them as batches:

```typescript
import { BatchTransform, AppendRecord } from "@s2-dev/streamstore";

const batcher = new BatchTransform({
  lingerMs: 5, // Flush after 5ms of inactivity
  maxBatchRecords: 100, // Or after 100 records
  maxBatchBytes: 256 * 1024, // Or after 256 KiB
});

const session = await stream.appendSession();

// Pipe through batcher and session
const pipePromise = batcher.readable.pipeTo(session.writable);

const writer = batcher.writable.getWriter();
await writer.write(AppendRecord.make("record-1"));
await writer.write(AppendRecord.make("record-2"));

// Records are automatically batched and flushed
await writer.close();
await pipePromise;
```
#### Batching Options
| Option | Default | Description |
|---|---|---|
| `lingerMs` | 5ms | Time to wait before flushing a partial batch. |
| `maxBatchRecords` | 1000 | Maximum records per batch (max: 1000). |
| `maxBatchBytes` | 1 MiB | Maximum batch size in bytes (max: 1 MiB). |
| `match_seq_num` | None | Optional starting sequence number (auto-increments per batch). |
| `fencing_token` | None | Optional fencing token applied to all batches. |
### Read Session
A read session provides an iterator for consuming records from a stream with automatic reconnection on transient failures.
```typescript
const session = await stream.readSession({
  seq_num: 0,
  count: 1000,
});

for await (const record of session) {
  console.log("Record:", record.seq_num, record.body);
}

// Get position info
const nextPos = session.nextReadPosition();
const tail = session.lastObservedTail();
```
#### Session Methods

TypeScript read sessions implement `AsyncIterable`, so you consume records with `for await...of`:

| Method | Description |
|---|---|
| `for await (const record of session)` | Iterate over records. Ends when done or on fatal error. |
| `nextReadPosition()` | Returns the sequence number where reading will resume. |
| `lastObservedTail()` | Returns the last known tail position of the stream. |
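The session already reconnects on transient failures; `nextReadPosition()` additionally lets a consumer reopen a fresh session after a fatal error and continue where reading stopped. A minimal sketch under stated assumptions: the `ReadSession` interface below is a hand-written structural type mirroring the methods in this section, and `readWithResume` is an illustrative helper, not part of the SDK:

```typescript
interface ReadRecord {
  seq_num: number;
  body: string;
}

// Minimal structural type matching the read session methods described above.
interface ReadSession extends AsyncIterable<ReadRecord> {
  nextReadPosition(): number;
}

// Consume records, reopening a new session from nextReadPosition() on error.
async function readWithResume(
  open: (seqNum: number) => Promise<ReadSession>,
  from: number,
  onRecord: (r: ReadRecord) => void,
  maxResumes = 5,
): Promise<void> {
  let pos = from;
  for (let resumes = 0; ; resumes++) {
    const session = await open(pos);
    try {
      for await (const record of session) onRecord(record);
      return; // read completed normally
    } catch (err) {
      if (resumes >= maxResumes) throw err; // give up eventually
      pos = session.nextReadPosition(); // pick up where the failure occurred
    }
  }
}
```

With the real SDK, `open` would wrap `stream.readSession({ seq_num: seqNum })`.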
## Retry Logic

SDKs automatically retry transient failures.

### Retry Configuration
```typescript
const s2 = new S2({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  retryConfig: {
    maxAttempts: 3,
    retryBackoffDurationMillis: 100,
    appendRetryPolicy: "all",
    requestTimeoutMillis: 5000,
  },
});
```
### Configuration Options
| Option | Default | Description |
|---|---|---|
| `maxAttempts` | 3 | Total number of attempts, including the initial try. A value of 1 means no retries. |
| `retryBackoffDurationMillis` | 100ms | Base delay between retry attempts. |
| `appendRetryPolicy` | `"all"` | Policy for retrying append operations. See Append Retry Policies. |
| `requestTimeoutMillis` | 5s | Maximum time to wait for a response per attempt. |
### Retryable Errors
SDKs automatically retry requests that fail with these HTTP status codes:
| Status Code | Name | Description |
|---|---|---|
| 408 | Request Timeout | The request took too long to complete. |
| 429 | Too Many Requests | Rate limit exceeded. |
| 500 | Internal Server Error | Unexpected server error. |
| 502 | Bad Gateway | Upstream service unavailable. |
| 503 | Service Unavailable | Server temporarily unavailable. |
Network errors (connection resets, DNS failures, etc.) are also retried.
Client errors (4xx except 408/429) are not retried as they indicate issues that won’t be resolved by retrying (e.g., validation errors, authentication failures).
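The classification above can be sketched as a small predicate. The status codes come from the table; the function name is illustrative, not part of the SDK:

```typescript
// HTTP status codes the SDKs treat as retryable (from the table above).
const RETRYABLE_STATUS = new Set([408, 429, 500, 502, 503]);

// True if a failed request with this status code should be retried.
function isRetryableStatus(status: number): boolean {
  return RETRYABLE_STATUS.has(status);
}
```

For example, `isRetryableStatus(429)` is `true`, while `isRetryableStatus(401)` is `false` because re-sending the same credentials cannot succeed.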
### Backoff Strategy

SDKs use a fixed backoff with jitter:

```
delay = baseDelay * random(0.5, 1.5)
```
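The formula can be sketched as follows; this is a minimal illustration of the jitter math, not the SDK's internal implementation (the injectable `rand` parameter exists only to make the sketch deterministic for testing):

```typescript
// Fixed backoff with jitter: the base delay scaled by a uniform random
// factor in [0.5, 1.5).
function backoffDelayMs(
  baseDelayMs: number,
  rand: () => number = Math.random,
): number {
  const jitter = 0.5 + rand(); // uniform in [0.5, 1.5)
  return baseDelayMs * jitter;
}
```

With a 100ms base delay, each retry waits somewhere between 50ms and 150ms; the randomness spreads out retries from many clients so they do not hammer the server in lockstep.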
### Append Retry Policies
Append operations require special consideration because retrying a partially-completed append could cause duplicate records.
| Policy | Behavior |
|---|---|
| `all` (default) | Retry all append operations. Use when duplicates are acceptable. |
| `noSideEffects` | Only retry appends that include `matchSeqNum`. These are idempotent because the server rejects duplicates. |
Use `matchSeqNum` when you need exactly-once semantics. The server will reject the append if records already exist at that sequence number, making retries safe.
### Example: Safe Retries with Conditional Append

```typescript
const tail = await stream.checkTail();

const ack = await stream.append({
  records: [AppendRecord.make("important-message")],
  match_seq_num: tail.tail.seq_num,
});
```
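If another writer appends between `checkTail()` and `append()`, the conditional append fails; the safe recovery is to re-check the tail and try again with the fresh sequence number. A minimal sketch under stated assumptions: `appendAtTail` is an illustrative helper, and `checkTail`/`append` are simplified stand-ins for the stream calls shown above (a real conflict surfaces as a failed-precondition error):

```typescript
interface Tail {
  seq_num: number;
}

// Retry a conditional append after a concurrent-writer conflict by
// re-reading the tail and retrying with the fresh sequence number.
async function appendAtTail<A>(
  checkTail: () => Promise<Tail>,
  append: (matchSeqNum: number) => Promise<A>,
  maxAttempts = 3,
): Promise<A> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const tail = await checkTail(); // where we believe the stream ends
    try {
      return await append(tail.seq_num); // succeeds only if nothing raced us
    } catch (err) {
      lastError = err; // conflict: loop to re-check the tail and retry
    }
  }
  throw lastError;
}
```

Because every attempt is conditional on the freshly observed tail, this loop never produces duplicates, unlike blindly retrying an unconditional append.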