Zum Inhalt springen
Deutsch

Persistence Query

Das PersistenceQuery-Interface ist die Read-Side-API für das Journal. So konsumieren Projektionen Events; es ist auch das Interface für jeden One-Shot-Read historischer Events.

Zwei Varianten:

VarianteGibt zurückVerwendung
current*Promise<Event[]>Batch-Read von Events zur Aufrufzeit. Wird einmal aufgelöst.
events*AsyncIterable<Event>Live-Stream. Pollt, bis der Konsument stoppt.

Jedes Journal hat eine passende Query:

QueryPaart sich mit
InMemoryQueryInMemoryJournal
SqliteQuerySqliteJournal
CassandraQueryCassandraJournal

Das Interface ist über alle hinweg gleich — konstruiere einfach die passende Query für dein Journal-Backend.

import { SqliteQuery } from 'actor-ts';
const query = new SqliteQuery({ path: '/var/lib/events.db' });
const events = await query.currentEventsByPersistenceId<AccountEvent>(
'account-42',
/* fromSeq */ 1,
/* toSeq? */ 100,
);

One-Shot-Read jedes Events für eine Entity, zwischen den Sequenznummern fromSeq und toSeq (inklusiv).

Verwende für:

  • Audit-Replays — hole jedes Event für einen User.
  • Einmalige Backfills — verarbeite Events von einer Entity in ein separates System.
for await (const event of query.eventsByPersistenceId<AccountEvent>('account-42', 1)) {
console.log(event.event.kind, event.sequenceNr);
if (someCondition) break; // stoppt das Polling
}

Live-Stream. Yieldet zuerst vergangene Events (chronologisch nach sequenceNr), dann neue Events, sobald sie angehängt werden. Der Stream beendet sich nie von selbst — breake aus der Schleife, um zu stoppen.

Verwende für:

  • Live-Per-Entity-Views — ein Echtzeit-Feed der Aktivität eines Users.
  • Single-Entity-Projektionen, die ad hoc gebaut werden statt über ProjectionActor.byPersistenceId.
const events = await query.currentEventsByTag<Event>(
'account',
offsetStart,
);

One-Shot-Read jedes Events, das zu einem Tag passt (oder Tag-Filter), ab einem Offset.

Gibt [Event, Offset][]-Paare zurück — der Offset ist das, was du einem folgenden Aufruf weitergeben würdest, um den nächsten Batch zu lesen.

Verwende für:

  • Berichte — summiere alle account-Events des aktuellen Monats.
  • Batch-ETL — periodischer Export aller getaggten Events.
for await (const [event, offset] of query.eventsByTag<Event>('account', offsetStart)) {
await handle(event);
await offsetStore.save('my-projection', offset);
}

Live-Stream tag-gematchter Events. Das ist das, was ProjectionActor.byTag intern verwendet — der Offset lässt die Projektion ihren Fortschritt persistieren und von dort fortsetzen.

type Offset = {
readonly timestamp: number;
readonly persistenceId: string;
readonly sequenceNr: number;
};

Compound: (timestamp, persistenceId, sequenceNr). Das ist die Reihenfolge, in der Events herausströmen.

  • timestamp — die Schreibzeit des Journals für das Event.
  • persistenceId — Tiebreaker für Events mit demselben Timestamp.
  • sequenceNr — finaler Tiebreaker für Events in derselben pid.

Die Compound-Reihenfolge bedeutet, dass Events aus verschiedenen Entities nach Timestamp verschachtelt werden, während Events aus derselben Entity in ihrer ursprünglichen Sequenz bleiben.

offsetStart ist die Sentinel-Konstante “vom Anfang.” Persistiere den Offset, der zusammen mit jedem Event zurückgegeben wird; setze beim Neustart vom persistierten Offset fort.

import type { TagFilter } from 'actor-ts';
// Alle Events, die mindestens einen dieser Tags haben:
query.eventsByTag<E>({ any: ['account', 'audit'] }, offset);
// Events, die ALLE diese Tags haben:
query.eventsByTag<E>({ all: ['account', 'production'] }, offset);
// AND über zwei Gruppen:
query.eventsByTag<E>({
all: ['account'],
any: ['debit', 'credit'],
}, offset);

Für einfache Fälle funktioniert ein einzelner Tag-String:

query.eventsByTag<E>('account', offset); // äquivalent zu { all: ['account'] }
const stream = query.eventsByTag<E>('account', offset, {
pollIntervalMs: 500, // Default 1000
});

Live-Streams pollen das Journal. Konfigurierbar über LiveQueryOptions:

OptionDefaultWas
pollIntervalMs1000Wie oft auf neue Events geprüft wird.
batchSizeJournal-spezifischWie viele Events pro Poll geholt werden.

Für Sub-Poll-Intervall-Delivery siehe Push-basierte Query.

// Manuelle Iteration — volle Kontrolle:
for await (const [event, offset] of query.eventsByTag(...)) {
// ... benutzerdefinierte Fehlerbehandlung, benutzerdefiniertes Dispatching ...
}
// ProjectionActor — supervised, neustartbar, offset-managed:
ProjectionActor.byTag({ ... });

ProjectionActor wickelt die Query-API ein mit:

  • Offset-Persistenz (du verwaltest sie nicht).
  • Supervised-Restart bei Handler-Fehlern.
  • Bewusstsein für koordiniertes Shutdown.
  • Konfigurierbarem Retry / Backoff.

Für Produktions-Projektionen verwende ProjectionActor. Für Einmal-Skripte und Ad-hoc-Admin-Queries verwende die Query-API direkt.

  • current*-Reads scannen das Journal einmal, geben ein materialisiertes Array zurück. Für große pids / viele Tag-Matches kann das langsam und speicherintensiv sein. Verwende die streamenden events*-Varianten für unbegrenzte Reads.
  • Live-Polling ist im Idle leicht — leere Queries kehren schnell zurück. Unter hoher Schreiblast kann sich das Polling stauen, wenn die Read-Seite langsamer ist als die Write-Seite. Tune batchSize + parallelisiere den Handler, wenn du Lag siehst.
  • Tag-Indexes sind wichtig — der events_tags-Index ist das, was By-Tag-Queries schnell macht. Das SQLite-Journal erstellt diesen automatisch.

Die PersistenceQuery-API-Referenz deckt die vollständige Oberfläche ab.