mqtt tunneling

published: August 12, 2025
on this page

mqtt tunneling exploits the publish-subscribe messaging protocol used in iot environments to create covert channels through topic manipulation and qos field abuse.

technical description

mqtt (message queuing telemetry transport) is a lightweight messaging protocol designed for iot devices with limited bandwidth and processing power. the protocol’s flexibility in topic naming and message content makes it suitable for covert data transmission.

the technique works by:

  1. encoding data in mqtt topic names or message payloads
  2. using qos levels and retain flags for metadata
  3. exploiting publish-subscribe patterns for bidirectional communication
  4. leveraging legitimate iot traffic for cover

mqtt protocol structure

packet types

packet typevaluedirectiondescription
connect1client → serverconnection request
connack2server → clientconnection acknowledgment
publish3bidirectionalpublish message
puback4bidirectionalpublish acknowledgment
subscribe8client → serversubscribe request
suback9server → clientsubscribe acknowledgment
unsubscribe10client → serverunsubscribe request
pingreq12client → serverping request

publish message structure

fixed header:
 7  6  5  4  3  2  1  0
+--+--+--+--+--+--+--+--+
|mt|dp|qo|qo|rt|  |  |  | byte 1
+--+--+--+--+--+--+--+--+
|    remaining length     | byte 2+

variable header:
topic name length (msb)
topic name length (lsb)
topic name (utf-8)
packet identifier (qos > 0)

payload:
message data (optional)

implementation: mqtt_vpn

overview

