Zum Inhalt springen
Deutsch

Sharded Daemon Process

Manchmal brauchst du N Hintergrund-Worker, verteilt über den Cluster, jeder mit stabiler Identität:

  • Ein Worker pro Arbeits-Partition (denke Kafka-Consumer-Group-Muster, aber innerhalb des Clusters).
  • N geplante Koordinatoren, einer pro Zeitzone, einer pro Region.
  • N Hintergrund-Reconciler, jeder besitzt eine Scheibe der Work-IDs.

Ein Singleton gibt dir einen davon. Sharding gibt dir einen pro Entity-ID. ShardedDaemonProcess gibt dir genau N, indiziert 0..N-1, gleichmäßig über den Cluster verteilt, automatisch bei Failover neu gespawnt.

import { ActorSystem, Cluster, Actor, Props } from 'actor-ts';
import { ShardedDaemonProcess } from 'actor-ts';
class Reconciler extends Actor<{ kind: 'check' }> {
constructor(public readonly index: number) { super(); }
override preStart(): void {
this.log.info(`Reconciler #${this.index} startet`);
// plane periodische Prüfung, z. B.:
this.context.timers.startTimerWithFixedDelay(
'check', { kind: 'check' }, 60_000,
);
}
override onReceive(msg: { kind: 'check' }): void {
this.log.info(`Reconciler #${this.index} tickt`);
// ... Per-Index-Arbeit
}
}
const system = ActorSystem.create('my-app');
const cluster = await Cluster.join(system, { host, port, seeds });
const handle = ShardedDaemonProcess.init(system, cluster, {
name: 'reconciler',
numDaemons: 8,
behaviorFor: (i) => Props.create(() => new Reconciler(i)),
});
// Jeder der 8 Daemons startet automatisch. Kein weiteres Setup nötig.
// Optional: schicke eine Nachricht an einen spezifischen Index
handle.tell(3, { kind: 'check' });

Drei Dinge passieren bei init:

  1. Das Framework registriert einen sharded Entity-Typ (daemon-reconciler).
  2. Jede Entity-ID ist einfach der Index des Daemons als String ('0', '1', …, '7').
  3. Interne “Wake-up”-Nachrichten werden an jeden Index geschickt, was die Sharding-Maschinerie zwingt, die Entity-Actors auf den vom Koordinator gewählten Nodes zu materialisieren.

Danach laufen alle 8 Daemons, verteilt über Cluster-Nodes, jeder mit seinem stabilen Index sichtbar via Konstruktor-Argument.

interface ShardedDaemonProcessHandle<T> {
readonly region: ActorRef<DaemonEnvelope<T>>;
tell(index: number, message: T): void;
stop(): void;
}
  • region — der darunterliegende Sharding-Region-Ref. Nachrichten, die hier gesendet werden, müssen in { index, body }-Envelopes verpackt sein; nutze stattdessen handle.tell.
  • tell(index, msg) — Convenience zum Senden an Daemon i.
  • stop() — stoppt den Liveness-Heartbeat und die Cluster-Subscription. Stoppt die Daemons selbst nicht — sie laufen weiter, bis der Cluster heruntergefahren wird.

ShardedDaemonProcess ist ein dünner Wrapper um ClusterSharding:

  • typeName: 'daemon-<name>'.
  • extractEntityId: env => env.index.toString().
  • rememberEntities: true — sodass ein Node-Ausfall den Daemon woanders neu spawnt.
  • allocationStrategy: LeastShardAllocationStrategy per Default — verteilt Daemons gleichmäßig.

Du könntest all das manuell mit ClusterSharding.start schreiben. Der Wrapper existiert, um:

  • Dir Boilerplate zu sparen.
  • Bei Init automatisch Wake-up-Nachrichten zu senden, damit Daemons eager materialisieren.
  • LeaderChanged / MemberRemoved zu abonnieren und Wake-ups erneut zu senden, was sicherstellt, dass Daemons, die laufen sollten, auch tatsächlich laufen.
  • Das livenessIntervalMs-Sicherheitsnetz bereitzustellen.
ShardedDaemonProcess.init(system, cluster, {
name: 'reconciler',
numDaemons: 8,
behaviorFor: (i) => Props.create(() => new Reconciler(i)),
livenessIntervalMs: 30_000, // Default — pingt jeden Index alle 30s
});

Periodisch (Default alle 30 s) schickt der Wrapper eine synthetische “Wake-up” an jeden Index. Wenn ein Daemon für irgendeinen Index unerwartet gestoppt wurde (ein Node, der geht + eine kurze Partition genau im falschen Moment), bringt das Wake-up die Sharding-Maschinerie dazu, ihn auf einem aktuellen Up-Mitglied neu zu spawnen.

Das ist doppelt gemoppelt — das meiste Failover regelt Sharding mit seinen eigenen Cluster-Event-Subscriptions. Der Liveness-Ping deckt den Edge Case ab, in dem ein Event verpasst wurde.

Auf 0 setzen deaktiviert es. Nützlich in Tests, in denen du Cluster-Events deterministisch kontrollierst.

interface ShardedDaemonProcessSettings<T> {
name: string; // eindeutiger Daemon-Set-Identifier
numDaemons: number; // Gesamtdaemons über den Cluster
behaviorFor: (i: number) => Props<T>;
role?: string; // auf spezifische Nodes beschränken
livenessIntervalMs?: number; // Sicherheitsnetz-Ping-Takt; Default 30s
}
FeldWas
nameEin eindeutiger Daemon-Set-Name. Zwei init-Aufrufe mit demselben Namen kollidieren.
numDaemonsWie viele Daemons laufen sollen. Bei Init festgelegt; kann ohne Neustart nicht geändert werden.
behaviorFor(i)Gibt Props für Daemon i zurück. Nutze i, um Per-Index-Logik zu unterscheiden.
roleNur Nodes mit dieser Rolle hosten Daemons (durchgereicht an Sharding).
livenessIntervalMsSicherheitsnetz-Ping-Takt; Default 30s.

Drei gute Anwendungen:

  1. Partitionierte HintergrundarbeitN Reconciliation-Jobs, jeder besitzt eine Scheibe der IDs. Modulo oder Hash, um Arbeit auf den Daemon-Index abzubilden.
  2. Verteilte Scheduler — ein Per-Zeitzone-Scheduler, ein Per-Region-Quota-Replenisher. Der Index jedes Daemons mappt auf seinen Scope.
  3. Stream-Processing-Groups — bei Integration mit Kafka-/Broker-Actors kann jeder Daemon ein Consumer-Group-Mitglied sein.
WerkzeugWas
ClusterSingletonGenau 1 Actor clusterweit.
ShardedDaemonProcessGenau N indizierte Actors, verteilt.
ClusterShardingEin Actor pro Key (potenziell unendlich).
ClusterRouterRoutet Nachrichten über vorab existierende Actors auf jedem Node.

“Das richtige Werkzeug nach Kardinalität”: 1 → Singleton, feste N → Daemon-Process, viele (eine pro Business-Entity) → Sharding, “jeder Node” → Cluster-Router.

Die ShardedDaemonProcess API-Referenz deckt die vollständige Oberfläche ab.