Skip to content

KafkaCmd

KafkaCmd = { kind: "publish"; publish: KafkaPublish; } | { kind: "subscribe"; topic: string; } | { kind: "commit"; offset: string; partition: number; topic: string; } | { kind: "nack"; offset: string; partition: number; reason?: string; topic: string; } | { kind: "heartbeat"; offset: string; partition: number; topic: string; }

Defined in: src/io/broker/KafkaActor.ts:88

{ kind: "publish"; publish: KafkaPublish; }


{ kind: "subscribe"; topic: string; }


{ kind: "commit"; offset: string; partition: number; topic: string; }

Commit the offset for a message that was delivered in commitMode: 'manual' mode. The pump’s eachMessage promise resolves; kafkajs commits offset + 1 and reads the next message on this partition. Sending commit outside manual-commit mode is silently ignored.


{ kind: "nack"; offset: string; partition: number; reason?: string; topic: string; }

Negative-acknowledge a manual-commit message — the offset is not committed and the eachMessage promise rejects, so kafkajs treats the partition as having failed: on rebalance / restart the same offset will be re-delivered. The optional reason shows up in the actor’s warn log.


{ kind: "heartbeat"; offset: string; partition: number; topic: string; }

Bump the consumer’s session-deadline mid-processing (#78). commitMode: 'manual' pauses the eachMessage pump until the handler ack’s; if the handler runs longer than the consumer’s sessionTimeoutMs (kafkajs default 30 s) the broker evicts the member, the partition rebalances, and the message is re-delivered after the rebalance settles.

heartbeat invokes the captured kafkajs heartbeat() callback for the still-pending record, which restarts the session clock without touching the offset. Send it periodically from any handler that’s likely to exceed sessionTimeoutMs / 3; the withAutoHeartbeat() helper schedules it for you. Heartbeats for an unknown / already-committed record are a silent no-op.