mqtt tunneling
on this page
mqtt tunneling exploits the publish-subscribe messaging protocol used in iot environments to create covert channels through topic manipulation and qos field abuse.
technical description
mqtt (message queuing telemetry transport) is a lightweight messaging protocol designed for iot devices with limited bandwidth and processing power. the protocol’s flexibility in topic naming and message content makes it suitable for covert data transmission.
the technique works by:
- encoding data in mqtt topic names or message payloads
- using qos levels and retain flags for metadata
- exploiting publish-subscribe patterns for bidirectional communication
- leveraging legitimate iot traffic for cover
mqtt protocol structure
packet types
packet type | value | direction | description |
---|---|---|---|
connect | 1 | client → server | connection request |
connack | 2 | server → client | connection acknowledgment |
publish | 3 | bidirectional | publish message |
puback | 4 | bidirectional | publish acknowledgment |
subscribe | 8 | client → server | subscribe request |
suback | 9 | server → client | subscribe acknowledgment |
unsubscribe | 10 | client → server | unsubscribe request |
pingreq | 12 | client → server | ping request |
publish message structure
fixed header:
7 6 5 4 3 2 1 0
+--+--+--+--+--+--+--+--+
|mt|dp|qo|qo|rt| | | | byte 1
+--+--+--+--+--+--+--+--+
| remaining length | byte 2+
variable header:
topic name length (msb)
topic name length (lsb)
topic name (utf-8)
packet identifier (qos > 0)
payload:
message data (optional)
implementation: mqtt_vpn
overview
mqtt_vpn (https://github.com/martin-ger/mqtt_vpn) creates ip tunnels over mqtt:
- supports tun interface creation
- optional aes encryption
- configurable mqtt broker
- performance varies by broker latency
installation
# clone repository
git clone https://github.com/martin-ger/mqtt_vpn.git
cd mqtt_vpn
# compile
make all
# install dependencies
sudo apt install libmosquitto-dev
server setup
# create mqtt broker (mosquitto)
sudo apt install mosquitto mosquitto-clients
# configure mosquitto
sudo nano /etc/mosquitto/mosquitto.conf
mosquitto configuration:
# /etc/mosquitto/mosquitto.conf
port 1883
allow_anonymous true
max_connections 1000
max_keepalive 300
# logging
log_dest file /var/log/mosquitto/mosquitto.log
log_type all
# persistence
persistence true
persistence_location /var/lib/mosquitto/
vpn tunnel setup
# server side
sudo ./mqtt_vpn -s -b mqtt://localhost:1883 \
-t tunnel/server -i 10.0.0.1/24
# client side
sudo ./mqtt_vpn -c -b mqtt://broker.example.com:1883 \
-t tunnel/server -i 10.0.0.2/24
with encryption:
# server with aes encryption
sudo ./mqtt_vpn -s -b mqtt://localhost:1883 \
-t tunnel/encrypted -i 10.0.0.1/24 \
-k "secretkey123456"
# client with matching key
sudo ./mqtt_vpn -c -b mqtt://broker.example.com:1883 \
-t tunnel/encrypted -i 10.0.0.2/24 \
-k "secretkey123456"
configuration file
# mqtt_vpn.conf
[server]
broker_url = mqtt://broker.example.com:1883
topic_prefix = vpn/tunnel
interface_ip = 10.0.0.1/24
encryption_key = your-secret-key-here
mtu = 1500
[client]
broker_url = mqtt://broker.example.com:1883
topic_prefix = vpn/tunnel
interface_ip = 10.0.0.2/24
encryption_key = your-secret-key-here
keepalive = 60
implementation: zika
overview
zika (https://github.com/akiroz/zika) provides mqtt-based networking:
- lightweight implementation
- focus on simplicity
- supports message queueing
- javascript/node.js based
setup
# install via npm
npm install -g zika
# or clone from source
git clone https://github.com/akiroz/zika.git
cd zika
npm install
usage
// server.js
const zika = require('zika');
const server = zika.createserver({
mqtt_broker: 'mqtt://localhost:1883',
topic_base: 'zika/tunnel',
encryption: 'aes-256-cbc',
key: 'your-encryption-key',
});
server.on('connection', (client) => {
console.log('client connected:', client.id);
client.on('data', (data) => {
console.log('received:', data.length, 'bytes');
// forward data through tunnel
});
});
server.listen(8080);
// client.js
const zika = require('zika');
const client = zika.createclient({
mqtt_broker: 'mqtt://broker.example.com:1883',
topic_base: 'zika/tunnel',
server_host: 'localhost',
server_port: 8080,
});
client.connect(() => {
console.log('tunnel established');
// send data through tunnel
client.write('hello from client');
});
implementation: mqtunnel
overview
mqtunnel (https://github.com/shirou/mqtunnel) provides simple mqtt tunneling:
- written in go
- minimal dependencies
- supports multiple connections
- configurable qos levels
installation
# install go
sudo apt install golang-go
# clone and build
git clone https://github.com/shirou/mqtunnel.git
cd mqtunnel
go build -o mqtunnel
# install binary
sudo cp mqtunnel /usr/local/bin/
configuration
# mqtunnel.yaml
server:
mqtt_broker: 'tcp://localhost:1883'
client_id: 'mqtunnel_server'
topic_prefix: 'tunnel'
bind_address: ':8080'
client:
mqtt_broker: 'tcp://broker.example.com:1883'
client_id: 'mqtunnel_client'
topic_prefix: 'tunnel'
remote_address: 'target.server.com:22'
usage
# server mode
mqtunnel server --config mqtunnel.yaml
# client mode
mqtunnel client --config mqtunnel.yaml
# connect through tunnel
ssh -p 8080 user@localhost
covert channel techniques
topic-based encoding
# encode data in mqtt topic names
import paho.mqtt.client as mqtt
import base64
import json
class mqttcovertchannel:
def __init__(self, broker_host, broker_port=1883):
self.client = mqtt.client()
self.broker_host = broker_host
self.broker_port = broker_port
self.connected = false
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
def connect(self):
"""connect to mqtt broker"""
self.client.connect(self.broker_host, self.broker_port, 60)
self.client.loop_start()
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
self.connected = true
print("connected to mqtt broker")
# subscribe to covert channel topics
client.subscribe("sensors/+/status")
else:
print(f"failed to connect: {rc}")
def on_message(self, client, userdata, msg):
"""handle incoming covert messages"""
topic_parts = msg.topic.split('/')
if len(topic_parts) == 3 and topic_parts[0] == 'sensors':
# extract data from topic
device_id = topic_parts[1]
# decode base64 data from device id
try:
decoded_data = base64.b64decode(device_id).decode()
print(f"covert data received: {decoded_data}")
except:
pass # ignore non-covert messages
def send_covert_data(self, data):
"""send data encoded in topic names"""
if not self.connected:
return false
# encode data as base64
encoded_data = base64.b64encode(data.encode()).decode()
# split into chunks for topic names (mqtt topic limit ~65kb)
chunk_size = 50 # conservative chunk size
chunks = [encoded_data[i:i+chunk_size]
for i in range(0, len(encoded_data), chunk_size)]
# send each chunk as separate topic
for i, chunk in enumerate(chunks):
topic = f"sensors/{chunk}/status"
payload = json.dumps({
"timestamp": time.time(),
"sequence": i,
"total_chunks": len(chunks),
"temperature": 23.5 # fake sensor data
})
result = self.client.publish(topic, payload, qos=1)
if result.rc != mqtt.mqtt_err_success:
print(f"failed to send chunk {i}")
return false
return true
# usage
channel = mqttcovertchannel('localhost')
channel.connect()
channel.send_covert_data('secret intelligence data')
payload encoding with qos manipulation
# use qos levels and retain flags for metadata
class mqttadvancedcovert:
def __init__(self, broker_host):
self.client = mqtt.client()
self.broker_host = broker_host
self.sequence = 0
def send_with_metadata(self, data, metadata):
"""encode metadata in qos and retain flags"""
# encode metadata bits in qos (2 bits) and retain (1 bit)
# total: 3 bits per message
metadata_bits = format(metadata, '03b')
qos = int(metadata_bits[:2], 2) # first 2 bits
retain = bool(int(metadata_bits[2])) # last bit
# send data with encoded metadata
topic = f"data/stream/{self.sequence}"
self.client.publish(topic, data, qos=qos, retain=retain)
self.sequence += 1
def receive_with_metadata(self, client, userdata, msg):
"""decode metadata from message properties"""
qos = msg.qos
retain = msg.retain
# reconstruct metadata
metadata_bits = f"{qos:02b}{int(retain)}"
metadata = int(metadata_bits, 2)
print(f"received data: {msg.payload}")
print(f"metadata: {metadata} (qos: {qos}, retain: {retain})")
# usage
covert = mqttadvancedcovert('localhost')
covert.send_with_metadata(b'payload data', 5) # metadata = 5 (101 binary)
client id steganography
# hide data in mqtt client identifiers
import hashlib
import time
class mqttclientidcovert:
def __init__(self, broker_host):
self.broker_host = broker_host
def generate_covert_client_id(self, data):
"""generate client id with embedded data"""
# use legitimate-looking client id format
timestamp = str(int(time.time()))
# encode data in client id
encoded_data = base64.b64encode(data.encode()).decode()
# create hash for authenticity
hash_input = f"{timestamp}{encoded_data}"
hash_value = hashlib.md5(hash_input.encode()).hexdigest()[:8]
# format: device_<timestamp>_<hash>_<data>
client_id = f"device_{timestamp}_{hash_value}_{encoded_data}"
return client_id
def extract_from_client_id(self, client_id):
"""extract data from client id"""
parts = client_id.split('_')
if len(parts) >= 4 and parts[0] == 'device':
timestamp = parts[1]
hash_value = parts[2]
encoded_data = '_'.join(parts[3:]) # rejoin remaining parts
# verify hash
hash_input = f"{timestamp}{encoded_data}"
expected_hash = hashlib.md5(hash_input.encode()).hexdigest()[:8]
if hash_value == expected_hash:
try:
decoded_data = base64.b64decode(encoded_data).decode()
return decoded_data
except:
pass
return none
def connect_with_covert_id(self, data):
"""connect using covert client id"""
covert_id = self.generate_covert_client_id(data)
client = mqtt.client(client_id=covert_id)
client.connect(self.broker_host, 1883, 60)
return client
# usage
covert = mqttclientidcovert('localhost')
client = covert.connect_with_covert_id('hidden message')
traffic analysis
mqtt packet inspection
# analyze mqtt traffic with scapy
from scapy.all import *
import struct
class mqttpacket(packet):
name = "mqtt"
fields_desc = [
bitfield("message_type", 0, 4),
bitfield("dup", 0, 1),
bitfield("qos", 0, 2),
bitfield("retain", 0, 1),
fieldlenfield("length", none, fmt="b")
]
def analyze_mqtt_traffic(pcap_file):
"""analyze mqtt traffic for covert channels"""
packets = rdpcap(pcap_file)
mqtt_packets = []
for pkt in packets:
if pkt.haslayer(tcp) and (pkt[tcp].dport == 1883 or pkt[tcp].sport == 1883):
if hasattr(pkt, 'load') and pkt.load:
mqtt_packets.append(pkt)
# analyze patterns
topic_lengths = []
qos_distribution = {0: 0, 1: 0, 2: 0}
retain_flags = []
for pkt in mqtt_packets:
payload = pkt.load
if len(payload) > 0:
# parse mqtt fixed header
msg_type = (payload[0] >> 4) & 0x0f
qos = (payload[0] >> 1) & 0x03
retain = payload[0] & 0x01
if msg_type == 3: # publish message
qos_distribution[qos] += 1
retain_flags.append(retain)
# extract topic length
if len(payload) > 2:
topic_len = struct.unpack('!h', payload[2:4])[0]
topic_lengths.append(topic_len)
# analyze for anomalies
print(f"total mqtt packets: {len(mqtt_packets)}")
print(f"qos distribution: {qos_distribution}")
print(f"retain flag usage: {sum(retain_flags)}/{len(retain_flags)}")
print(f"avg topic length: {sum(topic_lengths)/len(topic_lengths):.1f}")
# check for suspicious patterns
if len(set(topic_lengths)) < 5 and len(topic_lengths) > 20:
print("warning: consistent topic lengths detected")
high_qos_ratio = (qos_distribution[1] + qos_distribution[2]) / sum(qos_distribution.values())
if high_qos_ratio > 0.8:
print("warning: unusually high qos levels")
# usage
analyze_mqtt_traffic('mqtt_traffic.pcap')
detection methods
broker-side monitoring
# mqtt broker plugin for covert channel detection
import paho.mqtt.client as mqtt
from collections import defaultdict
import time
import math
class mqttcovertdetector:
def __init__(self, broker_host):
self.client = mqtt.client()
self.broker_host = broker_host
# tracking data
self.client_stats = defaultdict(lambda: {
'messages': 0,
'topics': set(),
'qos_usage': {0: 0, 1: 0, 2: 0},
'retain_count': 0,
'first_seen': time.time(),
'last_activity': time.time()
})
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
def on_connect(self, client, userdata, flags, rc):
if rc == 0:
# subscribe to all topics for monitoring
client.subscribe('#', qos=2)
print("mqtt covert detector active")
def on_message(self, client, userdata, msg):
"""analyze each message for covert channel indicators"""
client_id = "unknown" # would need broker logs for actual client id
current_time = time.time()
stats = self.client_stats[client_id]
stats['messages'] += 1
stats['topics'].add(msg.topic)
stats['qos_usage'][msg.qos] += 1
stats['last_activity'] = current_time
if msg.retain:
stats['retain_count'] += 1
# check for anomalies
self.check_client_anomalies(client_id, msg)
# periodic analysis
if stats['messages'] % 100 == 0:
self.analyze_client_behavior(client_id)
def check_client_anomalies(self, client_id, msg):
"""check individual message for anomalies"""
# high entropy in topic names
topic_entropy = self.calculate_entropy(msg.topic)
if topic_entropy > 4.0:
print(f"high entropy topic from {client_id}: {msg.topic}")
# unusually large payloads for iot devices
if len(msg.payload) > 10000:
print(f"large payload from {client_id}: {len(msg.payload)} bytes")
# base64 encoded topic components
topic_parts = msg.topic.split('/')
for part in topic_parts:
if len(part) > 10 and self.looks_like_base64(part):
print(f"possible base64 encoding in topic: {msg.topic}")
def analyze_client_behavior(self, client_id):
"""analyze overall client behavior patterns"""
stats = self.client_stats[client_id]
# calculate activity metrics
duration = stats['last_activity'] - stats['first_seen']
message_rate = stats['messages'] / duration if duration > 0 else 0
# topic diversity
unique_topics = len(stats['topics'])
topic_diversity = unique_topics / stats['messages']
# qos usage patterns
total_qos = sum(stats['qos_usage'].values())
high_qos_ratio = (stats['qos_usage'][1] + stats['qos_usage'][2]) / total_qos
# retain usage
retain_ratio = stats['retain_count'] / stats['messages']
# flag suspicious patterns
anomaly_score = 0
reasons = []
if message_rate > 10: # >10 msg/sec sustained
anomaly_score += 1
reasons.append('high message rate')
if topic_diversity < 0.1: # very few unique topics
anomaly_score += 1
reasons.append('low topic diversity')
if high_qos_ratio > 0.8: # mostly qos 1/2
anomaly_score += 1
reasons.append('unusual qos pattern')
if retain_ratio > 0.5: # >50% retained messages
anomaly_score += 1
reasons.append('high retain usage')
if anomaly_score >= 2:
print(f"suspicious client {client_id}: {', '.join(reasons)}")
print(f" rate: {message_rate:.1f} msg/s, topics: {unique_topics}")
def calculate_entropy(self, text):
"""calculate shannon entropy"""
from collections import counter
if not text:
return 0
counts = counter(text)
probs = [count/len(text) for count in counts.values()]
return -sum(p * math.log2(p) for p in probs if p > 0)
def looks_like_base64(self, text):
"""heuristic to detect base64 encoding"""
import re
# base64 pattern
base64_pattern = re.compile(r'^[a-za-z0-9+/]*={0,2}$')
if not base64_pattern.match(text):
return false
# length should be multiple of 4
if len(text) % 4 != 0:
return false
# calculate entropy
entropy = self.calculate_entropy(text)
return entropy > 3.5 # base64 typically has high entropy
# usage
detector = mqttcovertdetector('localhost')
detector.client.connect('localhost', 1883, 60)
detector.client.loop_forever()
network-level detection
# monitor mqtt connections
netstat -an | grep :1883
# capture mqtt traffic
tcpdump -i eth0 -w mqtt_capture.pcap port 1883
# analyze with wireshark
tshark -r mqtt_capture.pcap -y mqtt -t fields \
-e mqtt.msgtype -e mqtt.topic -e mqtt.qos -e mqtt.retain
# count unique topics per client
tshark -r mqtt_capture.pcap -y mqtt -t fields \
-e ip.src -e mqtt.topic | sort | uniq -c
countermeasures
broker configuration
# mosquitto security settings
allow_anonymous false
password_file /etc/mosquitto/passwd
# acl restrictions
acl_file /etc/mosquitto/acls
# connection limits
max_connections 100
max_inflight_messages 10
# topic restrictions
topic_pattern +/+/+ # limit topic depth
topic_max_length 100
access control list:
# /etc/mosquitto/acls
# deny all by default
pattern read $sys/#
# allow specific patterns only
user sensor_device
topic read sensors/device_001/+
topic write sensors/device_001/status
user controller
topic read sensors/+/+
topic write control/+/command
topic filtering
# mqtt topic filter for covert channel prevention
import re
from collections import defaultdict
class mqtttopicfilter:
def __init__(self):
self.allowed_patterns = [
r'^sensors/[a-z0-9_]+/[a-z_]+$',
r'^control/[a-z0-9_]+/[a-z_]+$',
r'^status/[a-z0-9_]+$'
]
self.blocked_patterns = [
r'.*[a-za-z0-9+/]{20,}.*', # base64-like strings
r'.*[0-9a-f]{32,}.*', # hex strings
r'.*/[^/]{100,}$' # very long topic components
]
self.client_topic_counts = defaultdict(set)
def is_allowed_topic(self, topic, client_id=none):
"""check if topic is allowed"""
# check against blocked patterns first
for pattern in self.blocked_patterns:
if re.match(pattern, topic, re.ignorecase):
print(f"blocked topic pattern: {topic}")
return false
# check against allowed patterns
for pattern in self.allowed_patterns:
if re.match(pattern, topic):
# track topic usage per client
if client_id:
self.client_topic_counts[client_id].add(topic)
# limit topics per client
if len(self.client_topic_counts[client_id]) > 50:
print(f"client {client_id} exceeded topic limit")
return false
return true
print(f"topic not in allowed patterns: {topic}")
return false
def filter_message(self, client_id, topic, payload, qos, retain):
"""comprehensive message filtering"""
# topic validation
if not self.is_allowed_topic(topic, client_id):
return false
# payload size limits
if len(payload) > 1024: # 1kb limit for iot messages
print(f"oversized payload from {client_id}: {len(payload)} bytes")
return false
# qos restrictions
if qos > 1: # limit to qos 0 and 1
print(f"high qos level from {client_id}: {qos}")
return false
# retain flag restrictions
if retain and not topic.endswith('/config'):
print(f"inappropriate retain flag from {client_id}")
return false
return true
# integration with broker
filter_instance = mqtttopicfilter()
def message_callback(client, userdata, message):
client_id = userdata.get('client_id', 'unknown')
if not filter_instance.filter_message(
client_id, message.topic, message.payload,
message.qos, message.retain):
# drop message
print(f"message dropped from {client_id}")
return
# process allowed message
print(f"allowed message: {message.topic}")
advantages and limitations
advantages
- widespread iot deployment provides cover
- flexible topic structure allows encoding variations
- publish-subscribe model supports one-to-many communication
- lightweight protocol suitable for low-bandwidth environments
- qos and retain flags provide additional metadata channels
limitations
- mqtt brokers may log all topics and messages
- simple protocol structure offers limited hiding places
- topic filtering can block covert channels effectively
- authentication requirements in secure deployments
- limited payload size for efficiency
performance characteristics
bandwidth analysis
technique | capacity | latency | stealth level |
---|---|---|---|
topic encoding | 10-100 bytes/msg | 50-200ms | medium |
payload embedding | 1-10kb/msg | 20-100ms | low |
qos/retain metadata | 3 bits/msg | minimal | high |
client id encoding | 50-200 bytes/conn | connection time | high |
broker performance impact
# measure mqtt broker performance impact
import time
import paho.mqtt.client as mqtt
import threading
def performance_test():
"""test mqtt broker under covert channel load"""
clients = []
message_count = 0
start_time = time.time()
def on_message(client, userdata, msg):
nonlocal message_count
message_count += 1
# create multiple covert clients
for i in range(10):
client = mqtt.client(f"covert_client_{i}")
client.on_message = on_message
client.connect('localhost', 1883, 60)
client.subscribe('test/+/+')
clients.append(client)
# send covert messages at high rate
for i in range(1000):
topic = f"test/{base64.b64encode(f'data{i}'.encode()).decode()}/status"
clients[i % 10].publish(topic, f"payload_{i}", qos=1)
if i % 100 == 0:
elapsed = time.time() - start_time
rate = message_count / elapsed if elapsed > 0 else 0
print(f"messages: {message_count}, rate: {rate:.1f} msg/s")
# cleanup
for client in clients:
client.disconnect()
# performance_test()
real-world applications
iot botnet communication
- command and control via mqtt topics
- distributed across multiple public brokers
- legitimate iot traffic provides cover
industrial espionage
- data exfiltration from smart factories
- sensor data embedding in mqtt messages
- long-term persistent access
research and testing
- academic studies on iot security
- penetration testing in iot environments
- covert channel capacity analysis
testing environment
local mqtt setup
# install mosquitto broker
sudo apt install mosquitto mosquitto-clients
# start broker
sudo systemctl start mosquitto
sudo systemctl enable mosquitto
# test basic functionality
mosquitto_pub -h localhost -t test/topic -m "hello world"
mosquitto_sub -h localhost -t test/topic
# test with authentication
sudo mosquitto_passwd -c /etc/mosquitto/passwd testuser
echo "password_file /etc/mosquitto/passwd" | sudo tee -a /etc/mosquitto/mosquitto.conf
sudo systemctl restart mosquitto
docker test environment
# mqtt test environment
from eclipse-mosquitto:latest
copy mosquitto.conf /mosquitto/config/mosquitto.conf
copy passwd /mosquitto/config/passwd
copy acl /mosquitto/config/acl
expose 1883 9001
cmd ["/usr/sbin/mosquitto", "-c", "/mosquitto/config/mosquitto.conf"]
# run test environment
docker build -t mqtt-test .
docker run -p 1883:1883 -p 9001:9001 mqtt-test
references
- mqtt version 5.0 specification - oasis standard
- rfc 6455: the websocket protocol (mqtt over websockets)
- “mqtt security fundamentals” - hivemq documentation
- “covert channels in iot protocols” - academic research
- eclipse mosquitto documentation