Zum Inhalt springen
Deutsch

PersistentActor

PersistentActor<Cmd, Event, State> ist die Event-Sourcing-Basisklasse des Frameworks. Das Modell:

  • Commands kommen rein.
  • Der Command-Handler validiert und entscheidet, welche Fakten passiert sind — diese Fakten sind Events, die ins Journal persistiert werden.
  • Nach der Persistenz aktualisiert ein reiner Event-Handler den State.

Beim Start liest das Framework das Journal und spielt jedes Event durch den Event-Handler ab. Der resultierende State ist das, was der Actor bei seinem ersten Command sieht — wo auch immer die letzte Instanz aufgehört hat, diese macht weiter.

import { PersistentActor, ActorSystem, Props } from 'actor-ts';
import { InMemoryJournal, InMemorySnapshotStore } from 'actor-ts';
import { match } from 'ts-pattern';
type Cmd =
| { kind: 'deposit'; amount: number }
| { kind: 'withdraw'; amount: number };
type Event =
| { kind: 'deposited'; amount: number; ts: number }
| { kind: 'withdrawn'; amount: number; ts: number };
interface State { balance: number; }
class Account extends PersistentActor<Cmd, Event, State> {
readonly persistenceId = 'account-42';
initialState(): State { return { balance: 0 }; }
// Rein: state + event → neuer state. Läuft sowohl bei Persist als auch bei Recovery.
onEvent(state: State, e: Event): State {
return match(e)
.with({ kind: 'deposited' }, (ev) => ({ balance: state.balance + ev.amount }))
.with({ kind: 'withdrawn' }, (ev) => ({ balance: state.balance - ev.amount }))
.exhaustive();
}
// Ein Command behandeln — validieren, bei Gültigkeit persistieren.
onCommand(state: State, cmd: Cmd): void {
match(cmd)
.with({ kind: 'deposit' }, (c) => {
this.persist(
{ kind: 'deposited', amount: c.amount, ts: Date.now() },
(next) => { /* Seiteneffekte mit dem neuen State */ },
);
})
.with({ kind: 'withdraw' }, (c) => {
if (state.balance < c.amount) {
// Ablehnen — nichts persistieren.
this.log.warn(`insufficient funds`);
return;
}
this.persist(
{ kind: 'withdrawn', amount: c.amount, ts: Date.now() },
() => {},
);
})
.exhaustive();
}
}
// Setup — Journal + Snapshot-Store inline via die Create-Option verdrahten:
const system = ActorSystem.create('demo', {
persistence: {
journal: new InMemoryJournal(),
snapshotStore: new InMemorySnapshotStore(),
},
});
const account = system.spawn(Props.create(() => new Account()), 'account-42');
account.tell({ kind: 'deposit', amount: 100 });
account.tell({ kind: 'withdraw', amount: 30 });
// state ist { balance: 70 } — und bleibt das über Neustarts hinweg.

Jede PersistentActor-Subklasse implementiert drei Methoden:

Der State, bevor irgendwelche Events da sind. Wird am Anfang der Recovery aufgerufen, und das Ergebnis ist das, worauf onEvent aufbaut.

initialState(): State {
return { balance: 0 };
}

Rein — keine Seiteneffekte, keine tells, keine awaits. Nur state + event → state. Diese Funktion läuft sowohl:

  • Beim Replay — einmal pro persistiertem Event, wenn der Actor startet und das Journal abgespielt wird.
  • Nach dem Persist — einmal, wenn ein neues Event landet.

Reinheit ist wichtig, weil dieselben Events über die Lebensdauer des Actors viele Male abgespielt werden. Ein Seiteneffekt in onEvent läuft bei jeder Recovery, was duplizierte E-Mails / duplizierte HTTP-Calls / alles dupliziert bedeutet.

Validiert das Command gegen den aktuellen State und entscheidet, welche Events persistiert werden. Drei gültige Ergebnisse:

  • this.persist(event, cb) — ein Event persistieren. Der Callback läuft mit dem neuen State, sobald das Event angehängt und per onEvent angewendet ist. Seiteneffekte gehören hierher.
  • Antworten ohne Persist — für Read-only-Commands (z. B. { kind: 'get-balance' }) greifst du auf state zu und schickst die Antwort direkt per tell.
  • Ablehnen — loggen, ignorieren oder mit einem Fehler antworten. Keine Events werden geschrieben.
onCommand(state: State, cmd: Cmd): void {
if (cmd.kind === 'get-balance') {
cmd.replyTo.tell(state.balance); // read-only — kein Persist
return;
}
if (cmd.kind === 'withdraw') {
if (state.balance < cmd.amount) return; // ablehnen
this.persist({ kind: 'withdrawn', amount: cmd.amount, ts: Date.now() },
() => {});
}
}
this.persist(event, (newState) => {
// 1. Das Event wurde ins Journal geschrieben
// 2. onEvent wurde aufgerufen; this.state und `newState` spiegeln es wider
// 3. Es ist sicher, hier Seiteneffekte auszuführen
this.sender.forEach(s => s.tell({ ok: true, balance: newState.balance }));
});

