lorawan covert channels
on this page
lorawan covert channels leverage frame structure exploitation and timing-based techniques in lpwan environments, with cloaklora achieving ~38 bits per packet at 250m range using amplitude modulation.
technical description
lorawan (long range wide area network) is a protocol designed for iot devices requiring long-range, low-power communication. the protocol’s flexibility in payload structure and infrequent transmission patterns make it suitable for covert channels.
lorawan covert channel vectors:
- frame payload manipulation: embedding data in application payloads
- fport field exploitation: using frame port numbers for encoding
- timing-based channels: exploiting transmission intervals
- mac command abuse: hiding data in mac layer commands
- electromagnetic emanations: side-channel communication via rf patterns
lorawan frame structure
physical layer frame
preamble (8 symbols) | sync word (2 symbols) | phy payload | crc (2 bytes)
mac frame structure
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|mtype|rfu|major| devaddr (4 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| fcnt (2) | fctrl(1) | fport(1) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| frm payload (0-n bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| mic (4 bytes) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
covert channel opportunities
field | size | covert potential | detection difficulty |
---|---|---|---|
fport | 1 byte | medium - limited values | low |
frm payload | 0-242 bytes | high - application data | medium |
fcnt | 2 bytes | low - must increment | high |
fopts | 0-15 bytes | high - mac commands | high |
timing | variable | high - transmission intervals | very high |
implementation: cloaklora research
overview
cloaklora demonstrates lorawan electromagnetic covert channels:
- capacity: ~38 bits per typical lora packet
- range: 250m using amplitude modulation
- technique: rf emanation manipulation
- stealth: high - appears as normal lora transmission variations
amplitude modulation technique
# cloaklora-inspired amplitude modulation simulation
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
class loracovertchannel:
def __init__(self):
# lora parameters
self.bandwidth = 125000 # 125 khz
self.spreading_factor = 7 # sf7
self.coding_rate = 1 # 4/5
self.carrier_freq = 868100000 # 868.1 mhz (eu band)
# covert channel parameters
self.amplitude_levels = [0.5, 0.7, 0.9, 1.0] # 4 levels = 2 bits per symbol
self.symbol_duration = self.calculate_symbol_duration()
def calculate_symbol_duration(self):
"""calculate lora symbol duration"""
return (2 ** self.spreading_factor) / self.bandwidth
def encode_data_in_amplitude(self, binary_data):
"""encode binary data using amplitude modulation"""
# split binary data into 2-bit chunks
amplitude_sequence = []
for i in range(0, len(binary_data), 2):
chunk = binary_data[i:i+2].ljust(2, '0') # pad if needed
amplitude_index = int(chunk, 2)
amplitude = self.amplitude_levels[amplitude_index]
amplitude_sequence.append(amplitude)
return amplitude_sequence
def generate_modulated_signal(self, data, duration=1.0):
"""generate lora signal with amplitude modulation"""
# encode data in amplitudes
amplitude_sequence = self.encode_data_in_amplitude(data)
# time vector
sample_rate = self.bandwidth * 4 # oversample
t = np.linspace(0, duration, int(sample_rate * duration))
# generate base lora chirp
chirp_signal = self.generate_lora_chirp(t)
# apply amplitude modulation
modulated_signal = np.zeros_like(chirp_signal)
samples_per_symbol = len(t) // len(amplitude_sequence)
for i, amplitude in enumerate(amplitude_sequence):
start_idx = i * samples_per_symbol
end_idx = min((i + 1) * samples_per_symbol, len(chirp_signal))
if start_idx < len(chirp_signal):
modulated_signal[start_idx:end_idx] = (
chirp_signal[start_idx:end_idx] * amplitude
)
return t, modulated_signal
def generate_lora_chirp(self, t):
"""generate lora chirp signal"""
# lora uses chirp spread spectrum
# frequency sweeps from -bw/2 to +bw/2
freq_start = -self.bandwidth / 2
freq_end = self.bandwidth / 2
# linear frequency modulation
chirp = signal.chirp(t,
freq_start,
t[-1],
freq_end,
method='linear')
return chirp
def extract_amplitude_data(self, signal_samples, sample_rate):
"""extract covert data from amplitude variations"""
# calculate envelope using hilbert transform
analytic_signal = signal.hilbert(signal_samples)
amplitude_envelope = np.abs(analytic_signal)
# segment into symbols
symbol_samples = int(sample_rate * self.symbol_duration)
num_symbols = len(amplitude_envelope) // symbol_samples
decoded_bits = ''
for i in range(num_symbols):
start_idx = i * symbol_samples
end_idx = (i + 1) * symbol_samples
# average amplitude for this symbol
symbol_amplitude = np.mean(amplitude_envelope[start_idx:end_idx])
# quantize to nearest amplitude level
closest_level = min(range(len(self.amplitude_levels)),
key=lambda x: abs(self.amplitude_levels[x] - symbol_amplitude))
# convert level index to 2-bit binary
bits = format(closest_level, '02b')
decoded_bits += bits
return decoded_bits
def simulate_covert_transmission(self, message):
"""simulate complete covert transmission"""
print(f"encoding message: '{message}'")
# convert to binary
binary_data = ''.join(format(ord(c), '08b') for c in message)
print(f"binary data: {binary_data} ({len(binary_data)} bits)")
# generate modulated signal
t, modulated_signal = self.generate_modulated_signal(binary_data, duration=2.0)
# simulate transmission and reception
# add noise
noise_power = 0.01
received_signal = modulated_signal + np.random.normal(0, noise_power, len(modulated_signal))
# decode
sample_rate = len(t) / (t[-1] - t[0])
decoded_bits = self.extract_amplitude_data(received_signal, sample_rate)
# convert back to text
decoded_message = ''
for i in range(0, len(decoded_bits), 8):
byte_bits = decoded_bits[i:i+8]
if len(byte_bits) == 8:
decoded_message += chr(int(byte_bits, 2))
print(f"decoded message: '{decoded_message}'")
# calculate capacity
transmission_time = len(t) / sample_rate
capacity_bps = len(binary_data) / transmission_time
print(f"transmission time: {transmission_time:.2f}s")
print(f"capacity: {capacity_bps:.1f} bits/second")
return t, modulated_signal, received_signal, decoded_message
# usage
channel = loracovertchannel()
t, tx_signal, rx_signal, decoded = channel.simulate_covert_transmission("secret")
lorawan payload covert channel
# lorawan application payload covert channel
import struct
import json
from cryptography.fernet import fernet
class lorawanpayloadcovert:
def __init__(self):
# lorawan parameters
self.max_payload_size = 242 # bytes (sf7)
self.encryption_key = fernet.generate_key()
self.cipher = fernet(self.encryption_key)
# legitimate sensor data templates
self.sensor_templates = {
'temperature': {'temp': 23.5, 'humidity': 45.2},
'motion': {'detected': false, 'count': 0},
'battery': {'voltage': 3.7, 'percentage': 85},
'gps': {'lat': 40.7128, 'lon': -74.0060, 'alt': 10}
}
def create_legitimate_payload(self, sensor_type='temperature'):
"""create legitimate-looking sensor payload"""
if sensor_type in self.sensor_templates:
data = self.sensor_templates[sensor_type].copy()
# add realistic variations
import random
if sensor_type == 'temperature':
data['temp'] += random.uniform(-2, 2)
data['humidity'] += random.uniform(-5, 5)
elif sensor_type == 'battery':
data['voltage'] += random.uniform(-0.1, 0.1)
data['percentage'] = max(0, min(100, data['percentage'] + random.randint(-2, 2)))
return json.dumps(data).encode()
return b'{"status": "ok"}'
def embed_covert_data(self, legitimate_payload, covert_data):
"""embed covert data in lorawan payload"""
# method 1: steganography in json values
if legitimate_payload.startswith(b'{'):
return self.embed_in_json(legitimate_payload, covert_data)
# method 2: binary payload with hidden data
return self.embed_in_binary(legitimate_payload, covert_data)
def embed_in_json(self, json_payload, covert_data):
"""hide data in json floating point precision"""
try:
data = json.loads(json_payload.decode())
# encrypt covert data
encrypted_covert = self.cipher.encrypt(covert_data.encode())
# convert to integer for embedding
covert_int = int.from_bytes(encrypted_covert[:8], byteorder='big') # limit size
# embed in floating point precision
for key, value in data.items():
if isinstance(value, float):
# encode in least significant digits
precision_factor = 10000
covert_part = (covert_int % precision_factor) / precision_factor
data[key] = int(value) + covert_part
break
return json.dumps(data).encode()
except exception as e:
print(f"json embedding failed: {e}")
return json_payload
def embed_in_binary(self, binary_payload, covert_data):
"""embed covert data in binary payload"""
# encrypt covert data
encrypted_covert = self.cipher.encrypt(covert_data.encode())
# append to payload with marker
marker = b'\xaa\xbb' # covert data marker
combined = binary_payload + marker + encrypted_covert
# ensure within lorawan size limits
if len(combined) <= self.max_payload_size:
return combined
else:
# truncate covert data if necessary
max_covert_size = self.max_payload_size - len(binary_payload) - len(marker)
truncated_covert = encrypted_covert[:max_covert_size]
return binary_payload + marker + truncated_covert
def extract_covert_data(self, payload):
"""extract covert data from lorawan payload"""
# try json extraction first
if payload.startswith(b'{'):
return self.extract_from_json(payload)
# try binary extraction
return self.extract_from_binary(payload)
def extract_from_json(self, json_payload):
"""extract covert data from json payload"""
try:
data = json.loads(json_payload.decode())
# look for embedded precision data
for key, value in data.items():
if isinstance(value, float):
# extract fractional part
fractional = value - int(value)
covert_int = int(fractional * 10000)
if covert_int > 0:
# convert back to bytes
covert_bytes = covert_int.to_bytes(8, byteorder='big')
try:
# decrypt
decrypted = self.cipher.decrypt(covert_bytes)
return decrypted.decode()
except:
continue
except exception as e:
print(f"json extraction failed: {e}")
return none
def extract_from_binary(self, binary_payload):
"""extract covert data from binary payload"""
marker = b'\xaa\xbb'
marker_pos = binary_payload.find(marker)
if marker_pos != -1:
covert_data = binary_payload[marker_pos + len(marker):]
try:
decrypted = self.cipher.decrypt(covert_data)
return decrypted.decode()
except:
pass
return none
def create_covert_lorawan_frame(self, device_addr, frame_count, covert_message):
"""create complete lorawan frame with covert data"""
# create legitimate payload
legit_payload = self.create_legitimate_payload('temperature')
# embed covert data
covert_payload = self.embed_covert_data(legit_payload, covert_message)
# build lorawan mac frame
mhdr = 0x40 # confirmed data up
dev_addr = struct.pack('<i', device_addr) # little endian
fctrl = 0x00 # no mac commands
fcnt = struct.pack('<h', frame_count) # frame counter
fport = 1 # application port
# frame header
fhdr = dev_addr + fctrl.to_bytes(1, 'big') + fcnt + bytes([fport])
# complete mac payload
mac_payload = fhdr + covert_payload
# calculate mic (simplified - would use aes-cmac in real implementation)
mic = self.calculate_mic(mac_payload, frame_count)
# complete frame
lorawan_frame = bytes([mhdr]) + mac_payload + mic
return lorawan_frame
def calculate_mic(self, payload, frame_count):
"""calculate message integrity code (simplified)"""
# in real lorawan, this would be aes-cmac
# here we use simple checksum for demonstration
checksum = sum(payload + frame_count.to_bytes(2, 'little'))
return (checksum & 0xffffffff).to_bytes(4, 'big')
# usage
covert = lorawanpayloadcovert()
frame = covert.create_covert_lorawan_frame(0x12345678, 42, "hidden message")
print(f"lorawan frame: {frame.hex()}")
timing-based covert channels
transmission interval manipulation
# lorawan timing covert channel
import time
import random
import statistics
class lorawantimingchannel:
def __init__(self):
# lorawan duty cycle limits (eu868)
self.duty_cycle_limit = 0.01 # 1% duty cycle
self.min_interval = 60 # minimum seconds between transmissions
# timing encoding
self.short_interval = 60 # represents binary 0
self.long_interval = 120 # represents binary 1
self.sync_interval = 300 # synchronization pattern
def encode_timing_message(self, binary_data):
"""encode binary message in transmission timing"""
transmission_schedule = []
current_time = time.time()
# synchronization burst
for _ in range(3):
transmission_schedule.append(current_time)
current_time += self.sync_interval
# encode data bits
for bit in binary_data:
interval = self.long_interval if bit == '1' else self.short_interval
# add random jitter to avoid detection (±10%)
jitter = interval * random.uniform(-0.1, 0.1)
actual_interval = interval + jitter
current_time += actual_interval
transmission_schedule.append(current_time)
return transmission_schedule
def decode_timing_message(self, transmission_times):
"""decode binary message from transmission timing"""
if len(transmission_times) < 4:
return none
# calculate intervals
intervals = []
for i in range(1, len(transmission_times)):
interval = transmission_times[i] - transmission_times[i-1]
intervals.append(interval)
# detect synchronization pattern
sync_count = 0
data_start = 0
for i, interval in enumerate(intervals):
if abs(interval - self.sync_interval) < 30: # tolerance
sync_count += 1
else:
if sync_count >= 2: # found sync pattern
data_start = i + 1
break
sync_count = 0
if data_start == 0:
print("no synchronization pattern found")
return none
# decode data intervals
data_intervals = intervals[data_start:]
decoded_bits = ''
for interval in data_intervals:
if abs(interval - self.short_interval) < abs(interval - self.long_interval):
decoded_bits += '0'
else:
decoded_bits += '1'
return decoded_bits
def simulate_lorawan_timing_channel(self, message):
"""simulate complete timing channel transmission"""
print(f"encoding message: '{message}'")
# convert to binary
binary_data = ''.join(format(ord(c), '08b') for c in message)
print(f"binary data: {binary_data}")
# generate transmission schedule
schedule = self.encode_timing_message(binary_data)
print(f"transmission schedule ({len(schedule)} packets):")
for i, tx_time in enumerate(schedule):
print(f" packet {i}: {time.ctime(tx_time)}")
# simulate reception with timing jitter
received_times = []
for tx_time in schedule:
# add reception timing error (±2 seconds)
rx_time = tx_time + random.uniform(-2, 2)
received_times.append(rx_time)
# decode received timing
decoded_bits = self.decode_timing_message(received_times)
if decoded_bits:
# convert back to text
decoded_message = ''
for i in range(0, len(decoded_bits), 8):
byte_bits = decoded_bits[i:i+8]
if len(byte_bits) == 8:
decoded_message += chr(int(byte_bits, 2))
print(f"decoded message: '{decoded_message}'")
# calculate channel capacity
total_time = schedule[-1] - schedule[0]
capacity = len(binary_data) / total_time
print(f"channel capacity: {capacity:.4f} bits/second")
return decoded_bits
# usage
timing_channel = lorawantimingchannel()
result = timing_channel.simulate_lorawan_timing_channel("hi")
detection methods
lorawan traffic analysis
# lorawan covert channel detection
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
class lorawancovertdetector:
def __init__(self):
self.device_stats = defaultdict(lambda: {
'transmissions': [],
'payload_sizes': [],
'intervals': [],
'fports': [],
'sf_usage': defaultdict(int),
'first_seen': none,
'last_seen': none
})
def analyze_lorawan_packet(self, timestamp, dev_addr, payload_size,
fport, spreading_factor, payload_data):
"""analyze individual lorawan packet"""
stats = self.device_stats[dev_addr]
# update timing statistics
stats['transmissions'].append(timestamp)
stats['payload_sizes'].append(payload_size)
stats['fports'].append(fport)
stats['sf_usage'][spreading_factor] += 1
if stats['first_seen'] is none:
stats['first_seen'] = timestamp
stats['last_seen'] = timestamp
# calculate intervals
if len(stats['transmissions']) > 1:
interval = timestamp - stats['transmissions'][-2]
stats['intervals'].append(interval)
# analyze payload for anomalies
self.analyze_payload_anomalies(dev_addr, payload_data, fport)
# check for timing anomalies
if len(stats['intervals']) >= 10:
self.check_timing_anomalies(dev_addr)
def analyze_payload_anomalies(self, dev_addr, payload_data, fport):
"""analyze payload for covert channel indicators"""
anomalies = []
# check payload entropy
if payload_data:
entropy = self.calculate_entropy(payload_data)
if entropy > 7.5: # high entropy suggests encryption/encoding
anomalies.append(f'high payload entropy: {entropy:.2f}')
# check for binary markers
if b'\xaa\xbb' in payload_data:
anomalies.append('suspicious binary marker detected')
# check json payload anomalies
if payload_data.startswith(b'{'):
json_anomalies = self.analyze_json_payload(payload_data)
anomalies.extend(json_anomalies)
# unusual fport usage
if fport > 200:
anomalies.append(f'unusual fport value: {fport}')
if anomalies:
print(f"payload anomalies from device {dev_addr:08x}:")
for anomaly in anomalies:
print(f" {anomaly}")
def analyze_json_payload(self, json_data):
"""analyze json payload for steganography"""
anomalies = []
try:
import json
data = json.loads(json_data.decode())
# check for unusual precision in float values
for key, value in data.items():
if isinstance(value, float):
# check decimal places
decimal_str = str(value).split('.')[1] if '.' in str(value) else ''
if len(decimal_str) > 6: # excessive precision
anomalies.append(f'excessive precision in {key}: {value}')
# check for non-random decimal patterns
if self.has_pattern_in_decimals(decimal_str):
anomalies.append(f'patterned decimals in {key}: {value}')
except exception as e:
anomalies.append(f'json parsing error: {e}')
return anomalies
def check_timing_anomalies(self, dev_addr):
"""check for timing-based covert channels"""
stats = self.device_stats[dev_addr]
intervals = stats['intervals'][-50:] # recent intervals
if len(intervals) < 10:
return
anomalies = []
# statistical analysis
mean_interval = statistics.mean(intervals)
median_interval = statistics.median(intervals)
stdev_interval = statistics.stdev(intervals)
# coefficient of variation
cv = stdev_interval / mean_interval if mean_interval > 0 else 0
# check for bimodal distribution (timing channel signature)
interval_counts = defaultdict(int)
for interval in intervals:
# bin intervals to nearest 30 seconds
binned = round(interval / 30) * 30
interval_counts[binned] += 1
# look for two dominant intervals
sorted_counts = sorted(interval_counts.items(), key=lambda x: x[1], reverse=true)
if (len(sorted_counts) >= 2 and
sorted_counts[0][1] + sorted_counts[1][1] > len(intervals) * 0.8):
ratio = sorted_counts[0][0] / sorted_counts[1][0] if sorted_counts[1][0] > 0 else 0
if 1.5 < ratio < 3.0: # common encoding ratios
anomalies.append(f'bimodal timing pattern: {sorted_counts[0][0]}s/{sorted_counts[1][0]}s')
# check for regular patterns
if cv < 0.2: # very regular timing
anomalies.append(f'unusually regular timing (cv={cv:.3f})')
# duty cycle analysis
total_time = stats['last_seen'] - stats['first_seen']
if total_time > 0:
transmission_rate = len(stats['transmissions']) / (total_time / 3600) # per hour
if transmission_rate > 60: # more than 1 per minute average
anomalies.append(f'high transmission rate: {transmission_rate:.1f}/hour')
if anomalies:
print(f"timing anomalies from device {dev_addr:08x}:")
for anomaly in anomalies:
print(f" {anomaly}")
def calculate_entropy(self, data):
"""calculate shannon entropy of data"""
from collections import counter
import math
if not data:
return 0
counts = counter(data)
probs = [count/len(data) for count in counts.values()]
return -sum(p * math.log2(p) for p in probs if p > 0)
def has_pattern_in_decimals(self, decimal_str):
"""check for patterns in decimal places"""
if len(decimal_str) < 4:
return false
# check for repeated sequences
for length in range(1, len(decimal_str) // 2 + 1):
pattern = decimal_str[:length]
repetitions = len(decimal_str) // length
if pattern * repetitions == decimal_str[:len(pattern) * repetitions]:
if repetitions > 2: # pattern repeats more than twice
return true
return false
def generate_detection_report(self, dev_addr):
"""generate comprehensive detection report for device"""
stats = self.device_stats[dev_addr]
if not stats['transmissions']:
return none
report = {
'device_address': f'{dev_addr:08x}',
'observation_period': {
'start': stats['first_seen'],
'end': stats['last_seen'],
'duration_hours': (stats['last_seen'] - stats['first_seen']) / 3600
},
'transmission_stats': {
'total_packets': len(stats['transmissions']),
'avg_interval': statistics.mean(stats['intervals']) if stats['intervals'] else 0,
'median_interval': statistics.median(stats['intervals']) if stats['intervals'] else 0,
'interval_stdev': statistics.stdev(stats['intervals']) if len(stats['intervals']) > 1 else 0
},
'payload_stats': {
'avg_size': statistics.mean(stats['payload_sizes']) if stats['payload_sizes'] else 0,
'size_variation': statistics.stdev(stats['payload_sizes']) if len(stats['payload_sizes']) > 1 else 0,
'unique_fports': len(set(stats['fports']))
},
'spreading_factor_usage': dict(stats['sf_usage'])
}
# calculate anomaly score
anomaly_score = 0
# timing regularity
if report['transmission_stats']['interval_stdev'] > 0:
cv = (report['transmission_stats']['interval_stdev'] /
report['transmission_stats']['avg_interval'])
if cv < 0.2:
anomaly_score += 2
# high transmission rate
if report['observation_period']['duration_hours'] > 0:
rate = (report['transmission_stats']['total_packets'] /
report['observation_period']['duration_hours'])
if rate > 10: # more than 10 per hour
anomaly_score += 1
# payload size consistency
if report['payload_stats']['size_variation'] < 5: # very consistent sizes
anomaly_score += 1
report['anomaly_score'] = anomaly_score
report['risk_level'] = 'high' if anomaly_score >= 3 else 'medium' if anomaly_score >= 2 else 'low'
return report
# usage
detector = lorawancovertdetector()
# detector.analyze_lorawan_packet(time.time(), 0x12345678, 25, 1, 7, b'{"temp": 23.5}')
rf analysis for electromagnetic channels
# lorawan rf analysis for cloaklora-style channels
import numpy as np
from scipy import signal, fft
import matplotlib.pyplot as plt
class lorawanrfanalyzer:
def __init__(self):
self.sample_rate = 1000000 # 1 mhz
self.lora_bandwidth = 125000 # 125 khz
self.center_freq = 868100000 # 868.1 mhz
def analyze_amplitude_modulation(self, rf_samples):
"""analyze rf samples for amplitude-based covert channels"""
# extract amplitude envelope
analytic_signal = signal.hilbert(rf_samples)
amplitude_envelope = np.abs(analytic_signal)
# detect amplitude variations
amplitude_variation = np.std(amplitude_envelope)
mean_amplitude = np.mean(amplitude_envelope)
cv_amplitude = amplitude_variation / mean_amplitude
# look for periodic amplitude patterns
amplitude_fft = np.fft.fft(amplitude_envelope)
amplitude_freqs = np.fft.fftfreq(len(amplitude_envelope), 1/self.sample_rate)
# find dominant frequencies in amplitude variations
amplitude_power = np.abs(amplitude_fft)**2
peak_indices = signal.find_peaks(amplitude_power, height=np.max(amplitude_power)*0.1)[0]
dominant_freqs = amplitude_freqs[peak_indices]
analysis = {
'amplitude_variation': amplitude_variation,
'coefficient_of_variation': cv_amplitude,
'dominant_frequencies': dominant_freqs[:5], # top 5
'covert_channel_likely': cv_amplitude > 0.1 # threshold for detection
}
return analysis
def detect_timing_patterns(self, packet_timestamps):
"""detect covert timing patterns in packet transmissions"""
if len(packet_timestamps) < 10:
return {'insufficient_data': true}
# calculate inter-packet intervals
intervals = np.diff(packet_timestamps)
# statistical analysis
mean_interval = np.mean(intervals)
std_interval = np.std(intervals)
cv_interval = std_interval / mean_interval if mean_interval > 0 else 0
# histogram analysis for bimodal detection
hist, bin_edges = np.histogram(intervals, bins=20)
# find peaks in histogram
peaks, _ = signal.find_peaks(hist, height=np.max(hist)*0.3)
# bimodal detection
is_bimodal = len(peaks) >= 2
if is_bimodal:
# calculate ratio between dominant intervals
peak_positions = bin_edges[peaks]
if len(peak_positions) >= 2:
interval_ratio = peak_positions[0] / peak_positions[1]
else:
interval_ratio = 1
else:
interval_ratio = 1
analysis = {
'mean_interval': mean_interval,
'interval_cv': cv_interval,
'is_bimodal': is_bimodal,
'interval_ratio': interval_ratio,
'num_peaks': len(peaks),
'covert_timing_likely': is_bimodal and 1.5 < interval_ratio < 3.0
}
return analysis
def analyze_lorawan_spectrum(self, rf_samples):
"""analyze spectrum for lorawan signal anomalies"""
# compute spectrogram
frequencies, times, spectrogram = signal.spectrogram(
rf_samples,
fs=self.sample_rate,
window='hann',
nperseg=1024,
noverlap=512
)
# focus on lorawan bandwidth
lora_freq_mask = (frequencies >= -self.lora_bandwidth/2) & (frequencies <= self.lora_bandwidth/2)
lora_spectrum = spectrogram[lora_freq_mask, :]
# analyze spectral features
spectral_variation = np.std(lora_spectrum, axis=1)
spectral_peaks = []
for freq_bin in range(len(spectral_variation)):
if spectral_variation[freq_bin] > np.mean(spectral_variation) + 2*np.std(spectral_variation):
spectral_peaks.append(frequencies[lora_freq_mask][freq_bin])
# chirp analysis
chirp_linearity = self.analyze_chirp_linearity(lora_spectrum, times)
analysis = {
'spectral_peaks': spectral_peaks,
'chirp_linearity': chirp_linearity,
'spectrum_anomaly': len(spectral_peaks) > 5 # threshold
}
return analysis
def analyze_chirp_linearity(self, spectrum, times):
"""analyze lorawan chirp linearity for modulation detection"""
# track instantaneous frequency across time
instantaneous_freqs = []
for time_bin in range(spectrum.shape[1]):
# find frequency with maximum power at this time
max_power_idx = np.argmax(spectrum[:, time_bin])
instantaneous_freqs.append(max_power_idx)
# calculate linearity of frequency sweep
if len(instantaneous_freqs) > 1:
# fit linear regression to frequency vs time
coefficients = np.polyfit(range(len(instantaneous_freqs)), instantaneous_freqs, 1)
residuals = instantaneous_freqs - np.polyval(coefficients, range(len(instantaneous_freqs)))
linearity_error = np.std(residuals)
return {
'linearity_error': linearity_error,
'is_linear': linearity_error < 5, # threshold
'slope': coefficients[0]
}
return {'insufficient_data': true}
def comprehensive_covert_analysis(self, rf_samples, packet_timestamps):
"""comprehensive analysis for all lorawan covert channel types"""
results = {
'timestamp': time.time(),
'sample_count': len(rf_samples),
'packet_count': len(packet_timestamps)
}
# amplitude modulation analysis
amplitude_analysis = self.analyze_amplitude_modulation(rf_samples)
results['amplitude_covert'] = amplitude_analysis
# timing analysis
timing_analysis = self.detect_timing_patterns(packet_timestamps)
results['timing_covert'] = timing_analysis
# spectrum analysis
spectrum_analysis = self.analyze_lorawan_spectrum(rf_samples)
results['spectrum_covert'] = spectrum_analysis
# overall risk assessment
risk_factors = 0
if amplitude_analysis.get('covert_channel_likely', false):
risk_factors += 2
if timing_analysis.get('covert_timing_likely', false):
risk_factors += 2
if spectrum_analysis.get('spectrum_anomaly', false):
risk_factors += 1
results['overall_risk'] = 'high' if risk_factors >= 3 else 'medium' if risk_factors >= 2 else 'low'
results['risk_score'] = risk_factors
return results
# usage
# analyzer = lorawanrfanalyzer()
# analysis = analyzer.comprehensive_covert_analysis(rf_data, packet_times)
countermeasures
network-level monitoring
# lorawan network monitoring setup
# install chirpstack for lorawan network analysis
docker-compose up chirpstack-network-server chirpstack-application-server
# monitor lorawan traffic
tcpdump -i any -w lorawan_traffic.pcap 'port 1700'
# analyze packet timing
tshark -r lorawan_traffic.pcap -t fields -e frame.time_epoch -e data.data \
| awk '{print $1}' | sort -n | awk 'nr>1 {print $1-prev} {prev=$1}'
payload filtering
# lorawan payload filtering and normalization
import json
import re
from cryptography.fernet import fernet
class lorawanpayloadfilter:
def __init__(self):
self.max_payload_size = 51 # restrict to sf12 size for security
self.allowed_fports = [1, 2, 10, 20] # whitelist approach
self.entropy_threshold = 6.0
# payload sanitization rules
self.sanitization_rules = {
'max_decimal_places': 2,
'allowed_json_keys': ['temp', 'humidity', 'battery', 'lat', 'lon'],
'max_string_length': 20
}
def filter_lorawan_packet(self, dev_addr, fport, payload_data):
"""filter and sanitize lorawan packets"""
# check fport whitelist
if fport not in self.allowed_fports:
print(f"blocked packet from {dev_addr:08x}: invalid fport {fport}")
return none
# check payload size
if len(payload_data) > self.max_payload_size:
print(f"blocked packet from {dev_addr:08x}: oversized payload ({len(payload_data)} bytes)")
return none
# check payload entropy
entropy = self.calculate_entropy(payload_data)
if entropy > self.entropy_threshold:
print(f"blocked packet from {dev_addr:08x}: high entropy payload ({entropy:.2f})")
return none
# sanitize payload content
sanitized_payload = self.sanitize_payload(payload_data)
if sanitized_payload != payload_data:
print(f"sanitized payload from {dev_addr:08x}")
return sanitized_payload
def sanitize_payload(self, payload_data):
"""sanitize payload content"""
# try json sanitization
if payload_data.startswith(b'{'):
return self.sanitize_json_payload(payload_data)
# binary payload sanitization
return self.sanitize_binary_payload(payload_data)
def sanitize_json_payload(self, json_data):
"""sanitize json payload"""
try:
data = json.loads(json_data.decode())
sanitized_data = {}
for key, value in data.items():
# check key whitelist
if key not in self.sanitization_rules['allowed_json_keys']:
continue
# sanitize values
if isinstance(value, float):
# limit decimal places
value = round(value, self.sanitization_rules['max_decimal_places'])
elif isinstance(value, str):
# limit string length
value = value[:self.sanitization_rules['max_string_length']]
elif isinstance(value, int):
# keep integers as-is but limit range
value = max(-1000000, min(1000000, value))
else:
# remove unsupported types
continue
sanitized_data[key] = value
return json.dumps(sanitized_data).encode()
except exception as e:
print(f"json sanitization failed: {e}")
return b'{"error": "invalid_json"}'
def sanitize_binary_payload(self, binary_data):
"""sanitize binary payload"""
# remove potential covert markers
sanitized = binary_data.replace(b'\xaa\xbb', b'')
sanitized = sanitized.replace(b'\xcc\xdd', b'')
sanitized = sanitized.replace(b'\xee\xff', b'')
# limit to first 32 bytes for safety
return sanitized[:32]
def calculate_entropy(self, data):
"""calculate shannon entropy"""
from collections import counter
import math
if not data:
return 0
counts = counter(data)
probs = [count/len(data) for count in counts.values()]
return -sum(p * math.log2(p) for p in probs if p > 0)
# lorawan traffic shaping
class lorawantrafficshaper:
def __init__(self):
# rate limiting per device
self.device_limits = {
'packets_per_hour': 10,
'bytes_per_hour': 1000,
'burst_limit': 3 # max packets in 5 minutes
}
self.device_usage = defaultdict(lambda: {
'packets_this_hour': 0,
'bytes_this_hour': 0,
'last_hour_reset': time.time(),
'recent_packets': []
})
def should_allow_packet(self, dev_addr, payload_size):
"""check if packet should be allowed based on rate limits"""
current_time = time.time()
usage = self.device_usage[dev_addr]
# reset hourly counters
if current_time - usage['last_hour_reset'] >= 3600:
usage['packets_this_hour'] = 0
usage['bytes_this_hour'] = 0
usage['last_hour_reset'] = current_time
# check hourly limits
if usage['packets_this_hour'] >= self.device_limits['packets_per_hour']:
print(f"rate limit exceeded for {dev_addr:08x}: packets per hour")
return false
if usage['bytes_this_hour'] + payload_size > self.device_limits['bytes_per_hour']:
print(f"rate limit exceeded for {dev_addr:08x}: bytes per hour")
return false
# check burst limits
recent_packets = [t for t in usage['recent_packets'] if current_time - t < 300] # last 5 minutes
if len(recent_packets) >= self.device_limits['burst_limit']:
print(f"burst limit exceeded for {dev_addr:08x}")
return false
# update usage
usage['packets_this_hour'] += 1
usage['bytes_this_hour'] += payload_size
usage['recent_packets'] = recent_packets + [current_time]
return true
# usage
payload_filter = lorawanpayloadfilter()
traffic_shaper = lorawantrafficshaper()
# sanitized = payload_filter.filter_lorawan_packet(0x12345678, 1, b'{"temp": 23.456789}')
# allowed = traffic_shaper.should_allow_packet(0x12345678, len(sanitized) if sanitized else 0)
advantages and limitations
advantages
- low power consumption: suitable for battery-operated devices
- long range communication: coverage up to several kilometers
- infrequent transmission: natural timing variability provides cover
- limited monitoring: lorawan networks often have minimal security monitoring
- multiple encoding vectors: payload, timing, fport, rf modulation
limitations
- very low bandwidth: typical duty cycle limits severely restrict data rates
- range limitations: covert rf techniques require proximity to receiver
- packet loss: long-range transmission subject to interference and fading
- duty cycle restrictions: regulatory limits on transmission frequency
- limited deployment: lorawan infrastructure not universally available
performance characteristics
capacity analysis
technique | capacity | range | stealth level | detection difficulty |
---|---|---|---|---|
payload embedding | 50-200 bytes/packet | network range | medium | medium |
timing channels | 1-2 bits/packet | network range | high | high |
fport encoding | 8 bits/packet | network range | low | low |
rf amplitude mod | ~38 bits/packet | 250m | high | very high |
combined approach | 250+ bits/packet | variable | medium | medium |
duty cycle impact
# lorawan duty cycle analysis
def analyze_duty_cycle_impact():
"""analyze how duty cycle affects covert channel capacity"""
# eu868 band duty cycle limits
duty_cycles = {
'g1_bands': 0.01, # 1% - most restrictive
'g2_bands': 0.001, # 0.1% - very restrictive
'g3_bands': 0.1 # 10% - less restrictive
}
# typical lorawan parameters
packet_duration = 0.5 # 500ms for sf12
packets_per_day = {}
for band, duty_cycle in duty_cycles.items():
# calculate max packets per day
seconds_per_day = 86400
max_air_time = seconds_per_day * duty_cycle
max_packets = int(max_air_time / packet_duration)
packets_per_day[band] = max_packets
print(f"{band}: {duty_cycle*100}% duty cycle = {max_packets} packets/day")
# covert channel capacity
if max_packets > 0:
# assume 50 bytes covert data per packet
covert_bytes_per_day = max_packets * 50
covert_bits_per_second = (covert_bytes_per_day * 8) / seconds_per_day
print(f" covert capacity: {covert_bytes_per_day} bytes/day ({covert_bits_per_second:.4f} bps)")
return packets_per_day
# analyze_duty_cycle_impact()
real-world applications
iot surveillance
- long-term monitoring with minimal detection risk
- coordination between distributed sensor networks
- backup communication for compromised primary channels
industrial espionage
- exfiltration from smart factories and industrial iot
- monitoring production data and operational parameters
- maintaining persistent access in air-gapped facilities
research applications
- lorawan security assessment
- covert channel capacity studies
- electromagnetic emanation analysis
references
- lorawan 1.0.4 specification - lora alliance
- rp002-1.0.3 lorawan regional parameters - lora alliance
- “cloaklora: a covert channel over lorawan phy” - academic research
- “security analysis of lorawan networks” - iot security studies
- chirpstack lorawan network server documentation