Skip to content

Persistence query

The PersistenceQuery interface is the read-side API for the journal. It’s how projections consume events; it’s also the interface for any one-shot read of historical events.

Two flavors:

FlavorReturnsUse
current*Promise<Event[]>Batch read of events at call time. Resolves once.
events*AsyncIterable<Event>Live stream. Polls until the consumer stops.

Each journal has a matching query:

QueryPairs with
InMemoryQueryInMemoryJournal
SqliteQuerySqliteJournal
CassandraQueryCassandraJournal

The interface is the same across all of them — just construct the matching query for your journal backend.

import { SqliteQuery } from 'actor-ts';
const query = new SqliteQuery({ path: '/var/lib/events.db' });
const events = await query.currentEventsByPersistenceId<AccountEvent>(
'account-42',
/* fromSeq */ 1,
/* toSeq? */ 100,
);

One-shot read of every event for one entity, between sequence numbers fromSeq and toSeq (inclusive).

Use for:

  • Audit replays — get every event for one user.
  • One-off backfills — process events from one entity into a separate system.
for await (const event of query.eventsByPersistenceId<AccountEvent>('account-42', 1)) {
console.log(event.event.kind, event.sequenceNr);
if (someCondition) break; // stops the polling
}

Live stream. Yields past events first (chronological by sequenceNr), then new events as they’re appended. The stream never completes on its own — break out of the loop to stop.

Use for:

  • Live per-entity views — a real-time feed of one user’s activity.
  • Single-entity projections built ad-hoc rather than via ProjectionActor.byPersistenceId.
const events = await query.currentEventsByTag<Event>(
'account',
offsetStart,
);

One-shot read of every event matching a tag (or tag filter), starting from an offset.

Returns [Event, Offset][] pairs — the offset is what you’d pass to a subsequent call to read the next batch.

Use for:

  • Reports — sum all account events for the current month.
  • Batch ETL — periodic export of all tagged events.
for await (const [event, offset] of query.eventsByTag<Event>('account', offsetStart)) {
await handle(event);
await offsetStore.save('my-projection', offset);
}

Live stream of tag-matched events. This is what ProjectionActor.byTag uses internally — the offset lets the projection persist its progress and resume from there.

type Offset = {
readonly timestamp: number;
readonly persistenceId: string;
readonly sequenceNr: number;
};

Compound: (timestamp, persistenceId, sequenceNr). This is the order events stream out.

  • timestamp — the journal’s write time for the event.
  • persistenceId — tiebreaker for events with the same timestamp.
  • sequenceNr — final tiebreaker for events in the same pid.

The compound order means events from different entities interleave by timestamp, while events from the same entity stay in their original sequence.

offsetStart is the sentinel “from the beginning.” Persist the offset returned alongside each event; resume from the persisted offset on restart.

import type { TagFilter } from 'actor-ts';
// All events that have at least one of these tags:
query.eventsByTag<E>({ any: ['account', 'audit'] }, offset);
// Events that have ALL these tags:
query.eventsByTag<E>({ all: ['account', 'production'] }, offset);
// AND across two groups:
query.eventsByTag<E>({
all: ['account'],
any: ['debit', 'credit'],
}, offset);

For simple cases, a single tag string works:

query.eventsByTag<E>('account', offset); // equivalent to { all: ['account'] }
const stream = query.eventsByTag<E>('account', offset, {
pollIntervalMs: 500, // default 1000
});

Live streams poll the journal. Configurable via LiveQueryOptions:

OptionDefaultWhat
pollIntervalMs1000How often to check for new events.
batchSizejournal-specificHow many events to fetch per poll.

For sub-poll-interval delivery, see Push-based query.

When to use it directly vs via ProjectionActor

Section titled “When to use it directly vs via ProjectionActor”
// Manual iteration — full control:
for await (const [event, offset] of query.eventsByTag(...)) {
// ... custom error handling, custom dispatching ...
}
// ProjectionActor — supervised, restartable, offset-managed:
ProjectionActor.byTag({ ... });

ProjectionActor wraps the query API with:

  • Offset persistence (you don’t manage it).
  • Supervised restart on handler errors.
  • Coordinated shutdown awareness.
  • Configurable retry / backoff.

For production projections, use ProjectionActor. For one-off scripts and ad-hoc admin queries, use the query API directly.

  • current* reads scan the journal once, return a materialized array. For large pids / many tag-matches, this can be slow + memory-heavy. Use the streaming events* variants for unbounded reads.
  • Live polling is light at idle — empty queries return quickly. Under heavy write load, polling can pile up if the read side is slower than the write side. Tune batchSize + parallelize the handler if you see lag.
  • Tag indexes matterevents_tags index is what makes by-tag queries fast. The SQLite journal creates this automatically.

The PersistenceQuery API reference covers the full surface.