Skip to main content
S2 currently has official SDKs for the following languages: The REST API is probably the easiest way to get started in a language that does not have an SDK yet.

Sessions

Append Session

An append session is a long-lived connection for high-throughput, ordered appends with client-driven backpressure.
const session = await stream.appendSession({
  maxInflightBytes: 10 * 1024 * 1024,  // 10 MiB
  maxInflightBatches: 100,              // Optional
});

// Submit records (applies backpressure when at capacity)
const ack = await session.submit([
  AppendRecord.make("message-1"),
  AppendRecord.make("message-2"),
]);

console.log("Appended:", ack.start.seq_num, "to", ack.end.seq_num);

// Graceful shutdown
await session.close();

Append Session Options

OptionDefaultDescription
maxInflightBytes10 MiBMaximum total bytes of unacknowledged batches. Controls memory usage and backpressure.
maxInflightBatchesNoneMaximum number of unacknowledged batches. Set explicitly to limit.

Session Methods

MethodDescription
submit()Enqueue a batch of records. Waits when inflight capacity is exhausted. Returns when the server acknowledges.
close()Gracefully close the session after all pending batches are acknowledged.
lastAckedPosition()Returns the position of the last acknowledged batch, if any.

Batching

Append session doesn’t batch records automatically. For batching based on time, size, or record count, use the opt-in batching helpers:
BatchTransform is a TransformStream that collects records and flushes them as batches:
import { BatchTransform, AppendRecord } from "@s2-dev/streamstore";

const batcher = new BatchTransform({
  lingerMs: 5,           // Flush after 5ms of inactivity
  maxBatchRecords: 100,  // Or after 100 records
  maxBatchBytes: 256 * 1024,  // Or after 256 KiB
});

const session = await stream.appendSession();
const pipePromise = batcher.readable.pipeTo(session.writable);

// Pipe through batcher and session
const writer = batcher.writable.getWriter();

await writer.write(AppendRecord.make("record-1"));
await writer.write(AppendRecord.make("record-2"));
// Records are automatically batched and flushed

await writer.close();

await pipePromise;

Batching Options

OptionDefaultDescription
lingerMs5msTime to wait before flushing a partial batch.
maxBatchRecords1000Maximum records per batch (max: 1000).
maxBatchBytes1 MiBMaximum batch size in bytes (max: 1 MiB).
match_seq_numNoneOptional starting sequence number (auto-increments per batch).
fencing_tokenNoneOptional fencing token applied to all batches.

Read Session

A read session provides an iterator for consuming records from a stream with automatic reconnection on transient failures.
const session = await stream.readSession({
  seq_num: 0,
  count: 1000,
});

for await (const record of session) {
  console.log("Record:", record.seq_num, record.body);
}

// Get position info
const nextPos = session.nextReadPosition();
const tail = session.lastObservedTail();

Session Methods

TypeScript read sessions implement AsyncIterable, so you consume records with for await...of:
MethodDescription
for await (const record of session)Iterate over records. Ends when done or on fatal error.
nextReadPosition()Returns the sequence number where reading will resume.
lastObservedTail()Returns the last known tail position of the stream.

Retry Logic

SDKs automatically retry transient failures.

Retry Configuration

const s2 = new S2({
  accessToken: process.env.S2_ACCESS_TOKEN!,
  retryConfig: {
    maxAttempts: 3,
    retryBackoffDurationMillis: 100,
    appendRetryPolicy: "all",
    requestTimeoutMillis: 5000,
  },
});

Configuration Options

OptionDefaultDescription
maxAttempts3Total number of attempts including the initial try. A value of 1 means no retries.
retryBackoff100msBase delay between retry attempts.
appendRetryPolicy"all"Policy for retrying append operations. See Append Retry Policies.
requestTimeout5sMaximum time to wait for a response per attempt.

Retryable Errors

SDKs automatically retry requests that fail with these HTTP status codes:
Status CodeNameDescription
408Request TimeoutThe request took too long to complete.
429Too Many RequestsRate limit exceeded.
500Internal Server ErrorUnexpected server error.
502Bad GatewayUpstream service unavailable.
503Service UnavailableServer temporarily unavailable.
Additionally, network errors (connection resets, DNS failures, etc.) are also retried.
Client errors (4xx except 408/429) are not retried as they indicate issues that won’t be resolved by retrying (e.g., validation errors, authentication failures).

Backoff Strategy

SDKs use a fixed backoff with jitter:
delay = baseDelay * random(0.5, 1.5)

Append Retry Policies

Append operations require special consideration because retrying a partially-completed append could cause duplicate records.
PolicyBehavior
all (default)Retry all append operations. Use when duplicates are acceptable.
noSideEffectsOnly retry appends that include matchSeqNum. These are idempotent because the server rejects duplicates.
Use matchSeqNum when you need exactly-once semantics. The server will reject the append if records already exist at that sequence number, making retries safe.

Example: Safe Retries with Conditional Append

const tail = await stream.checkTail();

const ack = await stream.append({
  records: [AppendRecord.make("important-message")],
  match_seq_num: tail.tail.seq_num,
});