Voice sample
The voice sample is a streaming-audio app showing:
- Kafka integration — audio chunks flow in via Kafka topic.
- Sharded session actors — one per active voice session.
- PersistentActor — transcript persisted as events.
- Projection — read-side view for transcript search.
- WebSocket — live transcript streaming to the client.
Find it under `examples/voice/` in the repo.
Architecture
```
Audio source (microphone, file)
   │
   │ chunks via Kafka
   ▼
KafkaActor (consumer)
   │
   │ routes by session ID
   ▼
VoiceSessionActor (sharded, persistent)
   │
   ├── persists transcript chunks as events
   ├── pushes transcript updates to client via WS
   └── publishes to projection topic
          │
          ▼
   TranscriptProjection
          │
          └── builds searchable index (e.g., in SQL)
```

Running it
```sh
git clone https://github.com/pathosDev/actor-ts.git
cd actor-ts/examples/voice

docker compose up -d
# Spins up: 3 actor-ts nodes, Kafka, Zookeeper, Cassandra, Nginx

# Stream audio (mock client):
bun run client/mock-stream.ts ./test-audio.wav

# Watch transcripts:
open http://localhost:3000
```

Transcripts appear live in the UI as the audio chunks are processed.
Key patterns demonstrated
Broker integration
```typescript
// Broker actor: consumes the 'audio-chunks' topic as part of the 'voice-processor' group.
const kafka = system.actorOf(
  Props.create(() => new KafkaActor({
    brokers: ['kafka:9092'],
    consumer: { groupId: 'voice-processor', topics: ['audio-chunks'] },
  })),
);

// Forward every consumed chunk to the dispatch actor.
kafka.tell({ kind: 'subscribe', subscriber: dispatchActor });
```

A broker actor consumes audio chunks; a dispatch actor routes them to the right sharded session.
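The dispatch step boils down to mapping each session ID to a stable location, so every chunk of one session lands on the same entity. The following is a minimal, framework-free sketch of that idea, assuming a hypothetical `AudioChunk` message shape and a simple hash-modulo shard assignment; actor-ts's real shard allocation may work differently, and every name here is illustrative.

```typescript
// Hypothetical shape of one audio chunk read from Kafka.
interface AudioChunk {
  sessionId: string;
  seq: number;
  audioBytes: Uint8Array;
}

// 32-bit FNV-1a over the string's UTF-16 code units. Deterministic, so
// every node computes the same shard for the same session ID.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Illustrative shard assignment: hash modulo shard count.
function shardFor(sessionId: string, numShards: number): number {
  return fnv1a(sessionId) % numShards;
}

// Group a batch of chunks by shard, keeping arrival order within each shard.
function routeBatch(chunks: AudioChunk[], numShards: number): Map<number, AudioChunk[]> {
  const byShard = new Map<number, AudioChunk[]>();
  for (const chunk of chunks) {
    const shard = shardFor(chunk.sessionId, numShards);
    const bucket = byShard.get(shard) ?? [];
    bucket.push(chunk);
    byShard.set(shard, bucket);
  }
  return byShard;
}
```

Because the hash depends only on the session ID, ordering within a session is preserved as long as each shard processes its bucket in order.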
Sharded + persistent session
```typescript
class VoiceSessionActor extends PersistentActor<SessionCmd, SessionEvent, SessionState> {
  // `sessionId` is the sharded entity ID (its setup is omitted here).
  readonly persistenceId = `voice-${this.sessionId}`;

  // Pure state transition, applied on persist and on replay.
  override onEvent(state: SessionState, event: SessionEvent): SessionState {
    if (event.kind === 'chunk-transcribed') {
      return { ...state, transcript: state.transcript + event.text };
    }
    return state;
  }

  async onCommand(state: SessionState, cmd: SessionCmd) {
    if (cmd.kind === 'audio-chunk') {
      const text = await transcribe(cmd.audioBytes);
      this.persist({ kind: 'chunk-transcribed', text, ts: Date.now() }, () => {
        // Push to WS client
        this.context.system.eventStream.publish(new TranscriptUpdate(this.sessionId, text));
      });
    }
  }

  // Tag transcript events so the projection can query them by tag.
  override tagsFor(event: SessionEvent) {
    return event.kind === 'chunk-transcribed' ? ['transcript'] : undefined;
  }
}
```

Notice:
- Sharded — one actor per session ID, distributed across nodes.
- Persistent — transcripts survive restart + failover.
- Tagged — `'transcript'` events feed the projection.
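Persistence means the session state is never stored directly: after a restart or failover it is rebuilt by folding the journaled events through the same `onEvent` logic. A minimal, framework-free sketch of that replay, assuming the `SessionEvent` and `SessionState` shapes below (inferred from the snippet above, not copied from the repo):

```typescript
// Assumed shapes, inferred from the VoiceSessionActor snippet.
type SessionEvent = { kind: 'chunk-transcribed'; text: string; ts: number };

interface SessionState {
  transcript: string;
}

const initialState: SessionState = { transcript: '' };

// Pure event handler, mirroring onEvent above.
function onEvent(state: SessionState, event: SessionEvent): SessionState {
  if (event.kind === 'chunk-transcribed') {
    return { ...state, transcript: state.transcript + event.text };
  }
  return state;
}

// Recovery: replay the journal in order to rebuild the latest state.
function recover(journal: readonly SessionEvent[]): SessionState {
  return journal.reduce(onEvent, initialState);
}
```

Keeping `onEvent` pure is what makes this safe: replaying the same events always yields the same transcript.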
Projection for search
```typescript
const projection = ProjectionActor.byTag<SessionEvent>({
  name: 'transcript-search',
  tag: 'transcript',
  query: new CassandraQuery({ /* ... */ }),
  offsetStore: new SqliteOffsetStore({ path: '/var/lib/offsets.db' }),
  async handle(event) {
    if (event.event.kind === 'chunk-transcribed') {
      await searchDb.execute(
        `INSERT INTO transcripts (sessionId, text, ts) VALUES (?, ?, ?)
         ON CONFLICT (sessionId, ts) DO NOTHING`,
        [event.persistenceId, event.event.text, event.event.ts],
      );
    }
  },
});

system.actorOf(Props.create(() => projection));
```

The projection reads the journal’s `'transcript'`-tagged events and writes them to a search-optimized table, separate from the session journal.
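The projection relies on two properties: a stored offset so it can resume where it left off, and an idempotent write so an event delivered twice does no harm. The sketch below shows that loop in memory, assuming a hypothetical `Envelope` shape with a numeric `offset`; the `Map` stands in for the SQL table and the `lastOffset` field for `SqliteOffsetStore`. It is not the actor-ts `ProjectionActor` implementation.

```typescript
// Hypothetical envelope: journal position plus the tagged event.
interface Envelope {
  offset: number;
  persistenceId: string;
  event: { kind: 'chunk-transcribed'; text: string; ts: number };
}

class InMemoryTranscriptProjection {
  // Last offset fully handled; stands in for a durable offset store.
  private lastOffset = -1;
  // Keyed like the SQL table's (sessionId, ts) conflict target.
  readonly rows = new Map<string, string>();

  handle(envelope: Envelope): void {
    // Skip anything at or before the stored offset (already processed).
    if (envelope.offset <= this.lastOffset) return;
    const key = `${envelope.persistenceId}|${envelope.event.ts}`;
    // Equivalent of ON CONFLICT ... DO NOTHING: keep the first write.
    if (!this.rows.has(key)) this.rows.set(key, envelope.event.text);
    // Advance the offset only after the write has happened.
    this.lastOffset = envelope.offset;
  }

  get offset(): number {
    return this.lastOffset;
  }
}
```

If the process dies between the write and the offset update, the event is handled again on restart; the conflict check is what keeps that redelivery harmless.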
WebSocket pushes
```typescript
// In the HTTP server, on WS upgrade:
const session = sharding.entityRefFor('voice', sessionId);
session.tell({ kind: 'subscribe-ws', subscriber: wsBridge });
```

The session actor pushes transcript updates to every subscribed WS client. Several clients can subscribe to the same session, for example multiple devices watching the same call.
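Multi-device delivery is the subscriber-set pattern: the session keeps a set of subscribers and pushes each update to all of them. A framework-free sketch, assuming each WS bridge can be modelled as a hypothetical `Subscriber` callback; in the sample the subscribers are registered via `subscribe-ws`, and the `unsubscribe` path shown here is an assumption.

```typescript
// Hypothetical stand-in for a WS bridge: receives transcript text.
type Subscriber = (sessionId: string, text: string) => void;

class SessionFanOut {
  private readonly subscribers = new Set<Subscriber>();

  constructor(readonly sessionId: string) {}

  subscribe(sub: Subscriber): void {
    this.subscribers.add(sub); // a Set ignores duplicate registrations
  }

  unsubscribe(sub: Subscriber): void {
    this.subscribers.delete(sub); // e.g. when the socket closes
  }

  // Push one transcript update to every currently subscribed client.
  push(text: string): void {
    for (const sub of this.subscribers) {
      sub(this.sessionId, text);
    }
  }
}
```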
What it doesn’t demonstrate
- Cluster-singleton coordinators — voice sessions don’t need a leader; they’re per-session sharded.
- DistributedData — transcript state belongs to a single session; no replicated state needed.
- Complex routing — straightforward 1:1 audio chunk → session routing.
For those, see the stand-alone snippets or the chat sample.
File layout
```
examples/voice/
├── docker-compose.yml
├── README.md
├── package.json
├── src/
│   ├── main.ts                      # entry
│   ├── actors/
│   │   ├── VoiceSessionActor.ts     # sharded + persistent
│   │   ├── DispatchActor.ts         # routes Kafka msgs
│   │   └── TranscriptProjection.ts  # read-side
│   ├── messages.ts
│   ├── transcribe.ts                # mock transcription
│   └── handlers/
│       └── wsBridge.ts
├── client/
│   └── mock-stream.ts
└── ui/
```

Roughly 700 lines, slightly larger than the chat sample because the broker integration adds setup.
Real-world adaptation
For a production voice-pipeline app:
- Replace the mock `transcribe()` with a real ASR (Whisper, Deepgram).
- Add the management endpoint + metrics for ops visibility.
- Replace the search projection’s SQL with your actual search backend (Elasticsearch, OpenSearch).
- Add auth (the mock skips it).
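Swapping the mock for a real ASR is simplest when transcription sits behind a small interface. A sketch, assuming a hypothetical `Transcriber` interface; the sample's actual `transcribe()` is only known from its call site (`await transcribe(cmd.audioBytes)`), and a real Whisper or Deepgram client would take the place of `MockTranscriber`.

```typescript
// Hypothetical seam between the session actor and any ASR backend.
interface Transcriber {
  transcribe(audioBytes: Uint8Array): Promise<string>;
}

// Deterministic stand-in, useful for tests and local runs.
class MockTranscriber implements Transcriber {
  async transcribe(audioBytes: Uint8Array): Promise<string> {
    return `[${audioBytes.length} bytes]`;
  }
}

// Transcribe chunks one after another so the text stays in chunk order,
// matching how the session actor appends to its transcript.
async function transcribeInOrder(t: Transcriber, chunks: Uint8Array[]): Promise<string> {
  let transcript = '';
  for (const chunk of chunks) {
    transcript += await t.transcribe(chunk);
  }
  return transcript;
}
```

Code that depends only on `Transcriber` does not change when the backend does.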
The skeleton is reusable for any streaming-input + per-stream state + read-side index workload — payments, IoT, log ingestion, etc.
Where to next
- Chat sample — cluster + pubsub.
- Stand-alone snippets — bite-sized examples.
- Kafka — the broker actor.
- PersistentActor — event-sourced sessions.
- Projections — read-side views.