PersistentActor
PersistentActor<Cmd, Event, State> ist die Event-Sourcing-Basisklasse
des Frameworks. Das Modell:
- Commands kommen rein.
- Der Command-Handler validiert und entscheidet, welche Fakten passiert sind — diese Fakten sind Events, die ins Journal persistiert werden.
- Nach der Persistenz aktualisiert ein reiner Event-Handler den State.
Beim Start liest das Framework das Journal und spielt jedes Event durch den Event-Handler ab. Der resultierende State ist das, was der Actor bei seinem ersten Command sieht — wo auch immer die letzte Instanz aufgehört hat, diese macht weiter.
Ein minimales Beispiel
Abschnitt betitelt „Ein minimales Beispiel“import { PersistentActor, ActorSystem, Props } from 'actor-ts';import { InMemoryJournal, InMemorySnapshotStore } from 'actor-ts';import { match } from 'ts-pattern';
type Cmd = | { kind: 'deposit'; amount: number } | { kind: 'withdraw'; amount: number };
type Event = | { kind: 'deposited'; amount: number; ts: number } | { kind: 'withdrawn'; amount: number; ts: number };
interface State { balance: number; }
class Account extends PersistentActor<Cmd, Event, State> { readonly persistenceId = 'account-42';
initialState(): State { return { balance: 0 }; }
// Rein: state + event → neuer state. Läuft sowohl bei Persist als auch bei Recovery. onEvent(state: State, e: Event): State { return match(e) .with({ kind: 'deposited' }, (ev) => ({ balance: state.balance + ev.amount })) .with({ kind: 'withdrawn' }, (ev) => ({ balance: state.balance - ev.amount })) .exhaustive(); }
// Ein Command behandeln — validieren, bei Gültigkeit persistieren. onCommand(state: State, cmd: Cmd): void { match(cmd) .with({ kind: 'deposit' }, (c) => { this.persist( { kind: 'deposited', amount: c.amount, ts: Date.now() }, (next) => { /* Seiteneffekte mit dem neuen State */ }, ); }) .with({ kind: 'withdraw' }, (c) => { if (state.balance < c.amount) { // Ablehnen — nichts persistieren. this.log.warn(`insufficient funds`); return; } this.persist( { kind: 'withdrawn', amount: c.amount, ts: Date.now() }, () => {}, ); }) .exhaustive(); }}
// Setup — Journal + Snapshot-Store inline via die Create-Option verdrahten:const system = ActorSystem.create('demo', { persistence: { journal: new InMemoryJournal(), snapshotStore: new InMemorySnapshotStore(), },});
const account = system.spawn(Props.create(() => new Account()), 'account-42');account.tell({ kind: 'deposit', amount: 100 });account.tell({ kind: 'withdraw', amount: 30 });// state ist { balance: 70 } — und bleibt das über Neustarts hinweg.Die drei abstrakten Methoden
Abschnitt betitelt „Die drei abstrakten Methoden“Jede PersistentActor-Subklasse implementiert drei Methoden:
initialState()
Abschnitt betitelt „initialState()“Der State, bevor irgendwelche Events da sind. Wird am Anfang der
Recovery aufgerufen, und das Ergebnis ist das, worauf onEvent
aufbaut.
initialState(): State { return { balance: 0 };}onEvent(state, event) → newState
Abschnitt betitelt „onEvent(state, event) → newState“Rein — keine Seiteneffekte, keine tells, keine awaits. Nur
state + event → state. Diese Funktion läuft sowohl:
- Beim Replay — einmal pro persistiertem Event, wenn der Actor startet und das Journal abgespielt wird.
- Nach dem Persist — einmal, wenn ein neues Event landet.
Reinheit ist wichtig, weil dieselben Events über die Lebensdauer des
Actors viele Male abgespielt werden. Ein Seiteneffekt in onEvent
läuft bei jeder Recovery, was duplizierte E-Mails / duplizierte
HTTP-Calls / alles dupliziert bedeutet.
onCommand(state, cmd) → void | Promise<void>
Abschnitt betitelt „onCommand(state, cmd) → void | Promise<void>“Validiert das Command gegen den aktuellen State und entscheidet, welche Events persistiert werden. Drei gültige Ergebnisse:
this.persist(event, cb)— ein Event persistieren. Der Callback läuft mit dem neuen State, sobald das Event angehängt und peronEventangewendet ist. Seiteneffekte gehören hierher.- Antworten ohne Persist — für Read-only-Commands (z. B.
{ kind: 'get-balance' }) greifst du aufstatezu und schickst die Antwort direkt pertell. - Ablehnen — loggen, ignorieren oder mit einem Fehler antworten. Keine Events werden geschrieben.
onCommand(state: State, cmd: Cmd): void { if (cmd.kind === 'get-balance') { cmd.replyTo.tell(state.balance); // read-only — kein Persist return; } if (cmd.kind === 'withdraw') { if (state.balance < cmd.amount) return; // ablehnen this.persist({ kind: 'withdrawn', amount: cmd.amount, ts: Date.now() }, () => {}); }}Der persist-Callback
Abschnitt betitelt „Der persist-Callback“this.persist(event, (newState) => { // 1. Das Event wurde ins Journal geschrieben // 2. onEvent wurde aufgerufen; this.state und `newState` spiegeln es wider // 3. Es ist sicher, hier Seiteneffekte auszuführen this.sender.forEach(s => s.tell({ ok: true, balance: newState.balance }));});Drei Garantien, die der Callback dir gibt:
- Das Event ist durable — wenn der Prozess danach abstürzt, hat das Journal es trotzdem. Die nächste Recovery nimmt es auf.
- Der State spiegelt es wider —
onEventist gelaufen;this.stateist der neue State. - Commands werden während des Persist gestasht — eingehende Commands warten, bis der Persist abgeschlossen ist und sein Callback feuert. Keine Verschachtelung.
Der dritte Punkt macht persist sicher als
“Transaktions-Grenze” für die Command-Verarbeitung. Seiteneffekte
(Antworten, Benachrichtigungen, Follow-up-Tells) gehören in den
Callback.
Recovery
Abschnitt betitelt „Recovery“Wenn der Actor startet:
preStart() läuft: → neuesten Snapshot laden (falls vorhanden) → state, seqNr setzen → Events aus dem Journal lesen ab seqNr+1 → für jedes Event: state = onEvent(state, event) → onRecoveryComplete(state) → bereit, das erste Command zu verarbeitenWährend die Recovery läuft, werden keine Commands verarbeitet. Die Mailbox füllt sich; sobald die Recovery fertig ist, arbeitet der Actor sie der Reihe nach gegen den wiederhergestellten State ab.
onRecoveryComplete(state) ist ein optionaler Hook, der nach dem
Abspielen des letzten Events feuert. Verwende ihn für einmaliges
Post-Recovery-Setup (Watchers registrieren, verwandte Actors
beschaffen, etc.) — aber nicht für Seiteneffekte pro Event, die bei
jedem Neustart duplizieren würden.
Persistenz-ID
Abschnitt betitelt „Persistenz-ID“Jeder PersistentActor deklariert eine persistenceId:
class Account extends PersistentActor<...> { readonly persistenceId = 'account-42';}Die ID identifiziert den Event-Stream im Journal. Zwei Actors
mit derselben persistenceId würden sich das gleiche Event-Log
teilen — meistens ein Bug.
Für Per-Entity-Actors (ein Account pro User, ein Cart pro User) mache die ID abhängig von der Entity:
class Account extends PersistentActor<...> { constructor(public readonly userId: string) { super(); } readonly persistenceId = `account-${this.userId}`;}Für sharded Entities übergibt die Shard-Region typischerweise die Entity-ID per Konstruktor, und die Persistenz-ID leitet sich daraus ab.
Snapshots
Abschnitt betitelt „Snapshots“100 000 Events beim Start abspielen ist langsam. Konfiguriere eine Snapshot-Policy:
import { everyNEvents } from 'actor-ts';
class Account extends PersistentActor<...> { override snapshotPolicy() { return everyNEvents(100); } // Nach jeweils 100 Events wird der aktuelle State gesnappshottet.}Das Framework schreibt einen Snapshot über den Snapshot Store; bei der Recovery lädt es den Snapshot zuerst und spielt nur Events nach der seqNr dieses Snapshots ab.
everyNEvents(N) ist der häufige Fall. Für individuelle Policies
(Snapshot bei einer bestimmten Event-Art, zeitbasiert),
implementiere:
override snapshotPolicy() { return (seqNr, state, event) => event.kind === 'finalized';}Siehe Snapshots für die vollständige Konfiguration.
Events für Projektionen taggen
Abschnitt betitelt „Events für Projektionen taggen“class Account extends PersistentActor<...> { override tagsFor(event: Event): ReadonlyArray<string> | undefined { return ['account']; // oder basierend auf der Event-Art }}Projektionen lesen Events aus
dem Journal nach Tag. Ein Event zu taggen macht es entdeckbar
für eine Read-Side-View, die 'account'-Events abonniert.
undefined zurückzugeben (der Default) bedeutet “keine Tags” —
in Ordnung, wenn du noch keine Projektionen hast.
Schema-Evolution — Event-Adapter
Abschnitt betitelt „Schema-Evolution — Event-Adapter“Wenn sich Event-Formen über die Zeit ändern (ein Feld wird umbenannt, ein Wert wird gesplittet, ein Enum wird hinzugefügt), bleiben alte Events für immer im Journal. Der Event-Adapter rüstet sie beim Lesen auf:
import { EventAdapter } from 'actor-ts';
class V1ToV2Adapter implements EventAdapter<EventV2> { upcast(stored: unknown, version: number): EventV2 { if (version === 1) return migrate(stored as EventV1); return stored as EventV2; }}
class Account extends PersistentActor<...> { override eventAdapter() { return new V1ToV2Adapter(); }}Mit einem konfigurierten Adapter wird jedes Event in einem
{ _v, _t, _e }-Envelope (Version + Typ + Payload) zur Persist-Zeit
verpackt und beim Lesen über adapter.upcast(stored, version)
entpackt. Rückwärtskompatibilität ist die Verantwortung des Actors.
Siehe Migration im Überblick für die vollständige Migrations-Story.
Häufige Stolperfallen
Abschnitt betitelt „Häufige Stolperfallen“Wie geht’s weiter
Abschnitt betitelt „Wie geht’s weiter“- Persistenz im Überblick — das größere Bild: PersistentActor vs. DurableStateActor.
- Snapshots — Replay-Fenster-Reduktion im Detail.
- Projektionen — Read-Side-Views über den Event-Stream.
- Journals — SQLite — Produktions-Journal für Single-Node.
- Migration im Überblick — Event-Schemas weiterentwickeln.
- Replicated Event Sourcing — Multi-Writer Event Sourcing für Cluster-Setups.
Die PersistentActor-API-Referenz
deckt die vollständige Basisklassen-Oberfläche ab.