Persistence Query
Das PersistenceQuery-Interface ist die Read-Side-API für das
Journal. So konsumieren Projektionen Events; es ist auch das
Interface für jeden One-Shot-Read historischer Events.
Zwei Varianten:
| Variante | Gibt zurück | Verwendung |
|---|---|---|
current* | Promise<Event[]> | Batch-Read von Events zur Aufrufzeit. Wird einmal aufgelöst. |
events* | AsyncIterable<Event> | Live-Stream. Pollt, bis der Konsument stoppt. |
Implementierungen
Abschnitt betitelt „Implementierungen“Jedes Journal hat eine passende Query:
| Query | Paart sich mit |
|---|---|
InMemoryQuery | InMemoryJournal |
SqliteQuery | SqliteJournal |
CassandraQuery | CassandraJournal |
Das Interface ist über alle hinweg gleich — konstruiere einfach die passende Query für dein Journal-Backend.
import { SqliteQuery } from 'actor-ts';
const query = new SqliteQuery({ path: '/var/lib/events.db' });Die vier Methoden
Abschnitt betitelt „Die vier Methoden“currentEventsByPersistenceId
Abschnitt betitelt „currentEventsByPersistenceId“const events = await query.currentEventsByPersistenceId<AccountEvent>( 'account-42', /* fromSeq */ 1, /* toSeq? */ 100,);One-Shot-Read jedes Events für eine Entity, zwischen den
Sequenznummern fromSeq und toSeq (inklusiv).
Verwende für:
- Audit-Replays — hole jedes Event für einen User.
- Einmalige Backfills — verarbeite Events von einer Entity in ein separates System.
eventsByPersistenceId
Abschnitt betitelt „eventsByPersistenceId“for await (const event of query.eventsByPersistenceId<AccountEvent>('account-42', 1)) { console.log(event.event.kind, event.sequenceNr); if (someCondition) break; // stoppt das Polling}Live-Stream. Yieldet zuerst vergangene Events (chronologisch nach
sequenceNr), dann neue Events, sobald sie angehängt werden. Der
Stream beendet sich nie von selbst — breake aus der Schleife,
um zu stoppen.
Verwende für:
- Live-Per-Entity-Views — ein Echtzeit-Feed der Aktivität eines Users.
- Single-Entity-Projektionen, die ad hoc gebaut werden statt
über
ProjectionActor.byPersistenceId.
currentEventsByTag
Abschnitt betitelt „currentEventsByTag“const events = await query.currentEventsByTag<Event>( 'account', offsetStart,);One-Shot-Read jedes Events, das zu einem Tag passt (oder Tag-Filter), ab einem Offset.
Gibt [Event, Offset][]-Paare zurück — der Offset ist das, was du
einem folgenden Aufruf weitergeben würdest, um den nächsten Batch
zu lesen.
Verwende für:
- Berichte — summiere alle
account-Events des aktuellen Monats. - Batch-ETL — periodischer Export aller getaggten Events.
eventsByTag
Abschnitt betitelt „eventsByTag“for await (const [event, offset] of query.eventsByTag<Event>('account', offsetStart)) { await handle(event); await offsetStore.save('my-projection', offset);}Live-Stream tag-gematchter Events. Das ist das, was
ProjectionActor.byTag intern verwendet — der Offset lässt die
Projektion ihren Fortschritt persistieren und von dort
fortsetzen.
Offsets
Abschnitt betitelt „Offsets“type Offset = { readonly timestamp: number; readonly persistenceId: string; readonly sequenceNr: number;};Compound: (timestamp, persistenceId, sequenceNr). Das ist die
Reihenfolge, in der Events herausströmen.
timestamp— die Schreibzeit des Journals für das Event.persistenceId— Tiebreaker für Events mit demselben Timestamp.sequenceNr— finaler Tiebreaker für Events in derselben pid.
Die Compound-Reihenfolge bedeutet, dass Events aus verschiedenen Entities nach Timestamp verschachtelt werden, während Events aus derselben Entity in ihrer ursprünglichen Sequenz bleiben.
offsetStart ist die Sentinel-Konstante “vom Anfang.” Persistiere
den Offset, der zusammen mit jedem Event zurückgegeben wird; setze
beim Neustart vom persistierten Offset fort.
Tag-Filter
Abschnitt betitelt „Tag-Filter“import type { TagFilter } from 'actor-ts';
// Alle Events, die mindestens einen dieser Tags haben:query.eventsByTag<E>({ any: ['account', 'audit'] }, offset);
// Events, die ALLE diese Tags haben:query.eventsByTag<E>({ all: ['account', 'production'] }, offset);
// AND über zwei Gruppen:query.eventsByTag<E>({ all: ['account'], any: ['debit', 'credit'],}, offset);Für einfache Fälle funktioniert ein einzelner Tag-String:
query.eventsByTag<E>('account', offset); // äquivalent zu { all: ['account'] }Polling
Abschnitt betitelt „Polling“const stream = query.eventsByTag<E>('account', offset, { pollIntervalMs: 500, // Default 1000});Live-Streams pollen das Journal. Konfigurierbar über
LiveQueryOptions:
| Option | Default | Was |
|---|---|---|
pollIntervalMs | 1000 | Wie oft auf neue Events geprüft wird. |
batchSize | Journal-spezifisch | Wie viele Events pro Poll geholt werden. |
Für Sub-Poll-Intervall-Delivery siehe Push-basierte Query.
Wann direkt verwenden vs. über ProjectionActor
Abschnitt betitelt „Wann direkt verwenden vs. über ProjectionActor“// Manuelle Iteration — volle Kontrolle:for await (const [event, offset] of query.eventsByTag(...)) { // ... benutzerdefinierte Fehlerbehandlung, benutzerdefiniertes Dispatching ...}
// ProjectionActor — supervised, neustartbar, offset-managed:ProjectionActor.byTag({ ... });ProjectionActor wickelt die Query-API ein mit:
- Offset-Persistenz (du verwaltest sie nicht).
- Supervised-Restart bei Handler-Fehlern.
- Bewusstsein für koordiniertes Shutdown.
- Konfigurierbarem Retry / Backoff.
Für Produktions-Projektionen verwende ProjectionActor. Für
Einmal-Skripte und Ad-hoc-Admin-Queries verwende die
Query-API direkt.
Performance-Hinweise
Abschnitt betitelt „Performance-Hinweise“current*-Reads scannen das Journal einmal, geben ein materialisiertes Array zurück. Für große pids / viele Tag-Matches kann das langsam und speicherintensiv sein. Verwende die streamendenevents*-Varianten für unbegrenzte Reads.- Live-Polling ist im Idle leicht — leere Queries kehren
schnell zurück. Unter hoher Schreiblast kann sich das Polling
stauen, wenn die Read-Seite langsamer ist als die Write-Seite.
Tune
batchSize+ parallelisiere den Handler, wenn du Lag siehst. - Tag-Indexes sind wichtig — der
events_tags-Index ist das, was By-Tag-Queries schnell macht. Das SQLite-Journal erstellt diesen automatisch.
Wie geht’s weiter
Abschnitt betitelt „Wie geht’s weiter“- Persistenz im Überblick — das größere Bild.
- Projektionen — der Higher-Level-Wrapper um die Query-API.
- Push-basierte Query — für Sub-Poll-Intervall-Live-Delivery.
- Journals — die Write-Seite, die die Query füttert.
Die PersistenceQuery-API-Referenz
deckt die vollständige Oberfläche ab.