Drei Garantien, die der Callback dir gibt:

  1. Das Event ist durable — wenn der Prozess danach abstürzt, hat das Journal es trotzdem. Die nächste Recovery nimmt es auf.
  2. Der State spiegelt es wideronEvent ist gelaufen; this.state ist der neue State.
  3. Commands werden während des Persist gestasht — eingehende Commands warten, bis der Persist abgeschlossen ist und sein Callback feuert. Keine Verschachtelung.

Der dritte Punkt macht persist sicher als “Transaktions-Grenze” für die Command-Verarbeitung. Seiteneffekte (Antworten, Benachrichtigungen, Follow-up-Tells) gehören in den Callback.

Wenn der Actor startet:

preStart() läuft:
→ neuesten Snapshot laden (falls vorhanden) → state, seqNr setzen
→ Events aus dem Journal lesen ab seqNr+1
→ für jedes Event: state = onEvent(state, event)
→ onRecoveryComplete(state)
→ bereit, das erste Command zu verarbeiten

Während die Recovery läuft, werden keine Commands verarbeitet. Die Mailbox füllt sich; sobald die Recovery fertig ist, arbeitet der Actor sie der Reihe nach gegen den wiederhergestellten State ab.

onRecoveryComplete(state) ist ein optionaler Hook, der nach dem Abspielen des letzten Events feuert. Verwende ihn für einmaliges Post-Recovery-Setup (Watchers registrieren, verwandte Actors beschaffen, etc.) — aber nicht für Seiteneffekte pro Event, die bei jedem Neustart duplizieren würden.

Jeder PersistentActor deklariert eine persistenceId:

class Account extends PersistentActor<...> {
readonly persistenceId = 'account-42';
}

Die ID identifiziert den Event-Stream im Journal. Zwei Actors mit derselben persistenceId würden sich das gleiche Event-Log teilen — meistens ein Bug.

Für Per-Entity-Actors (ein Account pro User, ein Cart pro User) mache die ID abhängig von der Entity:

class Account extends PersistentActor<...> {
constructor(public readonly userId: string) { super(); }
readonly persistenceId = `account-${this.userId}`;
}

Für sharded Entities übergibt die Shard-Region typischerweise die Entity-ID per Konstruktor, und die Persistenz-ID leitet sich daraus ab.

100 000 Events beim Start abspielen ist langsam. Konfiguriere eine Snapshot-Policy:

import { everyNEvents } from 'actor-ts';
class Account extends PersistentActor<...> {
override snapshotPolicy() { return everyNEvents(100); }
// Nach jeweils 100 Events wird der aktuelle State gesnappshottet.
}

Das Framework schreibt einen Snapshot über den Snapshot Store; bei der Recovery lädt es den Snapshot zuerst und spielt nur Events nach der seqNr dieses Snapshots ab.

everyNEvents(N) ist der häufige Fall. Für individuelle Policies (Snapshot bei einer bestimmten Event-Art, zeitbasiert), implementiere:

override snapshotPolicy() {
return (seqNr, state, event) => event.kind === 'finalized';
}

Siehe Snapshots für die vollständige Konfiguration.

class Account extends PersistentActor<...> {
override tagsFor(event: Event): ReadonlyArray<string> | undefined {
return ['account']; // oder basierend auf der Event-Art
}
}

Projektionen lesen Events aus dem Journal nach Tag. Ein Event zu taggen macht es entdeckbar für eine Read-Side-View, die 'account'-Events abonniert.

undefined zurückzugeben (der Default) bedeutet “keine Tags” — in Ordnung, wenn du noch keine Projektionen hast.

Wenn sich Event-Formen über die Zeit ändern (ein Feld wird umbenannt, ein Wert wird gesplittet, ein Enum wird hinzugefügt), bleiben alte Events für immer im Journal. Der Event-Adapter rüstet sie beim Lesen auf:

import { EventAdapter } from 'actor-ts';
class V1ToV2Adapter implements EventAdapter<EventV2> {
upcast(stored: unknown, version: number): EventV2 {
if (version === 1) return migrate(stored as EventV1);
return stored as EventV2;
}
}
class Account extends PersistentActor<...> {
override eventAdapter() { return new V1ToV2Adapter(); }
}

Mit einem konfigurierten Adapter wird jedes Event in einem { _v, _t, _e }-Envelope (Version + Typ + Payload) zur Persist-Zeit verpackt und beim Lesen über adapter.upcast(stored, version) entpackt. Rückwärtskompatibilität ist die Verantwortung des Actors.

Siehe Migration im Überblick für die vollständige Migrations-Story.

Die PersistentActor-API-Referenz deckt die vollständige Basisklassen-Oberfläche ab.