Zum Inhalt springen
Deutsch

PostgreSQL

Das Postgres-Backend liefert alle drei Persistence-Komponenten gegen eine einzige PostgreSQL-Datenbank, über den pg-Treiber:

  • PostgresJournal — das Event-Journal für PersistentActors.
  • PostgresSnapshotStore — Snapshots, um die Recovery zu begrenzen.
  • PostgresDurableStateStore — Key-Value-Durable-State für DurableStateActors. Das ist der erste SQL-basierte Durable-State-Store (SQLite und Cassandra liefern nur Journal + Snapshot).

Wie Cassandra ist Postgres über Cluster-Nodes geteilt — jeder Node kann jede persistenceId lesen/schreiben — aber relational, transaktional und mit ziemlicher Sicherheit schon in deinem Stack.

pg ist eine optionale Peer-Dependency — installiere sie neben actor-ts:

Terminal-Fenster
bun add pg

Das Framework lazy-importiert pg erst beim ersten Zugriff auf einen Postgres-Store, hält es also bis zum Opt-in aus deinem Bundle.

Registriere Journal + Snapshot-Store an der PersistenceExtension und erhalte einen einsatzbereiten Durable-State-Store. Gib einen geteilten pool mit, damit alle drei Komponenten einen Connection-Pool nutzen:

import { Pool } from 'pg';
import {
ActorSystem,
PersistenceExtensionId,
registerPostgresPlugins,
} from 'actor-ts';
const system = new ActorSystem({
name: 'my-app',
// Postgres-Plugins als aktives Journal + Snapshot-Store wählen.
config: `
actor-ts.persistence.journal.plugin = "actor-ts.persistence.journal.postgres"
actor-ts.persistence.snapshot-store.plugin = "actor-ts.persistence.snapshot-store.postgres"
`,
});
const ext = system.extension(PersistenceExtensionId);
const { durableStateStore } = registerPostgresPlugins(ext, {
// Ein Pool, geteilt von Journal + Snapshot + Durable-State (empfohlen).
pool: new Pool({ connectionString: 'postgres://user:pass@host:5432/app' }),
journal: { /* eventsTable, tagsTable, autoCreateTables */ },
snapshotStore: { keepN: 3 },
durableStateStore: { /* table */ },
});

registerPostgresPlugins registriert Journal + Snapshot-Store über die Extension (ausgewählt durch die Plugin-IDs oben) und gibt den Durable-State-Store zurück. PersistenceExtension hat kein Durable-State-Registry — reiche durableStateStore direkt in die Settings deines DurableStateActor (gleiches Muster wie beim Object-Storage-Plugin).

Ohne geteilten pool baut jede Komponente lazy ihren eigenen Pool aus url / poolConfig:

registerPostgresPlugins(ext, {
journal: { url: 'postgres://user:pass@host:5432/app' },
snapshotStore: { url: 'postgres://user:pass@host:5432/app', keepN: 3 },
durableStateStore: { url: 'postgres://user:pass@host:5432/app' },
});
  • Du betreibst ohnehin Postgres. Ein bewegliches Teil weniger als ein Cassandra-Aufbau — nutze deine bestehende Managed-Instanz (RDS, Cloud SQL, Supabase, …).
  • Cluster-geteilte Persistence. Geshardete Entities, die zwischen Nodes wandern, oder Cross-Node-Projektionen brauchen ein Journal, das jeder Node lesen kann.
  • Du willst Durable-State in SQL. Der einzige SQL-Durable-State-Store im Framework.

Für Single-Node-Entwicklung ist SqliteJournal einfacher (eine Datei, kein Server).

interface PostgresConnection {
url?: string; // postgres://user:pass@host:5432/db
poolConfig?: Record<string, unknown>; // weitere pg.Pool-Optionen (ssl, max, …)
pool?: PgPoolLike; // vorgebauter / geteilter Pool
}
interface PostgresJournalOptions extends PostgresConnection {
eventsTable?: string; // Default 'events'
tagsTable?: string; // Default '<eventsTable>_tags'
autoCreateTables?: boolean; // Default true
}
interface PostgresSnapshotStoreOptions extends PostgresConnection {
snapshotsTable?: string; // Default 'snapshots'
keepN?: number; // neueste N pro pid; Default 3, <=0 = alle behalten
autoCreateTables?: boolean;
}
interface PostgresDurableStateStoreOptions extends PostgresConnection {
table?: string; // Default 'durable_state'
autoCreateTables?: boolean;
}

Tabellennamen stammen aus der Config (nicht aus User-Input) und werden gegen ein Safe-Identifier-Muster geprüft; alles andere — persistenceIds, Tags, Payloads — geht als Bind-Parameter ($1, $2, …), nie per String-Concat.

Mit autoCreateTables (Default) führt das Backend beim ersten Zugriff CREATE TABLE IF NOT EXISTS aus:

CREATE TABLE events (
persistence_id TEXT NOT NULL,
sequence_nr BIGINT NOT NULL,
payload TEXT NOT NULL, -- JSON
tags TEXT, -- CSV (auch in events_tags gespiegelt)
timestamp BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr)
);
CREATE TABLE events_tags ( -- indizierte Tag-Lookups für Projektionen
persistence_id TEXT NOT NULL,
sequence_nr BIGINT NOT NULL,
tag TEXT NOT NULL,
timestamp BIGINT NOT NULL,
PRIMARY KEY (tag, timestamp, persistence_id, sequence_nr)
);
CREATE TABLE snapshots (
persistence_id TEXT NOT NULL,
sequence_nr BIGINT NOT NULL,
payload TEXT NOT NULL,
timestamp BIGINT NOT NULL,
PRIMARY KEY (persistence_id, sequence_nr)
);
CREATE TABLE durable_state (
persistence_id TEXT PRIMARY KEY,
revision BIGINT NOT NULL,
payload TEXT NOT NULL,
timestamp BIGINT NOT NULL
);

Lege die Tabellen vorab an (und vergib nur INSERT/SELECT/UPDATE/DELETE) und setze autoCreateTables: false, wenn deine DB-Rolle kein DDL darf.

  • Journal-Append führt SELECT MAX(sequence_nr) und die Inserts in EINER Transaktion aus; das expectedSeq des Callers wird gegen den aktuellen Head geprüft. Ein durchrutschender konkurrierender Writer verletzt den Primary-Key-Unique-Constraint (SQLSTATE 23505), was als Backstop in JournalConcurrencyError übersetzt wird.
  • Durable-State-CAS nutzt die revision-Spalte: Anlegen (expectedRevision === 0) ist INSERT … ON CONFLICT DO NOTHING, ein Update ist UPDATE … WHERE revision = expected. Null betroffene Zeilen ⇒ DurableStateConcurrencyError, mit der aktuellen Revision für den Caller zurückgelesen.

BIGINT-Spalten liefert pg als Strings; das Backend coerct sequence_nr / revision / timestamp an der Mapping-Grenze zu number.

  • MariaDB — das Geschwister aus der MySQL-Familie.
  • Cassandra-Journal — verteilt, für Skalierung jenseits einer einzelnen relationalen Instanz.
  • SQLite-Journal — Single-Node, kein Server.
  • Durable State — die State-orientierte Alternative zum Event Sourcing.
  • Snapshots — den Recovery-Scan begrenzen.