Persistence query
The PersistenceQuery interface is the read-side API for the
journal. It’s how projections consume events; it’s also the
interface for any one-shot read of historical events.
Two flavors:
| Flavor | Returns | Use |
|---|---|---|
current* | Promise<Event[]> | Batch read of events at call time. Resolves once. |
events* | AsyncIterable<Event> | Live stream. Polls until the consumer stops. |
Implementations
Section titled “Implementations”Each journal has a matching query:
| Query | Pairs with |
|---|---|
InMemoryQuery | InMemoryJournal |
SqliteQuery | SqliteJournal |
CassandraQuery | CassandraJournal |
The interface is the same across all of them — just construct the matching query for your journal backend.
import { SqliteQuery } from 'actor-ts';
const query = new SqliteQuery({ path: '/var/lib/events.db' });The four methods
Section titled “The four methods”currentEventsByPersistenceId
Section titled “currentEventsByPersistenceId”const events = await query.currentEventsByPersistenceId<AccountEvent>( 'account-42', /* fromSeq */ 1, /* toSeq? */ 100,);One-shot read of every event for one entity, between sequence
numbers fromSeq and toSeq (inclusive).
Use for:
- Audit replays — get every event for one user.
- One-off backfills — process events from one entity into a separate system.
eventsByPersistenceId
Section titled “eventsByPersistenceId”for await (const event of query.eventsByPersistenceId<AccountEvent>('account-42', 1)) { console.log(event.event.kind, event.sequenceNr); if (someCondition) break; // stops the polling}Live stream. Yields past events first (chronological by
sequenceNr), then new events as they’re appended. The stream
never completes on its own — break out of the loop to stop.
Use for:
- Live per-entity views — a real-time feed of one user’s activity.
- Single-entity projections built ad-hoc rather than via
ProjectionActor.byPersistenceId.
currentEventsByTag
Section titled “currentEventsByTag”const events = await query.currentEventsByTag<Event>( 'account', offsetStart,);One-shot read of every event matching a tag (or tag filter), starting from an offset.
Returns [Event, Offset][] pairs — the offset is what you’d
pass to a subsequent call to read the next batch.
Use for:
- Reports — sum all
accountevents for the current month. - Batch ETL — periodic export of all tagged events.
eventsByTag
Section titled “eventsByTag”for await (const [event, offset] of query.eventsByTag<Event>('account', offsetStart)) { await handle(event); await offsetStore.save('my-projection', offset);}Live stream of tag-matched events. This is what ProjectionActor.byTag
uses internally — the offset lets the projection persist its
progress and resume from there.
Offsets
Section titled “Offsets”type Offset = { readonly timestamp: number; readonly persistenceId: string; readonly sequenceNr: number;};Compound: (timestamp, persistenceId, sequenceNr). This is the
order events stream out.
timestamp— the journal’s write time for the event.persistenceId— tiebreaker for events with the same timestamp.sequenceNr— final tiebreaker for events in the same pid.
The compound order means events from different entities interleave by timestamp, while events from the same entity stay in their original sequence.
offsetStart is the sentinel “from the beginning.” Persist the
offset returned alongside each event; resume from the persisted
offset on restart.
Tag filters
Section titled “Tag filters”import type { TagFilter } from 'actor-ts';
// All events that have at least one of these tags:query.eventsByTag<E>({ any: ['account', 'audit'] }, offset);
// Events that have ALL these tags:query.eventsByTag<E>({ all: ['account', 'production'] }, offset);
// AND across two groups:query.eventsByTag<E>({ all: ['account'], any: ['debit', 'credit'],}, offset);For simple cases, a single tag string works:
query.eventsByTag<E>('account', offset); // equivalent to { all: ['account'] }Polling
Section titled “Polling”const stream = query.eventsByTag<E>('account', offset, { pollIntervalMs: 500, // default 1000});Live streams poll the journal. Configurable via LiveQueryOptions:
| Option | Default | What |
|---|---|---|
pollIntervalMs | 1000 | How often to check for new events. |
batchSize | journal-specific | How many events to fetch per poll. |
For sub-poll-interval delivery, see Push-based query.
When to use it directly vs via ProjectionActor
Section titled “When to use it directly vs via ProjectionActor”// Manual iteration — full control:for await (const [event, offset] of query.eventsByTag(...)) { // ... custom error handling, custom dispatching ...}
// ProjectionActor — supervised, restartable, offset-managed:ProjectionActor.byTag({ ... });ProjectionActor wraps the query API with:
- Offset persistence (you don’t manage it).
- Supervised restart on handler errors.
- Coordinated shutdown awareness.
- Configurable retry / backoff.
For production projections, use ProjectionActor. For
one-off scripts and ad-hoc admin queries, use the query
API directly.
Performance notes
Section titled “Performance notes”current*reads scan the journal once, return a materialized array. For large pids / many tag-matches, this can be slow + memory-heavy. Use the streamingevents*variants for unbounded reads.- Live polling is light at idle — empty queries return
quickly. Under heavy write load, polling can pile up if the
read side is slower than the write side. Tune
batchSize+ parallelize the handler if you see lag. - Tag indexes matter —
events_tagsindex is what makes by-tag queries fast. The SQLite journal creates this automatically.
Where to next
Section titled “Where to next”- Persistence overview — the bigger picture.
- Projections — the high-level wrapper around the query API.
- Push-based query — for sub-poll-interval live delivery.
- Journals — the write side feeding the query.
The PersistenceQuery
API reference covers the full surface.