mqtt_vpn (https://github.com/martin-ger/mqtt_vpn) creates ip tunnels over mqtt:

  • supports tun interface creation
  • optional aes encryption
  • configurable mqtt broker
  • performance varies by broker latency

installation

# clone repository
git clone https://github.com/martin-ger/mqtt_vpn.git
cd mqtt_vpn

# compile
make all

# install dependencies
sudo apt install libmosquitto-dev

server setup

# create mqtt broker (mosquitto)
sudo apt install mosquitto mosquitto-clients

# configure mosquitto
sudo nano /etc/mosquitto/mosquitto.conf

mosquitto configuration:

# /etc/mosquitto/mosquitto.conf
port 1883
allow_anonymous true
max_connections 1000
max_keepalive 300

# logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type all

# persistence
persistence true
persistence_location /var/lib/mosquitto/

vpn tunnel setup

# server side
sudo ./mqtt_vpn -s -b mqtt://localhost:1883 \
  -t tunnel/server -i 10.0.0.1/24

# client side
sudo ./mqtt_vpn -c -b mqtt://broker.example.com:1883 \
  -t tunnel/server -i 10.0.0.2/24

with encryption:

# server with aes encryption
sudo ./mqtt_vpn -s -b mqtt://localhost:1883 \
  -t tunnel/encrypted -i 10.0.0.1/24 \
  -k "secretkey123456"

# client with matching key
sudo ./mqtt_vpn -c -b mqtt://broker.example.com:1883 \
  -t tunnel/encrypted -i 10.0.0.2/24 \
  -k "secretkey123456"

configuration file

# mqtt_vpn.conf
[server]
broker_url = mqtt://broker.example.com:1883
topic_prefix = vpn/tunnel
interface_ip = 10.0.0.1/24
encryption_key = your-secret-key-here
mtu = 1500

[client]
broker_url = mqtt://broker.example.com:1883
topic_prefix = vpn/tunnel
interface_ip = 10.0.0.2/24
encryption_key = your-secret-key-here
keepalive = 60

implementation: zika

overview

zika (https://github.com/akiroz/zika) provides mqtt-based networking:

  • lightweight implementation
  • focus on simplicity
  • supports message queueing
  • javascript/node.js based

setup

# install via npm
npm install -g zika

# or clone from source
git clone https://github.com/akiroz/zika.git
cd zika
npm install

usage

// server.js
const zika = require('zika');

const server = zika.createserver({
  mqtt_broker: 'mqtt://localhost:1883',
  topic_base: 'zika/tunnel',
  encryption: 'aes-256-cbc',
  key: 'your-encryption-key',
});

server.on('connection', (client) => {
  console.log('client connected:', client.id);

  client.on('data', (data) => {
    console.log('received:', data.length, 'bytes');
    // forward data through tunnel
  });
});

server.listen(8080);
// client.js
const zika = require('zika');

const client = zika.createclient({
  mqtt_broker: 'mqtt://broker.example.com:1883',
  topic_base: 'zika/tunnel',
  server_host: 'localhost',
  server_port: 8080,
});

client.connect(() => {
  console.log('tunnel established');

  // send data through tunnel
  client.write('hello from client');
});

implementation: mqtunnel

overview

mqtunnel (https://github.com/shirou/mqtunnel) provides simple mqtt tunneling:

  • written in go
  • minimal dependencies
  • supports multiple connections
  • configurable qos levels

installation

# install go
sudo apt install golang-go

# clone and build
git clone https://github.com/shirou/mqtunnel.git
cd mqtunnel
go build -o mqtunnel

# install binary
sudo cp mqtunnel /usr/local/bin/

configuration

# mqtunnel.yaml
server:
  mqtt_broker: 'tcp://localhost:1883'
  client_id: 'mqtunnel_server'
  topic_prefix: 'tunnel'
  bind_address: ':8080'

client:
  mqtt_broker: 'tcp://broker.example.com:1883'
  client_id: 'mqtunnel_client'
  topic_prefix: 'tunnel'
  remote_address: 'target.server.com:22'

usage

# server mode
mqtunnel server --config mqtunnel.yaml

# client mode
mqtunnel client --config mqtunnel.yaml

# connect through tunnel
ssh -p 8080 user@localhost

covert channel techniques

topic-based encoding

# encode data in mqtt topic names
import paho.mqtt.client as mqtt
import base64
import json

class mqttcovertchannel:
    def __init__(self, broker_host, broker_port=1883):
        self.client = mqtt.client()
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.connected = false

        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def connect(self):
        """connect to mqtt broker"""
        self.client.connect(self.broker_host, self.broker_port, 60)
        self.client.loop_start()

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            self.connected = true
            print("connected to mqtt broker")
            # subscribe to covert channel topics
            client.subscribe("sensors/+/status")
        else:
            print(f"failed to connect: {rc}")

    def on_message(self, client, userdata, msg):
        """handle incoming covert messages"""
        topic_parts = msg.topic.split('/')

        if len(topic_parts) == 3 and topic_parts[0] == 'sensors':
            # extract data from topic
            device_id = topic_parts[1]

            # decode base64 data from device id
            try:
                decoded_data = base64.b64decode(device_id).decode()
                print(f"covert data received: {decoded_data}")
            except:
                pass  # ignore non-covert messages

    def send_covert_data(self, data):
        """send data encoded in topic names"""
        if not self.connected:
            return false

        # encode data as base64
        encoded_data = base64.b64encode(data.encode()).decode()

        # split into chunks for topic names (mqtt topic limit ~65kb)
        chunk_size = 50  # conservative chunk size
        chunks = [encoded_data[i:i+chunk_size]
                 for i in range(0, len(encoded_data), chunk_size)]

        # send each chunk as separate topic
        for i, chunk in enumerate(chunks):
            topic = f"sensors/{chunk}/status"
            payload = json.dumps({
                "timestamp": time.time(),
                "sequence": i,
                "total_chunks": len(chunks),
                "temperature": 23.5  # fake sensor data
            })

            result = self.client.publish(topic, payload, qos=1)
            if result.rc != mqtt.mqtt_err_success:
                print(f"failed to send chunk {i}")
                return false

        return true

# usage
channel = mqttcovertchannel('localhost')
channel.connect()
channel.send_covert_data('secret intelligence data')

payload encoding with qos manipulation

# use qos levels and retain flags for metadata
class mqttadvancedcovert:
    def __init__(self, broker_host):
        self.client = mqtt.client()
        self.broker_host = broker_host
        self.sequence = 0

    def send_with_metadata(self, data, metadata):
        """encode metadata in qos and retain flags"""

        # encode metadata bits in qos (2 bits) and retain (1 bit)
        # total: 3 bits per message
        metadata_bits = format(metadata, '03b')

        qos = int(metadata_bits[:2], 2)  # first 2 bits
        retain = bool(int(metadata_bits[2]))  # last bit

        # send data with encoded metadata
        topic = f"data/stream/{self.sequence}"
        self.client.publish(topic, data, qos=qos, retain=retain)
        self.sequence += 1

    def receive_with_metadata(self, client, userdata, msg):
        """decode metadata from message properties"""

        qos = msg.qos
        retain = msg.retain

        # reconstruct metadata
        metadata_bits = f"{qos:02b}{int(retain)}"
        metadata = int(metadata_bits, 2)

        print(f"received data: {msg.payload}")
        print(f"metadata: {metadata} (qos: {qos}, retain: {retain})")

# usage
covert = mqttadvancedcovert('localhost')
covert.send_with_metadata(b'payload data', 5)  # metadata = 5 (101 binary)

client id steganography

# hide data in mqtt client identifiers
import hashlib
import time

class mqttclientidcovert:
    def __init__(self, broker_host):
        self.broker_host = broker_host

    def generate_covert_client_id(self, data):
        """generate client id with embedded data"""

        # use legitimate-looking client id format
        timestamp = str(int(time.time()))

        # encode data in client id
        encoded_data = base64.b64encode(data.encode()).decode()

        # create hash for authenticity
        hash_input = f"{timestamp}{encoded_data}"
        hash_value = hashlib.md5(hash_input.encode()).hexdigest()[:8]

        # format: device_<timestamp>_<hash>_<data>
        client_id = f"device_{timestamp}_{hash_value}_{encoded_data}"

        return client_id

    def extract_from_client_id(self, client_id):
        """extract data from client id"""

        parts = client_id.split('_')
        if len(parts) >= 4 and parts[0] == 'device':
            timestamp = parts[1]
            hash_value = parts[2]
            encoded_data = '_'.join(parts[3:])  # rejoin remaining parts

            # verify hash
            hash_input = f"{timestamp}{encoded_data}"
            expected_hash = hashlib.md5(hash_input.encode()).hexdigest()[:8]

            if hash_value == expected_hash:
                try:
                    decoded_data = base64.b64decode(encoded_data).decode()
                    return decoded_data
                except:
                    pass

        return none

    def connect_with_covert_id(self, data):
        """connect using covert client id"""

        covert_id = self.generate_covert_client_id(data)

        client = mqtt.client(client_id=covert_id)
        client.connect(self.broker_host, 1883, 60)

        return client

# usage
covert = mqttclientidcovert('localhost')
client = covert.connect_with_covert_id('hidden message')

traffic analysis

mqtt packet inspection

# analyze mqtt traffic with scapy
from scapy.all import *
import struct

class mqttpacket(packet):
    name = "mqtt"
    fields_desc = [
        bitfield("message_type", 0, 4),
        bitfield("dup", 0, 1),
        bitfield("qos", 0, 2),
        bitfield("retain", 0, 1),
        fieldlenfield("length", none, fmt="b")
    ]

def analyze_mqtt_traffic(pcap_file):
    """analyze mqtt traffic for covert channels"""

    packets = rdpcap(pcap_file)
    mqtt_packets = []

    for pkt in packets:
        if pkt.haslayer(tcp) and (pkt[tcp].dport == 1883 or pkt[tcp].sport == 1883):
            if hasattr(pkt, 'load') and pkt.load:
                mqtt_packets.append(pkt)

    # analyze patterns
    topic_lengths = []
    qos_distribution = {0: 0, 1: 0, 2: 0}
    retain_flags = []

    for pkt in mqtt_packets:
        payload = pkt.load

        if len(payload) > 0:
            # parse mqtt fixed header
            msg_type = (payload[0] >> 4) & 0x0f
            qos = (payload[0] >> 1) & 0x03
            retain = payload[0] & 0x01

            if msg_type == 3:  # publish message
                qos_distribution[qos] += 1
                retain_flags.append(retain)

                # extract topic length
                if len(payload) > 2:
                    topic_len = struct.unpack('!h', payload[2:4])[0]
                    topic_lengths.append(topic_len)

    # analyze for anomalies
    print(f"total mqtt packets: {len(mqtt_packets)}")
    print(f"qos distribution: {qos_distribution}")
    print(f"retain flag usage: {sum(retain_flags)}/{len(retain_flags)}")
    print(f"avg topic length: {sum(topic_lengths)/len(topic_lengths):.1f}")

    # check for suspicious patterns
    if len(set(topic_lengths)) < 5 and len(topic_lengths) > 20:
        print("warning: consistent topic lengths detected")

    high_qos_ratio = (qos_distribution[1] + qos_distribution[2]) / sum(qos_distribution.values())
    if high_qos_ratio > 0.8:
        print("warning: unusually high qos levels")

# usage
analyze_mqtt_traffic('mqtt_traffic.pcap')

detection methods

broker-side monitoring

# mqtt broker plugin for covert channel detection
import paho.mqtt.client as mqtt
from collections import defaultdict
import time
import math

class mqttcovertdetector:
    def __init__(self, broker_host):
        self.client = mqtt.client()
        self.broker_host = broker_host

        # tracking data
        self.client_stats = defaultdict(lambda: {
            'messages': 0,
            'topics': set(),
            'qos_usage': {0: 0, 1: 0, 2: 0},
            'retain_count': 0,
            'first_seen': time.time(),
            'last_activity': time.time()
        })

        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

    def on_connect(self, client, userdata, flags, rc):
        if rc == 0:
            # subscribe to all topics for monitoring
            client.subscribe('#', qos=2)
            print("mqtt covert detector active")

    def on_message(self, client, userdata, msg):
        """analyze each message for covert channel indicators"""

        client_id = "unknown"  # would need broker logs for actual client id
        current_time = time.time()

        stats = self.client_stats[client_id]
        stats['messages'] += 1
        stats['topics'].add(msg.topic)
        stats['qos_usage'][msg.qos] += 1
        stats['last_activity'] = current_time

        if msg.retain:
            stats['retain_count'] += 1

        # check for anomalies
        self.check_client_anomalies(client_id, msg)

        # periodic analysis
        if stats['messages'] % 100 == 0:
            self.analyze_client_behavior(client_id)

    def check_client_anomalies(self, client_id, msg):
        """check individual message for anomalies"""

        # high entropy in topic names
        topic_entropy = self.calculate_entropy(msg.topic)
        if topic_entropy > 4.0:
            print(f"high entropy topic from {client_id}: {msg.topic}")

        # unusually large payloads for iot devices
        if len(msg.payload) > 10000:
            print(f"large payload from {client_id}: {len(msg.payload)} bytes")

        # base64 encoded topic components
        topic_parts = msg.topic.split('/')
        for part in topic_parts:
            if len(part) > 10 and self.looks_like_base64(part):
                print(f"possible base64 encoding in topic: {msg.topic}")

    def analyze_client_behavior(self, client_id):
        """analyze overall client behavior patterns"""

        stats = self.client_stats[client_id]

        # calculate activity metrics
        duration = stats['last_activity'] - stats['first_seen']
        message_rate = stats['messages'] / duration if duration > 0 else 0

        # topic diversity
        unique_topics = len(stats['topics'])
        topic_diversity = unique_topics / stats['messages']

        # qos usage patterns
        total_qos = sum(stats['qos_usage'].values())
        high_qos_ratio = (stats['qos_usage'][1] + stats['qos_usage'][2]) / total_qos

        # retain usage
        retain_ratio = stats['retain_count'] / stats['messages']

        # flag suspicious patterns
        anomaly_score = 0
        reasons = []

        if message_rate > 10:  # >10 msg/sec sustained
            anomaly_score += 1
            reasons.append('high message rate')

        if topic_diversity < 0.1:  # very few unique topics
            anomaly_score += 1
            reasons.append('low topic diversity')

        if high_qos_ratio > 0.8:  # mostly qos 1/2
            anomaly_score += 1
            reasons.append('unusual qos pattern')

        if retain_ratio > 0.5:  # >50% retained messages
            anomaly_score += 1
            reasons.append('high retain usage')

        if anomaly_score >= 2:
            print(f"suspicious client {client_id}: {', '.join(reasons)}")
            print(f"  rate: {message_rate:.1f} msg/s, topics: {unique_topics}")

    def calculate_entropy(self, text):
        """calculate shannon entropy"""
        from collections import counter

        if not text:
            return 0

        counts = counter(text)
        probs = [count/len(text) for count in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)

    def looks_like_base64(self, text):
        """heuristic to detect base64 encoding"""
        import re

        # base64 pattern
        base64_pattern = re.compile(r'^[a-za-z0-9+/]*={0,2}$')

        if not base64_pattern.match(text):
            return false

        # length should be multiple of 4
        if len(text) % 4 != 0:
            return false

        # calculate entropy
        entropy = self.calculate_entropy(text)
        return entropy > 3.5  # base64 typically has high entropy

# usage
detector = mqttcovertdetector('localhost')
detector.client.connect('localhost', 1883, 60)
detector.client.loop_forever()

network-level detection

# monitor mqtt connections
netstat -an | grep :1883

# capture mqtt traffic
tcpdump -i eth0 -w mqtt_capture.pcap port 1883

# analyze with wireshark
tshark -r mqtt_capture.pcap -y mqtt -t fields \
  -e mqtt.msgtype -e mqtt.topic -e mqtt.qos -e mqtt.retain

# count unique topics per client
tshark -r mqtt_capture.pcap -y mqtt -t fields \
  -e ip.src -e mqtt.topic | sort | uniq -c

countermeasures

broker configuration

# mosquitto security settings
allow_anonymous false
password_file /etc/mosquitto/passwd

# acl restrictions
acl_file /etc/mosquitto/acls

# connection limits
max_connections 100
max_inflight_messages 10

# topic restrictions
topic_pattern +/+/+  # limit topic depth
topic_max_length 100

access control list:

# /etc/mosquitto/acls
# deny all by default
pattern read $sys/#

# allow specific patterns only
user sensor_device
topic read sensors/device_001/+
topic write sensors/device_001/status

user controller
topic read sensors/+/+
topic write control/+/command

topic filtering

# mqtt topic filter for covert channel prevention
import re
from collections import defaultdict

class mqtttopicfilter:
    def __init__(self):
        self.allowed_patterns = [
            r'^sensors/[a-z0-9_]+/[a-z_]+$',
            r'^control/[a-z0-9_]+/[a-z_]+$',
            r'^status/[a-z0-9_]+$'
        ]

        self.blocked_patterns = [
            r'.*[a-za-z0-9+/]{20,}.*',  # base64-like strings
            r'.*[0-9a-f]{32,}.*',       # hex strings
            r'.*/[^/]{100,}$'           # very long topic components
        ]

        self.client_topic_counts = defaultdict(set)

    def is_allowed_topic(self, topic, client_id=none):
        """check if topic is allowed"""

        # check against blocked patterns first
        for pattern in self.blocked_patterns:
            if re.match(pattern, topic, re.ignorecase):
                print(f"blocked topic pattern: {topic}")
                return false

        # check against allowed patterns
        for pattern in self.allowed_patterns:
            if re.match(pattern, topic):
                # track topic usage per client
                if client_id:
                    self.client_topic_counts[client_id].add(topic)

                    # limit topics per client
                    if len(self.client_topic_counts[client_id]) > 50:
                        print(f"client {client_id} exceeded topic limit")
                        return false

                return true

        print(f"topic not in allowed patterns: {topic}")
        return false

    def filter_message(self, client_id, topic, payload, qos, retain):
        """comprehensive message filtering"""

        # topic validation
        if not self.is_allowed_topic(topic, client_id):
            return false

        # payload size limits
        if len(payload) > 1024:  # 1kb limit for iot messages
            print(f"oversized payload from {client_id}: {len(payload)} bytes")
            return false

        # qos restrictions
        if qos > 1:  # limit to qos 0 and 1
            print(f"high qos level from {client_id}: {qos}")
            return false

        # retain flag restrictions
        if retain and not topic.endswith('/config'):
            print(f"inappropriate retain flag from {client_id}")
            return false

        return true

# integration with broker
filter_instance = mqtttopicfilter()

def message_callback(client, userdata, message):
    client_id = userdata.get('client_id', 'unknown')

    if not filter_instance.filter_message(
        client_id, message.topic, message.payload,
        message.qos, message.retain):
        # drop message
        print(f"message dropped from {client_id}")
        return

    # process allowed message
    print(f"allowed message: {message.topic}")

advantages and limitations

advantages

  • widespread iot deployment provides cover
  • flexible topic structure allows encoding variations
  • publish-subscribe model supports one-to-many communication
  • lightweight protocol suitable for low-bandwidth environments
  • qos and retain flags provide additional metadata channels

limitations

  • mqtt brokers may log all topics and messages
  • simple protocol structure offers limited hiding places
  • topic filtering can block covert channels effectively
  • authentication requirements in secure deployments
  • limited payload size for efficiency

performance characteristics

bandwidth analysis

techniquecapacitylatencystealth level
topic encoding10-100 bytes/msg50-200msmedium
payload embedding1-10kb/msg20-100mslow
qos/retain metadata3 bits/msgminimalhigh
client id encoding50-200 bytes/connconnection timehigh

broker performance impact

# measure mqtt broker performance impact
import time
import paho.mqtt.client as mqtt
import threading

def performance_test():
    """test mqtt broker under covert channel load"""

    clients = []
    message_count = 0
    start_time = time.time()

    def on_message(client, userdata, msg):
        nonlocal message_count
        message_count += 1

    # create multiple covert clients
    for i in range(10):
        client = mqtt.client(f"covert_client_{i}")
        client.on_message = on_message
        client.connect('localhost', 1883, 60)
        client.subscribe('test/+/+')
        clients.append(client)

    # send covert messages at high rate
    for i in range(1000):
        topic = f"test/{base64.b64encode(f'data{i}'.encode()).decode()}/status"
        clients[i % 10].publish(topic, f"payload_{i}", qos=1)

        if i % 100 == 0:
            elapsed = time.time() - start_time
            rate = message_count / elapsed if elapsed > 0 else 0
            print(f"messages: {message_count}, rate: {rate:.1f} msg/s")

    # cleanup
    for client in clients:
        client.disconnect()

# performance_test()

real-world applications

iot botnet communication

  • command and control via mqtt topics
  • distributed across multiple public brokers
  • legitimate iot traffic provides cover

industrial espionage

  • data exfiltration from smart factories
  • sensor data embedding in mqtt messages
  • long-term persistent access

research and testing

  • academic studies on iot security
  • penetration testing in iot environments
  • covert channel capacity analysis

testing environment

local mqtt setup

# install mosquitto broker
sudo apt install mosquitto mosquitto-clients

# start broker
sudo systemctl start mosquitto
sudo systemctl enable mosquitto

# test basic functionality
mosquitto_pub -h localhost -t test/topic -m "hello world"
mosquitto_sub -h localhost -t test/topic

# test with authentication
sudo mosquitto_passwd -c /etc/mosquitto/passwd testuser
echo "password_file /etc/mosquitto/passwd" | sudo tee -a /etc/mosquitto/mosquitto.conf
sudo systemctl restart mosquitto

docker test environment

# mqtt test environment
from eclipse-mosquitto:latest

copy mosquitto.conf /mosquitto/config/mosquitto.conf
copy passwd /mosquitto/config/passwd
copy acl /mosquitto/config/acl

expose 1883 9001

cmd ["/usr/sbin/mosquitto", "-c", "/mosquitto/config/mosquitto.conf"]
# run test environment
docker build -t mqtt-test .
docker run -p 1883:1883 -p 9001:9001 mqtt-test

references

  • mqtt version 5.0 specification - oasis standard
  • rfc 6455: the websocket protocol (mqtt over websockets)
  • “mqtt security fundamentals” - hivemq documentation
  • “covert channels in iot protocols” - academic research
  • eclipse mosquitto documentation
on this page