MQTT Gateway Overview

MQTT Gateway

The MQTT Gateway is a Node.js protocol bridge that sits between ESP32 Cheeko devices and the LiveKit WebRTC cloud. Devices communicate using MQTT over TCP and raw UDP; the gateway translates those into LiveKit room connections and WebRTC audio tracks so the AI agents running in livekit-server can talk to children in real time.

MQTT (control messages)

ESP32 Device ──publish──► EMQX Broker ──republish──► internal/server-ingest ──► MQTTGateway
ESP32 Device ◄──subscribe── EMQX Broker ◄──publish── devices/p2p/<clientId> ◄── MQTTGateway
(gateway sends directly, no republish)

UDP (audio)

ESP32 Device ◄──► MQTTGateway  (AES-128-CTR encrypted Opus, bidirectional)

Per-device internal structure

MQTTGateway
└── VirtualMQTTConnection (one per connected device)
    ├── LiveKitBridge ──► LiveKit Cloud Room ◄──► AI Agent (livekit-server)
    └── UDP crypto layer ◄──► ESP32 Device (encrypted audio)

Module Layer Table

| Layer | Directory | Files | Responsibility |
|---|---|---|---|
| Entry point | / | app.js | Environment validation, Opus init, MQTTGateway startup, signal handlers |
| Gateway | gateway/ | mqtt-gateway.js | Main orchestrator; EMQX connection, UDP socket, per-device lifecycle |
| Gateway | gateway/ | device-handlers.js | Hello/goodbye/mode-change/character-change handler helpers |
| Gateway | gateway/ | emqx-broker.js | Standalone EmqxBroker class with wildcard topic matching |
| Gateway | gateway/ | udp-server.js | UdpServer class; AES-128-CTR encrypted send |
| Gateway | gateway/ | udp-forwarder.js | UDP forwarding utilities |
| Gateway | gateway/ | playback-control.js | Next/previous track control helpers |
| LiveKit | livekit/ | livekit-bridge.js | Per-device LiveKitBridge; room creation, agent dispatch, audio forwarding |
| LiveKit | livekit/ | audio-processor.js | Entropy-based Opus/PCM detection, silence checking, frame validation |
| LiveKit | livekit/ | message-handlers.js | MessageHandlers; TTS start/stop, STT, emotion, LLM-thinking MQTT messages |
| LiveKit | livekit/ | mcp-handler.js | McpHandler; MCP JSON-RPC request/response, volume debouncing |
| MQTT | mqtt/ | message-parser.js | Parsing helpers for hello/goodbye/abort/mode-change/character-change |
| MQTT | mqtt/ | virtual-connection.js | VirtualMQTTConnection; per-device MQTT session state, UDP crypto, bridge lifecycle |
| MQTT | / | mqtt-protocol.js | Raw MQTT 3.1.1 parser/encoder (CONNECT, PUBLISH, SUBSCRIBE, PINGREQ, DISCONNECT) |
| Core | core/ | opus-initializer.js | @discordjs/opus encoder/decoder init |
| Core | core/ | worker-pool-manager.js | WorkerPoolManager; 4–8 worker threads for audio encoding/decoding |
| Core | core/ | media-api-client.js | Cerebrium API base URL and axios config for music/story bots |
| Core | core/ | mem0-client.js, mem0-integration.js | Mem0 long-term memory fetch for agent dispatch metadata |
| Core | / | audio-worker.js | Worker thread; Opus encode/decode per session |
| Constants | constants/ | audio.js | Sample rates, frame sizes, channel count |
| Utils | utils/ | config-manager.js | JSON config file loader (mqtt.json) |
| Utils | utils/ | logger.js | Pino/Winston logger |
| Utils | utils/ | debug-logger.js, console-override.js | Debug namespace setup |
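
The wildcard topic matching attributed to emqx-broker.js is not shown in this overview. As a rough illustration only, the standard MQTT rules (`+` matches exactly one level, `#` matches the remaining levels and must come last) can be sketched as below. The function name `topicMatches` is hypothetical and is not taken from the gateway source; the sketch also ignores the special handling of `$`-prefixed topics.

```javascript
// Minimal sketch of MQTT topic-filter matching (assumed helper, not the
// gateway's actual EmqxBroker implementation).
function topicMatches(filter, topic) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    // '#' matches the parent level and everything below it, but only when
    // it is the final level of the filter.
    if (level === '#') return i === filterLevels.length - 1;
    // The topic ran out of levels before the filter did.
    if (i >= topicLevels.length) return false;
    // '+' matches any single level; anything else must match literally.
    if (level !== '+' && level !== topicLevels[i]) return false;
  }
  // Without a trailing '#', both must have the same number of levels.
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches('devices/p2p/+', 'devices/p2p/some-client'));
console.log(topicMatches('internal/#', 'internal/server-ingest'));
```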

Connection Lifecycle

1. Device connects

The ESP32 connects to the EMQX broker via MQTT over TCP. The gateway does not accept device connections directly; instead, EMQX republishes all device messages to the internal/server-ingest topic, which the gateway subscribes to.

The device's MQTT client ID uses the format:

GID_test@@@68_25_dd_bb_f3_a0@@@<uuid>

Where the three parts separated by @@@ are the group ID, MAC address (underscores instead of colons), and a UUID.
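
To make the client ID format concrete, here is a minimal parsing sketch. The helper name `parseClientId` and the returned field names are assumptions chosen for illustration; the gateway's own parser may differ.

```javascript
// Sketch: split a device client ID into its three @@@-separated parts.
// parseClientId is a hypothetical helper, not the gateway's actual API.
function parseClientId(clientId) {
  const parts = clientId.split('@@@');
  if (parts.length !== 3) {
    throw new Error(`Unexpected client ID format: ${clientId}`);
  }
  const [groupId, macWithUnderscores, uuid] = parts;
  return {
    groupId,
    // The client ID carries the MAC with underscores; restore the colons.
    macAddress: macWithUnderscores.replace(/_/g, ':'),
    uuid,
  };
}

console.log(parseClientId('GID_test@@@68_25_dd_bb_f3_a0@@@example-uuid'));
```

Only the MAC segment is rewritten, so a group ID that itself contains underscores (such as GID_test) is left untouched.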

2. Hello message received

When the device's hello message arrives on internal/server-ingest, the gateway extracts the MAC address from the client ID and calls handleDeviceHello.

handleDeviceHello adds an entry to each of two maps held by MQTTGateway:

| Map | Key | Value |
|---|---|---|
| connections | connectionId (random 32-bit int) | VirtualMQTTConnection instance |
| deviceConnections | device MAC address | { connectionId, connection } |
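
The two maps can be pictured with the sketch below. The `GatewayRegistry` class and its method names are hypothetical, and replacing a stale session when the same MAC says hello again is an assumption rather than confirmed gateway behavior.

```javascript
const crypto = require('node:crypto');

// Sketch of the two lookup maps (hypothetical class; the real maps live
// directly on MQTTGateway).
class GatewayRegistry {
  constructor() {
    this.connections = new Map();       // connectionId -> connection
    this.deviceConnections = new Map(); // MAC address -> { connectionId, connection }
  }

  register(macAddress, connection) {
    // Assumption: a new hello from the same device replaces its old entry.
    const existing = this.deviceConnections.get(macAddress);
    if (existing) this.connections.delete(existing.connectionId);

    // Pick a random 32-bit ID that is not already in use.
    let connectionId;
    do {
      connectionId = crypto.randomBytes(4).readUInt32BE(0);
    } while (this.connections.has(connectionId));

    this.connections.set(connectionId, connection);
    this.deviceConnections.set(macAddress, { connectionId, connection });
    return connectionId;
  }

  unregister(macAddress) {
    const entry = this.deviceConnections.get(macAddress);
    if (!entry) return false;
    this.connections.delete(entry.connectionId);
    this.deviceConnections.delete(macAddress);
    return true;
  }
}
```

Keeping both maps lets UDP packets be routed by connectionId while MQTT messages are routed by MAC address.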

3. Fast hello response (< 50 ms)

VirtualMQTTConnection.parseHelloMessage immediately:

  • Generates a 16-byte AES key (crypto.randomBytes(16)) and a 16-byte nonce (from generateUdpHeader)
  • Generates a session_id in the format <uuid>_<mac>_<roomType> (default room type: conversation)
  • Sends a hello response back via MQTT on devices/p2p/<fullClientId> with UDP connection parameters
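
The fast-path steps above can be sketched as follows. The function name `buildHelloResponse` and the payload field names are assumptions for illustration; only the key size, the session_id format, and the reply topic come from the description above. The real gateway derives the nonce from generateUdpHeader, so the random bytes used here are a stand-in.

```javascript
const crypto = require('node:crypto');

// Sketch of the fast hello response (hypothetical helper and payload shape).
function buildHelloResponse({ fullClientId, uuid, macAddress, publicIp, udpPort, roomType = 'conversation' }) {
  const key = crypto.randomBytes(16);   // 16-byte AES-128 key
  const nonce = crypto.randomBytes(16); // stand-in for the generateUdpHeader output
  const sessionId = `${uuid}_${macAddress}_${roomType}`;

  return {
    topic: `devices/p2p/${fullClientId}`,
    payload: {
      type: 'hello',
      session_id: sessionId,
      udp: {
        server: publicIp,
        port: udpPort,
        encryption: 'aes-128-ctr',
        key: key.toString('hex'),
        nonce: nonce.toString('hex'),
      },
    },
  };
}

console.log(buildHelloResponse({
  fullClientId: 'GID_test@@@68_25_dd_bb_f3_a0@@@example-uuid',
  uuid: 'example-uuid',
  macAddress: '68:25:dd:bb:f3:a0',
  publicIp: '127.0.0.1',
  udpPort: 1883,
}).topic);
```

Nothing in this path waits on a database or on LiveKit, which is what keeps the response fast.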

4. Deferred setup (background)

While the device starts streaming audio, the gateway runs parallel DB queries to fetch:

  • Room type (conversation / music / story)
  • PTT mode (auto / manual)
  • Current character (e.g., Cheeko, Math Tutor)
  • Child profile
  • Mem0 long-term memories

After queries complete it:

  1. Sends a mode_update MQTT message with the actual values
  2. Creates a LiveKitBridge and connects to a LiveKit room named <uuid>_<mac>_<roomType>
  3. Dispatches the appropriate AI agent via AgentDispatchClient
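
The ordering of the deferred setup (parallel lookups first, then three sequential steps) can be sketched like this. Every function on `deps` is a hypothetical stand-in for the gateway's real database, MQTT, and LiveKit calls, and `runDeferredSetup` is not a name from the source.

```javascript
// Sketch of the deferred setup flow under the assumptions stated above.
async function runDeferredSetup(device, deps) {
  // The five lookups are independent, so they run in parallel.
  const [roomType, pttMode, character, childProfile, memories] = await Promise.all([
    deps.fetchRoomType(device.macAddress),
    deps.fetchPttMode(device.macAddress),
    deps.fetchCharacter(device.macAddress),
    deps.fetchChildProfile(device.macAddress),
    deps.fetchMemories(device.macAddress),
  ]);

  // 1. Tell the device the actual values (the hello response used defaults).
  await deps.sendModeUpdate({ roomType, pttMode, character });

  // 2. Join the LiveKit room named <uuid>_<mac>_<roomType>.
  const roomName = `${device.uuid}_${device.macAddress}_${roomType}`;
  const bridge = await deps.connectBridge(roomName);

  // 3. Dispatch the agent, passing the fetched context as metadata.
  await deps.dispatchAgent(roomName, { character, childProfile, memories });

  return { roomName, bridge };
}
```

Using Promise.all means the total wait is roughly that of the slowest lookup rather than the sum of all five.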

5. Audio streaming

UDP packets arrive at MQTTGateway.onUdpMessage. The gateway parses the 16-byte header to extract the connectionId and uses it to look up the VirtualMQTTConnection. The connection decrypts the payload and forwards decoded PCM audio to LiveKit. In the other direction, audio coming back from LiveKit is encoded to Opus, encrypted, and sent to the device over UDP.
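
The packet handling can be illustrated with Node's built-in crypto module. This is a sketch under two loud assumptions: that the 16-byte header doubles as the AES-128-CTR IV, and that the header fields sit at the offsets used below (type at byte 0, payload length at 2, connectionId at 4, timestamp at 8, sequence at 12). Neither the layout nor the function names are confirmed by this document.

```javascript
const crypto = require('node:crypto');

const HEADER_SIZE = 16;

// Assumed header layout; the real generateUdpHeader may differ.
function buildHeader(connectionId, payloadLength, timestamp, sequence) {
  const header = Buffer.alloc(HEADER_SIZE);
  header.writeUInt8(1, 0);
  header.writeUInt16BE(payloadLength, 2);
  header.writeUInt32BE(connectionId, 4);
  header.writeUInt32BE(timestamp, 8);
  header.writeUInt32BE(sequence, 12);
  return header;
}

// Encrypt an Opus frame; the header travels in the clear and serves as the IV.
function encryptPacket(key, header, opusFrame) {
  const cipher = crypto.createCipheriv('aes-128-ctr', key, header);
  return Buffer.concat([header, cipher.update(opusFrame), cipher.final()]);
}

// Parse the header, look up the per-connection key, and decrypt the payload.
// lookupKey is a hypothetical callback: connectionId -> key Buffer or undefined.
function decryptPacket(packet, lookupKey) {
  if (packet.length < HEADER_SIZE) return null;
  const header = packet.subarray(0, HEADER_SIZE);
  const connectionId = header.readUInt32BE(4);
  const key = lookupKey(connectionId);
  if (!key) return null; // unknown connection: drop the packet
  const decipher = crypto.createDecipheriv('aes-128-ctr', key, header);
  const payload = Buffer.concat([
    decipher.update(packet.subarray(HEADER_SIZE)),
    decipher.final(),
  ]);
  return { connectionId, payload };
}
```

Because CTR mode is a stream cipher, the ciphertext is the same length as the Opus frame, so the only per-packet overhead is the header.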

6. Cleanup

When a goodbye message is received (or an inactivity timeout fires), VirtualMQTTConnection.parseOtherMessage notifies the agent via a disconnect_agent message on the LiveKit data channel, then closes the LiveKitBridge. The device's entries are removed from both gateway maps.

Separately, a ghost-room cleanup runs every 5 minutes and removes any LiveKit room that is empty, contains only agents (no device), or is older than 60 minutes.
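
The three ghost-room conditions can be expressed as a single predicate. The room object shape used here (`participants`, `isAgent`, `createdAtMs`) is hypothetical and does not reflect the LiveKit SDK's actual types.

```javascript
const MAX_ROOM_AGE_MS = 60 * 60 * 1000; // 60 minutes

// Sketch: decide whether a room should be cleaned up. The room shape is an
// assumption made for illustration only.
function isGhostRoom(room, now = Date.now()) {
  const isEmpty = room.participants.length === 0;
  const onlyAgents = !isEmpty && room.participants.every((p) => p.isAgent);
  const tooOld = now - room.createdAtMs > MAX_ROOM_AGE_MS;
  return isEmpty || onlyAgents || tooOld;
}

// Example: an agent waiting alone in a fresh room still counts as a ghost.
console.log(isGhostRoom({ participants: [{ isAgent: true }], createdAtMs: Date.now() }));
```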


Key Data Structures

VirtualMQTTConnection

One instance per connected device, stored in MQTTGateway.connections.

| Field | Type | Description |
|---|---|---|
| deviceId | string | Device MAC address (aa:bb:cc:dd:ee:ff) |
| connectionId | number | Random 32-bit integer, used as UDP cookie |
| macAddress | string | Colon-separated MAC |
| groupId | string | First segment of MQTT client ID |
| uuid | string | Third segment of MQTT client ID |
| bridge | LiveKitBridge | LiveKit room bridge (null until deferred setup completes) |
| roomType | string | conversation, music, or story |
| udp.key | Buffer | 16-byte AES-128 encryption key |
| udp.nonce | Buffer | 16-byte AES-128-CTR IV |
| udp.encryption | string | Always "aes-128-ctr" |
| udp.session_id | string | LiveKit room name |
| udp.remoteAddress | Object | { address, port } of device UDP endpoint |
| lastActivityTime | number | Unix ms; used for 2-minute inactivity timeout |
| sessionStartTime | number | Unix ms; max session duration 60 minutes |
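
The two timing fields suggest a periodic expiry check along the following lines. The function name `checkSessionExpiry` and its return values are assumptions; only the 2-minute and 60-minute limits come from the table.

```javascript
const INACTIVITY_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes
const MAX_SESSION_MS = 60 * 60 * 1000;       // 60 minutes

// Sketch: return the reason a session should end, or null if it is healthy.
// conn needs only lastActivityTime and sessionStartTime (Unix ms).
function checkSessionExpiry(conn, now = Date.now()) {
  if (now - conn.sessionStartTime >= MAX_SESSION_MS) return 'max_duration';
  if (now - conn.lastActivityTime >= INACTIVITY_TIMEOUT_MS) return 'inactive';
  return null;
}

console.log(checkSessionExpiry({ sessionStartTime: Date.now(), lastActivityTime: Date.now() }));
```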

LiveKitBridge

One instance per active device session, held by VirtualMQTTConnection.bridge.

Connects the gateway as a participant in the LiveKit room, publishes device audio as a track, subscribes to agent audio tracks, and routes data channel messages in both directions.


Running

cd main/mqtt-gateway
npm install
node app.js

Configuration File (config/mqtt.json)

The gateway reads MQTT broker and LiveKit credentials from config/mqtt.json. The ConfigManager watches this file and live-reloads it, so changes take effect without restarting the process.

{
  "debug": false,
  "mqtt_broker": {
    "host": "YOUR_EMQX_HOST",
    "port": 1883,
    "protocol": "mqtt",
    "keepalive": 60,
    "clean": true,
    "reconnectPeriod": 1000,
    "connectTimeout": 30000
  },
  "livekit": {
    "url": "wss://your-project.livekit.cloud",
    "api_key": "YOUR_LIVEKIT_API_KEY",
    "api_secret": "YOUR_LIVEKIT_API_SECRET"
  }
}

| Field | Description |
|---|---|
| debug | Enable verbose debug logging |
| mqtt_broker.host | EMQX broker hostname or IP |
| mqtt_broker.port | EMQX broker port (default 1883) |
| mqtt_broker.protocol | mqtt (plain) or mqtts (TLS) |
| mqtt_broker.keepalive | MQTT keepalive interval in seconds |
| mqtt_broker.clean | Start with a clean MQTT session |
| mqtt_broker.reconnectPeriod | Auto-reconnect interval in ms |
| mqtt_broker.connectTimeout | Connection timeout in ms |
| livekit.url | LiveKit Cloud WebSocket URL |
| livekit.api_key | LiveKit API key |
| livekit.api_secret | LiveKit API secret |

Environment variables (EMQX_HOST, LIVEKIT_URL, etc.) override the corresponding mqtt.json values when set.
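
One way to picture the override rule is the merge sketched below. The function name `applyEnvOverrides` is hypothetical, and the mapping of each variable to a config field is inferred from the variable names rather than taken from the gateway source.

```javascript
// Sketch: environment variables win over mqtt.json values when set.
function applyEnvOverrides(config, env = process.env) {
  return {
    ...config,
    mqtt_broker: {
      ...config.mqtt_broker,
      host: env.EMQX_HOST || config.mqtt_broker.host,
      // Environment values are strings, so the port is converted to a number.
      port: env.EMQX_PORT ? Number(env.EMQX_PORT) : config.mqtt_broker.port,
      protocol: env.EMQX_PROTOCOL || config.mqtt_broker.protocol,
    },
    livekit: {
      ...config.livekit,
      url: env.LIVEKIT_URL || config.livekit.url,
      api_key: env.LIVEKIT_API_KEY || config.livekit.api_key,
      api_secret: env.LIVEKIT_API_SECRET || config.livekit.api_secret,
    },
  };
}
```

The function returns a new object and leaves the loaded file contents untouched, which keeps a later live-reload from being confused by earlier overrides.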


Environment Variables

| Variable | Default | Description |
|---|---|---|
| UDP_PORT | 1883 | UDP port for device audio streaming |
| PUBLIC_IP | 127.0.0.1 | Public IP address returned to devices in hello response |
| EMQX_HOST | (from mqtt.json) | EMQX broker hostname |
| EMQX_PORT | (from mqtt.json) | EMQX broker port |
| EMQX_PROTOCOL | (from mqtt.json) | MQTT protocol (mqtt, mqtts) |
| LIVEKIT_URL | (from mqtt.json) | LiveKit server WebSocket URL |
| LIVEKIT_API_KEY | (from mqtt.json) | LiveKit API key |
| LIVEKIT_API_SECRET | (from mqtt.json) | LiveKit API secret |
| MANAGER_API_URL | | Base URL for manager API, e.g. http://localhost:3000/toy |
| MANAGER_API_SECRET | | Secret header value for internal manager API calls |
| MEDIA_API_BASE | Cerebrium endpoint | Base URL for music/story bot API |
| CEREBRIUM_API_TOKEN | | Required. Bearer token for Cerebrium API. Process exits if unset. |
| LOKI_HOST | | Optional Grafana Loki host for centralized logging |
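
A startup check for these variables might look like the sketch below. The function name `readRuntimeEnv` and the returned field names are hypothetical. The sketch throws on a missing token so it can be tested; the gateway itself is described as exiting the process.

```javascript
// Sketch: read runtime settings with the documented defaults and fail fast
// when the required token is missing.
function readRuntimeEnv(env = process.env) {
  if (!env.CEREBRIUM_API_TOKEN) {
    throw new Error('CEREBRIUM_API_TOKEN is required');
  }
  return {
    udpPort: Number(env.UDP_PORT || 1883),
    publicIp: env.PUBLIC_IP || '127.0.0.1',
    cerebriumToken: env.CEREBRIUM_API_TOKEN,
    lokiHost: env.LOKI_HOST || null, // optional
  };
}

// A caller that mirrors the documented behavior would exit on failure:
// try { readRuntimeEnv(); } catch (err) { console.error(err.message); process.exit(1); }
```

Note that the PUBLIC_IP default of 127.0.0.1 is only reachable from the gateway host itself, so real devices need it set to an address they can route to.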