Documentation Index
Fetch the complete documentation index at: https://s2.dev/docs/llms.txt
Use this file to discover all available pages before exploring further.
Read
At the simplest level, you can read a batch of records with a single call:
TypeScript
Python
Go
Rust
const batch = await stream.read({
start: { from: { seqNum: 0 } },
stop: { limits: { count: 100 } },
});
for (const record of batch.records) {
console.log(`[${record.seqNum}] ${record.body}`);
}
batch = await stream.read(
start=SeqNum(0),
limit=ReadLimit(count=100),
)
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
batch, _ := stream.Read(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Count: s2.Uint64(100),
})
for _, record := range batch.Records {
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
let batch = stream
.read(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_limits(ReadLimits::new().with_count(100))),
)
.await?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
Unary reads return at most one batch: up to 1000 records or 1 MiB of data. For larger reads or to follow the stream in real time, use a read session.
Navigating a stream
Every record in a stream is accessible by its sequence number, timestamp, or offset from the current tail.
This is the case for both unary and streaming reads.
Streaming reads that do not specify a stop condition will continue to follow updates in real time.
Starting from a sequence number
TypeScript
Python
Go
Rust
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
async for batch in stream.read_session(start=SeqNum(0)):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
readSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
})
defer readSession.Close()
for readSession.Next() {
record := readSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
if err := readSession.Err(); err != nil {
log.Fatal(err)
}
let mut session = stream
.read_session(ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0))))
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Starting from a tail offset
Read the last N records in the stream, then follow for new ones:
TypeScript
Python
Go
Rust
// Start reading from 10 records before the current tail
const session = await stream.readSession({
start: { from: { tailOffset: 10 } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Start reading from 10 records before the current tail
async for batch in stream.read_session(start=TailOffset(10)):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Start reading from 10 records before the current tail
tailOffsetSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
TailOffset: s2.Int64(10),
})
defer tailOffsetSession.Close()
for tailOffsetSession.Next() {
record := tailOffsetSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Start reading from 10 records before the current tail
let mut session = stream
.read_session(
ReadInput::new().with_start(ReadStart::new().with_from(ReadFrom::TailOffset(10))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Starting from a timestamp
Read records starting from a point in time:
TypeScript
Python
Go
Rust
// Start reading from a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
start: { from: { timestamp: oneHourAgo } },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Start reading from a specific timestamp
one_hour_ago_ms = int((time.time() - 3600) * 1000)
async for batch in stream.read_session(start=Timestamp(one_hour_ago_ms)):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Start reading from a specific timestamp
oneHourAgo := uint64(time.Now().Add(-time.Hour).UnixMilli())
timestampSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
Timestamp: &oneHourAgo,
})
defer timestampSession.Close()
for timestampSession.Next() {
record := timestampSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Start reading from a specific timestamp
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::Timestamp(one_hour_ago))),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Reading until a timestamp
Read records up to a point in time, then stop:
TypeScript
Python
Go
Rust
// Read records until a specific timestamp
const oneHourAgo = new Date(Date.now() - 3600 * 1000);
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
stop: { untilTimestamp: oneHourAgo },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Read records until a specific timestamp
one_hour_ago_ms = int((time.time() - 3600) * 1000)
async for batch in stream.read_session(
start=SeqNum(0),
until_timestamp=one_hour_ago_ms,
):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Read records until a specific timestamp
oneHourAgo = uint64(time.Now().Add(-time.Hour).UnixMilli())
untilSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Until: &oneHourAgo,
})
defer untilSession.Close()
for untilSession.Next() {
record := untilSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Read records until a specific timestamp
let one_hour_ago = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_millis() as u64
- 3600 * 1000;
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_until(..one_hour_ago)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Read Session
A read session streams records from a starting position. It handles reconnection on transient failures and provides a simple iterator interface.
Following live updates
By default, a read session without stop conditions will follow the stream indefinitely, waiting for new records as they arrive. When you provide a stop condition (count, bytes, or until), the read stops when either the condition is met or it reaches the current tail—whichever comes first.
See "Follow live updates" in the API docs for the full semantics.
Long polling
The wait parameter controls how long a read waits for new records once it has caught up to the tail. This works for both unary reads (long polling) and sessions.
TypeScript
Python
Go
Rust
// Read all available records, and once reaching the current tail, wait an additional 30 seconds for new ones
const session = await stream.readSession({
start: { from: { seqNum: 0 } },
stop: { waitSecs: 30 },
});
for await (const record of session) {
console.log(`[${record.seqNum}] ${record.body}`);
}
# Read all available records, then wait up to 30 seconds for new ones
async for batch in stream.read_session(
start=SeqNum(0),
wait=30,
):
for record in batch.records:
print(f"[{record.seq_num}] {record.body}")
// Read all available records, and once reaching the current tail, wait an additional 30 seconds for new ones
waitSession, _ := stream.ReadSession(ctx, &s2.ReadOptions{
SeqNum: s2.Uint64(0),
Wait: s2.Int32(30),
})
defer waitSession.Close()
for waitSession.Next() {
record := waitSession.Record()
fmt.Printf("[%d] %s\n", record.SeqNum, string(record.Body))
}
// Read all available records, and once reaching the current tail, wait an additional 30 seconds
// for new ones
let mut session = stream
.read_session(
ReadInput::new()
.with_start(ReadStart::new().with_from(ReadFrom::SeqNum(0)))
.with_stop(ReadStop::new().with_wait(30)),
)
.await?;
while let Some(batch) = session.next().await {
let batch = batch?;
for record in batch.records {
println!("[{}] {:?}", record.seq_num, record.body);
}
}
Check Tail
To find the current end of the stream without reading any records:
TypeScript
Python
Go
Rust
const { tail } = await stream.checkTail();
console.log(`Stream has ${tail.seqNum} records`);
tail = await stream.check_tail()
print(f"Stream has {tail.seq_num} records")
tail, _ := stream.CheckTail(ctx)
fmt.Printf("Stream has %d records\n", tail.Tail.SeqNum)
let tail = stream.check_tail().await?;
println!("Stream has {} records", tail.seq_num);
The tail position tells you the sequence number that will be assigned to the next record appended to the stream.