LoRaWAN Covert Channels
LoRaWAN covert channels exploit frame-structure manipulation and timing-based techniques in LPWAN environments; the CloakLoRa research achieves roughly 38 covert bits per packet at 250 m range using amplitude modulation.
Technical Description
LoRaWAN (Long Range Wide Area Network) is a protocol designed for IoT devices that require long-range, low-power communication. The protocol's flexible payload structure and infrequent transmission patterns make it well suited to covert channels.
LoRaWAN covert channel vectors:
- Frame payload manipulation: embedding data in application payloads
- FPort field exploitation: using frame port numbers for encoding
- Timing-based channels: exploiting transmission intervals
- MAC command abuse: hiding data in MAC-layer commands
- Electromagnetic emanations: side-channel communication via RF patterns
LoRaWAN Frame Structure
Physical Layer Frame
Preamble (8 symbols) | Sync word (2 symbols) | PHY header + header CRC (explicit mode) | PHY payload | CRC (2 bytes)
MAC Frame Structure
MHDR (1 byte) | FHDR (7-22 bytes) | FPort (0-1 byte) | FRMPayload (0-N bytes) | MIC (4 bytes)
MHDR: MType (3 bits) | RFU (3 bits) | Major (2 bits)
FHDR: DevAddr (4 bytes) | FCtrl (1 byte) | FCnt (2 bytes) | FOpts (0-15 bytes)
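To make the layout above concrete, the sketch below splits a raw uplink PHYPayload into its fields. It is an illustrative, assumption-laden helper (parse_uplink is not a library function): it reads FOptsLen from FCtrl but does not verify the MIC or decrypt FRMPayload.
# Minimal LoRaWAN uplink field parser (illustrative sketch)
import struct
def parse_uplink(frame: bytes) -> dict:
    """Split a PHYPayload into MHDR, FHDR, FPort, FRMPayload and MIC."""
    mhdr = frame[0]
    dev_addr = struct.unpack_from('<I', frame, 1)[0]  # DevAddr is little-endian on air
    fctrl = frame[5]
    fcnt = struct.unpack_from('<H', frame, 6)[0]      # 16-bit frame counter
    fopts_len = fctrl & 0x0F                          # FOptsLen = lower nibble of FCtrl
    offset = 8 + fopts_len                            # first byte after FHDR
    has_fport = offset < len(frame) - 4               # anything left before the MIC?
    return {
        'mtype': mhdr >> 5,
        'dev_addr': dev_addr,
        'fctrl': fctrl,
        'fcnt': fcnt,
        'fopts': frame[8:offset],
        'fport': frame[offset] if has_fport else None,
        'frm_payload': frame[offset + 1:-4] if has_fport else b'',
        'mic': frame[-4:],
    }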
Covert Channel Opportunities
| Field | Size | Covert Potential | Detection Difficulty |
|---|---|---|---|
| FPort | 1 byte | Medium - limited values | Low |
| FRMPayload | 0-242 bytes | High - application data | Medium |
| FCnt | 2 bytes | Low - must increment | High |
| FOpts | 0-15 bytes | High - MAC commands | High |
| Timing | variable | High - transmission intervals | Very high |
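The FPort field appears in the table above but is not demonstrated in the implementations that follow, so a minimal encoding sketch is given here. It is an assumption-laden illustration: fport_encode and fport_decode are hypothetical helpers, and the roughly 7-8 covert bits per frame follow from restricting values to the application range (FPort 0 is reserved for MAC commands, 224 and above for test/reserved use), not from any published tool.
# FPort covert encoding sketch (hypothetical helpers, 7 covert bits per uplink)
def fport_encode(data: bytes) -> list:
    """Map covert bytes to FPort values in the application range 1-128."""
    bits = ''.join(format(b, '08b') for b in data)
    ports = []
    for i in range(0, len(bits), 7):
        chunk = bits[i:i + 7].ljust(7, '0')  # pad the final chunk
        ports.append(int(chunk, 2) + 1)      # shift into the valid 1-128 range
    return ports
def fport_decode(ports: list, length: int) -> bytes:
    """Recover covert bytes from a sequence of observed FPort values."""
    bits = ''.join(format(p - 1, '07b') for p in ports)
    return bytes(int(bits[i:i + 8], 2) for i in range(0, length * 8, 8))
ports = fport_encode(b"hi")              # one FPort value per benign-looking uplink
assert fport_decode(ports, 2) == b"hi"
Because each uplink carries only one FPort value, throughput is bounded by the duty-cycle limits analyzed later on this page.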
Implementation: CloakLoRa Research
Overview
CloakLoRa demonstrates LoRaWAN electromagnetic covert channels:
- Capacity: ~38 bits per typical LoRa packet
- Range: 250 m using amplitude modulation
- Technique: RF emanation manipulation
- Stealth: high - appears as normal LoRa transmission variations
Amplitude Modulation Technique
# CloakLoRa-inspired amplitude modulation simulation
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
class LoRaCovertChannel:
    def __init__(self):
        # LoRa parameters
        self.bandwidth = 125000  # 125 kHz
        self.spreading_factor = 7  # SF7
        self.coding_rate = 1  # 4/5
        self.carrier_freq = 868100000  # 868.1 MHz (EU868 band)
        # covert channel parameters
        self.amplitude_levels = [0.5, 0.7, 0.9, 1.0]  # 4 levels = 2 bits per symbol
        self.symbol_duration = self.calculate_symbol_duration()
    def calculate_symbol_duration(self):
        """calculate lora symbol duration"""
        return (2 ** self.spreading_factor) / self.bandwidth
    def encode_data_in_amplitude(self, binary_data):
        """encode binary data using amplitude modulation"""
        # split binary data into 2-bit chunks
        amplitude_sequence = []
        for i in range(0, len(binary_data), 2):
            chunk = binary_data[i:i+2].ljust(2, '0')  # pad if needed
            amplitude_index = int(chunk, 2)
            amplitude = self.amplitude_levels[amplitude_index]
            amplitude_sequence.append(amplitude)
        return amplitude_sequence
    def generate_modulated_signal(self, data):
        """generate lora signal with amplitude modulation"""
        # encode data in amplitudes
        amplitude_sequence = self.encode_data_in_amplitude(data)
        # use one covert symbol per lora symbol period so the receiver can
        # re-segment the envelope with the same symbol duration
        sample_rate = self.bandwidth * 4  # oversample
        samples_per_symbol = int(sample_rate * self.symbol_duration)
        total_samples = samples_per_symbol * len(amplitude_sequence)
        t = np.arange(total_samples) / sample_rate
        # generate base lora chirp
        chirp_signal = self.generate_lora_chirp(t)
        # apply amplitude modulation symbol by symbol
        modulated_signal = np.zeros_like(chirp_signal)
        for i, amplitude in enumerate(amplitude_sequence):
            start_idx = i * samples_per_symbol
            end_idx = (i + 1) * samples_per_symbol
            modulated_signal[start_idx:end_idx] = chirp_signal[start_idx:end_idx] * amplitude
        return t, modulated_signal
    def generate_lora_chirp(self, t):
        """generate lora chirp signal"""
        # lora uses chirp spread spectrum
        # frequency sweeps from -bw/2 to +bw/2
        freq_start = -self.bandwidth / 2
        freq_end = self.bandwidth / 2
        # linear frequency modulation
        chirp = signal.chirp(t,
                           freq_start,
                           t[-1],
                           freq_end,
                           method='linear')
        return chirp
    def extract_amplitude_data(self, signal_samples, sample_rate):
        """extract covert data from amplitude variations"""
        # calculate envelope using hilbert transform
        analytic_signal = signal.hilbert(signal_samples)
        amplitude_envelope = np.abs(analytic_signal)
        # segment into symbols
        symbol_samples = int(sample_rate * self.symbol_duration)
        num_symbols = len(amplitude_envelope) // symbol_samples
        decoded_bits = ''
        for i in range(num_symbols):
            start_idx = i * symbol_samples
            end_idx = (i + 1) * symbol_samples
            # average amplitude for this symbol
            symbol_amplitude = np.mean(amplitude_envelope[start_idx:end_idx])
            # quantize to nearest amplitude level
            closest_level = min(range(len(self.amplitude_levels)),
                              key=lambda x: abs(self.amplitude_levels[x] - symbol_amplitude))
            # convert level index to 2-bit binary
            bits = format(closest_level, '02b')
            decoded_bits += bits
        return decoded_bits
    def simulate_covert_transmission(self, message):
        """simulate complete covert transmission"""
        print(f"encoding message: '{message}'")
        # convert to binary
        binary_data = ''.join(format(ord(c), '08b') for c in message)
        print(f"binary data: {binary_data} ({len(binary_data)} bits)")
        # generate modulated signal
        t, modulated_signal = self.generate_modulated_signal(binary_data)
        # simulate transmission and reception
        # add noise
        noise_power = 0.01
        received_signal = modulated_signal + np.random.normal(0, noise_power, len(modulated_signal))
        # decode
        sample_rate = self.bandwidth * 4  # must match the rate used during modulation
        decoded_bits = self.extract_amplitude_data(received_signal, sample_rate)
        # convert back to text
        decoded_message = ''
        for i in range(0, len(decoded_bits), 8):
            byte_bits = decoded_bits[i:i+8]
            if len(byte_bits) == 8:
                decoded_message += chr(int(byte_bits, 2))
        print(f"decoded message: '{decoded_message}'")
        # calculate capacity
        transmission_time = len(t) / sample_rate
        capacity_bps = len(binary_data) / transmission_time
        print(f"transmission time: {transmission_time:.2f}s")
        print(f"capacity: {capacity_bps:.1f} bits/second")
        return t, modulated_signal, received_signal, decoded_message
# usage
channel = LoRaCovertChannel()
t, tx_signal, rx_signal, decoded = channel.simulate_covert_transmission("secret")
LoRaWAN Payload Covert Channel
# LoRaWAN application payload covert channel
import struct
import json
from cryptography.fernet import Fernet
class LoRaWANPayloadCovert:
    def __init__(self):
        # LoRaWAN parameters
        self.max_payload_size = 242  # bytes (SF7 / DR5)
        self.encryption_key = Fernet.generate_key()
        self.cipher = Fernet(self.encryption_key)
        # legitimate sensor data templates
        self.sensor_templates = {
            'temperature': {'temp': 23.5, 'humidity': 45.2},
            'motion': {'detected': False, 'count': 0},
            'battery': {'voltage': 3.7, 'percentage': 85},
            'gps': {'lat': 40.7128, 'lon': -74.0060, 'alt': 10}
        }
    def create_legitimate_payload(self, sensor_type='temperature'):
        """create legitimate-looking sensor payload"""
        if sensor_type in self.sensor_templates:
            data = self.sensor_templates[sensor_type].copy()
            # add realistic variations
            import random
            if sensor_type == 'temperature':
                data['temp'] += random.uniform(-2, 2)
                data['humidity'] += random.uniform(-5, 5)
            elif sensor_type == 'battery':
                data['voltage'] += random.uniform(-0.1, 0.1)
                data['percentage'] = max(0, min(100, data['percentage'] + random.randint(-2, 2)))
            return json.dumps(data).encode()
        return b'{"status": "ok"}'
    def embed_covert_data(self, legitimate_payload, covert_data):
        """embed covert data in lorawan payload"""
        # method 1: steganography in json values
        if legitimate_payload.startswith(b'{'):
            return self.embed_in_json(legitimate_payload, covert_data)
        # method 2: binary payload with hidden data
        return self.embed_in_binary(legitimate_payload, covert_data)
    def embed_in_json(self, json_payload, covert_data):
        """hide data in json floating point precision"""
        try:
            data = json.loads(json_payload.decode())
            # encrypt covert data
            encrypted_covert = self.cipher.encrypt(covert_data.encode())
            # convert to an integer for embedding; only a few decimal digits survive
            # the precision trick below, so the full Fernet token cannot be recovered --
            # this illustrates the embedding direction rather than a working round trip
            covert_int = int.from_bytes(encrypted_covert[:8], byteorder='big')  # limit size
            # embed in floating point precision
            for key, value in data.items():
                if isinstance(value, float):
                    # encode in least significant digits
                    precision_factor = 10000
                    covert_part = (covert_int % precision_factor) / precision_factor
                    data[key] = int(value) + covert_part
                    break
            return json.dumps(data).encode()
        except Exception as e:
            print(f"json embedding failed: {e}")
            return json_payload
    def embed_in_binary(self, binary_payload, covert_data):
        """embed covert data in binary payload"""
        # encrypt covert data
        encrypted_covert = self.cipher.encrypt(covert_data.encode())
        # append to payload with marker
        marker = b'\xaa\xbb'  # covert data marker
        combined = binary_payload + marker + encrypted_covert
        # ensure within lorawan size limits
        if len(combined) <= self.max_payload_size:
            return combined
        else:
            # truncate covert data if necessary
            max_covert_size = self.max_payload_size - len(binary_payload) - len(marker)
            truncated_covert = encrypted_covert[:max_covert_size]
            return binary_payload + marker + truncated_covert
    def extract_covert_data(self, payload):
        """extract covert data from lorawan payload"""
        # try json extraction first
        if payload.startswith(b'{'):
            return self.extract_from_json(payload)
        # try binary extraction
        return self.extract_from_binary(payload)
    def extract_from_json(self, json_payload):
        """extract covert data from json payload"""
        try:
            data = json.loads(json_payload.decode())
            # look for embedded precision data
            for key, value in data.items():
                if isinstance(value, float):
                    # extract fractional part
                    fractional = value - int(value)
                    covert_int = int(fractional * 10000)
                    if covert_int > 0:
                        # convert back to bytes
                        covert_bytes = covert_int.to_bytes(8, byteorder='big')
                        try:
                            # decrypt
                            decrypted = self.cipher.decrypt(covert_bytes)
                            return decrypted.decode()
                        except:
                            continue
        except Exception as e:
            print(f"json extraction failed: {e}")
        return None
    def extract_from_binary(self, binary_payload):
        """extract covert data from binary payload"""
        marker = b'\xaa\xbb'
        marker_pos = binary_payload.find(marker)
        if marker_pos != -1:
            covert_data = binary_payload[marker_pos + len(marker):]
            try:
                decrypted = self.cipher.decrypt(covert_data)
                return decrypted.decode()
            except:
                pass
        return None
    def create_covert_lorawan_frame(self, device_addr, frame_count, covert_message):
        """create complete lorawan frame with covert data"""
        # create legitimate payload
        legit_payload = self.create_legitimate_payload('temperature')
        # embed covert data
        covert_payload = self.embed_covert_data(legit_payload, covert_message)
        # build lorawan mac frame
        mhdr = 0x40  # unconfirmed data up (MType 010)
        dev_addr = struct.pack('<I', device_addr)  # little-endian, unsigned
        fctrl = 0x00  # no mac commands
        fcnt = struct.pack('<H', frame_count)  # 16-bit frame counter, unsigned
        fport = 1     # application port
        # frame header
        fhdr = dev_addr + fctrl.to_bytes(1, 'big') + fcnt + bytes([fport])
        # complete mac payload
        mac_payload = fhdr + covert_payload
        # calculate mic (simplified - would use aes-cmac in real implementation)
        mic = self.calculate_mic(mac_payload, frame_count)
        # complete frame
        lorawan_frame = bytes([mhdr]) + mac_payload + mic
        return lorawan_frame
    def calculate_mic(self, payload, frame_count):
        """calculate message integrity code (simplified)"""
        # in real lorawan, this would be aes-cmac
        # here we use simple checksum for demonstration
        checksum = sum(payload + frame_count.to_bytes(2, 'little'))
        return (checksum & 0xffffffff).to_bytes(4, 'big')
# usage
covert = LoRaWANPayloadCovert()
frame = covert.create_covert_lorawan_frame(0x12345678, 42, "hidden message")
print(f"lorawan frame: {frame.hex()}")
Timing-Based Covert Channels
Transmission Interval Manipulation
# LoRaWAN timing covert channel
import time
import random
import statistics
class LoRaWANTimingChannel:
    def __init__(self):
        # LoRaWAN duty cycle limits (EU868)
        self.duty_cycle_limit = 0.01  # 1% duty cycle
        self.min_interval = 60  # minimum seconds between transmissions
        # timing encoding
        self.short_interval = 60   # represents binary 0
        self.long_interval = 120   # represents binary 1
        self.sync_interval = 300   # synchronization pattern
    def encode_timing_message(self, binary_data):
        """encode binary message in transmission timing"""
        transmission_schedule = []
        current_time = time.time()
        # synchronization burst: three packets separated by sync_interval
        for _ in range(3):
            transmission_schedule.append(current_time)
            current_time += self.sync_interval
        # drop the trailing sync gap so the first data interval is measured
        # from the last synchronization packet
        current_time -= self.sync_interval
        # encode data bits
        for bit in binary_data:
            interval = self.long_interval if bit == '1' else self.short_interval
            # add random jitter to avoid detection (±10%)
            jitter = interval * random.uniform(-0.1, 0.1)
            actual_interval = interval + jitter
            current_time += actual_interval
            transmission_schedule.append(current_time)
        return transmission_schedule
    def decode_timing_message(self, transmission_times):
        """decode binary message from transmission timing"""
        if len(transmission_times) < 4:
            return None
        # calculate intervals
        intervals = []
        for i in range(1, len(transmission_times)):
            interval = transmission_times[i] - transmission_times[i-1]
            intervals.append(interval)
        # detect synchronization pattern
        sync_count = 0
        data_start = 0
        for i, interval in enumerate(intervals):
            if abs(interval - self.sync_interval) < 30:  # tolerance
                sync_count += 1
            else:
                if sync_count >= 2:  # found sync pattern
                    data_start = i  # this interval already carries the first data bit
                    break
                sync_count = 0
        if data_start == 0:
            print("no synchronization pattern found")
            return None
        # decode data intervals
        data_intervals = intervals[data_start:]
        decoded_bits = ''
        for interval in data_intervals:
            if abs(interval - self.short_interval) < abs(interval - self.long_interval):
                decoded_bits += '0'
            else:
                decoded_bits += '1'
        return decoded_bits
    def simulate_lorawan_timing_channel(self, message):
        """simulate complete timing channel transmission"""
        print(f"encoding message: '{message}'")
        # convert to binary
        binary_data = ''.join(format(ord(c), '08b') for c in message)
        print(f"binary data: {binary_data}")
        # generate transmission schedule
        schedule = self.encode_timing_message(binary_data)
        print(f"transmission schedule ({len(schedule)} packets):")
        for i, tx_time in enumerate(schedule):
            print(f"  packet {i}: {time.ctime(tx_time)}")
        # simulate reception with timing jitter
        received_times = []
        for tx_time in schedule:
            # add reception timing error (±2 seconds)
            rx_time = tx_time + random.uniform(-2, 2)
            received_times.append(rx_time)
        # decode received timing
        decoded_bits = self.decode_timing_message(received_times)
        if decoded_bits:
            # convert back to text
            decoded_message = ''
            for i in range(0, len(decoded_bits), 8):
                byte_bits = decoded_bits[i:i+8]
                if len(byte_bits) == 8:
                    decoded_message += chr(int(byte_bits, 2))
            print(f"decoded message: '{decoded_message}'")
            # calculate channel capacity
            total_time = schedule[-1] - schedule[0]
            capacity = len(binary_data) / total_time
            print(f"channel capacity: {capacity:.4f} bits/second")
        return decoded_bits
# usage
timing_channel = LoRaWANTimingChannel()
result = timing_channel.simulate_lorawan_timing_channel("hi")
Detection Methods
LoRaWAN Traffic Analysis
# LoRaWAN covert channel detection
import time
import statistics
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt
class LoRaWANCovertDetector:
    def __init__(self):
        self.device_stats = defaultdict(lambda: {
            'transmissions': [],
            'payload_sizes': [],
            'intervals': [],
            'fports': [],
            'sf_usage': defaultdict(int),
            'first_seen': None,
            'last_seen': None
        })
    def analyze_lorawan_packet(self, timestamp, dev_addr, payload_size,
                              fport, spreading_factor, payload_data):
        """analyze individual lorawan packet"""
        stats = self.device_stats[dev_addr]
        # update timing statistics
        stats['transmissions'].append(timestamp)
        stats['payload_sizes'].append(payload_size)
        stats['fports'].append(fport)
        stats['sf_usage'][spreading_factor] += 1
        if stats['first_seen'] is None:
            stats['first_seen'] = timestamp
        stats['last_seen'] = timestamp
        # calculate intervals
        if len(stats['transmissions']) > 1:
            interval = timestamp - stats['transmissions'][-2]
            stats['intervals'].append(interval)
        # analyze payload for anomalies
        self.analyze_payload_anomalies(dev_addr, payload_data, fport)
        # check for timing anomalies
        if len(stats['intervals']) >= 10:
            self.check_timing_anomalies(dev_addr)
    def analyze_payload_anomalies(self, dev_addr, payload_data, fport):
        """analyze payload for covert channel indicators"""
        anomalies = []
        # check payload entropy
        if payload_data:
            entropy = self.calculate_entropy(payload_data)
            if entropy > 7.5:  # high entropy suggests encryption/encoding
                anomalies.append(f'high payload entropy: {entropy:.2f}')
        # check for binary markers
        if b'\xaa\xbb' in payload_data:
            anomalies.append('suspicious binary marker detected')
        # check json payload anomalies
        if payload_data.startswith(b'{'):
            json_anomalies = self.analyze_json_payload(payload_data)
            anomalies.extend(json_anomalies)
        # unusual fport usage
        if fport > 200:
            anomalies.append(f'unusual fport value: {fport}')
        if anomalies:
            print(f"payload anomalies from device {dev_addr:08x}:")
            for anomaly in anomalies:
                print(f"  {anomaly}")
    def analyze_json_payload(self, json_data):
        """analyze json payload for steganography"""
        anomalies = []
        try:
            import json
            data = json.loads(json_data.decode())
            # check for unusual precision in float values
            for key, value in data.items():
                if isinstance(value, float):
                    # check decimal places
                    decimal_str = str(value).split('.')[1] if '.' in str(value) else ''
                    if len(decimal_str) > 6:  # excessive precision
                        anomalies.append(f'excessive precision in {key}: {value}')
                    # check for non-random decimal patterns
                    if self.has_pattern_in_decimals(decimal_str):
                        anomalies.append(f'patterned decimals in {key}: {value}')
        except Exception as e:
            anomalies.append(f'json parsing error: {e}')
        return anomalies
    def check_timing_anomalies(self, dev_addr):
        """check for timing-based covert channels"""
        stats = self.device_stats[dev_addr]
        intervals = stats['intervals'][-50:]  # recent intervals
        if len(intervals) < 10:
            return
        anomalies = []
        # statistical analysis
        mean_interval = statistics.mean(intervals)
        median_interval = statistics.median(intervals)
        stdev_interval = statistics.stdev(intervals)
        # coefficient of variation
        cv = stdev_interval / mean_interval if mean_interval > 0 else 0
        # check for bimodal distribution (timing channel signature)
        interval_counts = defaultdict(int)
        for interval in intervals:
            # bin intervals to nearest 30 seconds
            binned = round(interval / 30) * 30
            interval_counts[binned] += 1
        # look for two dominant intervals
        sorted_counts = sorted(interval_counts.items(), key=lambda x: x[1], reverse=True)
        if (len(sorted_counts) >= 2 and
            sorted_counts[0][1] + sorted_counts[1][1] > len(intervals) * 0.8):
            ratio = sorted_counts[0][0] / sorted_counts[1][0] if sorted_counts[1][0] > 0 else 0
            if 1.5 < ratio < 3.0:  # common encoding ratios
                anomalies.append(f'bimodal timing pattern: {sorted_counts[0][0]}s/{sorted_counts[1][0]}s')
        # check for regular patterns
        if cv < 0.2:  # very regular timing
            anomalies.append(f'unusually regular timing (cv={cv:.3f})')
        # duty cycle analysis
        total_time = stats['last_seen'] - stats['first_seen']
        if total_time > 0:
            transmission_rate = len(stats['transmissions']) / (total_time / 3600)  # per hour
            if transmission_rate > 60:  # more than 1 per minute average
                anomalies.append(f'high transmission rate: {transmission_rate:.1f}/hour')
        if anomalies:
            print(f"timing anomalies from device {dev_addr:08x}:")
            for anomaly in anomalies:
                print(f"  {anomaly}")
    def calculate_entropy(self, data):
        """calculate shannon entropy of data"""
        from collections import Counter
        import math
        if not data:
            return 0
        counts = Counter(data)
        probs = [count/len(data) for count in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)
    def has_pattern_in_decimals(self, decimal_str):
        """check for patterns in decimal places"""
        if len(decimal_str) < 4:
            return False
        # check for repeated sequences
        for length in range(1, len(decimal_str) // 2 + 1):
            pattern = decimal_str[:length]
            repetitions = len(decimal_str) // length
            if pattern * repetitions == decimal_str[:len(pattern) * repetitions]:
                if repetitions > 2:  # pattern repeats more than twice
                    return True
        return False
    def generate_detection_report(self, dev_addr):
        """generate comprehensive detection report for device"""
        stats = self.device_stats[dev_addr]
        if not stats['transmissions']:
            return None
        report = {
            'device_address': f'{dev_addr:08x}',
            'observation_period': {
                'start': stats['first_seen'],
                'end': stats['last_seen'],
                'duration_hours': (stats['last_seen'] - stats['first_seen']) / 3600
            },
            'transmission_stats': {
                'total_packets': len(stats['transmissions']),
                'avg_interval': statistics.mean(stats['intervals']) if stats['intervals'] else 0,
                'median_interval': statistics.median(stats['intervals']) if stats['intervals'] else 0,
                'interval_stdev': statistics.stdev(stats['intervals']) if len(stats['intervals']) > 1 else 0
            },
            'payload_stats': {
                'avg_size': statistics.mean(stats['payload_sizes']) if stats['payload_sizes'] else 0,
                'size_variation': statistics.stdev(stats['payload_sizes']) if len(stats['payload_sizes']) > 1 else 0,
                'unique_fports': len(set(stats['fports']))
            },
            'spreading_factor_usage': dict(stats['sf_usage'])
        }
        # calculate anomaly score
        anomaly_score = 0
        # timing regularity
        if report['transmission_stats']['interval_stdev'] > 0:
            cv = (report['transmission_stats']['interval_stdev'] /
                 report['transmission_stats']['avg_interval'])
            if cv < 0.2:
                anomaly_score += 2
        # high transmission rate
        if report['observation_period']['duration_hours'] > 0:
            rate = (report['transmission_stats']['total_packets'] /
                   report['observation_period']['duration_hours'])
            if rate > 10:  # more than 10 per hour
                anomaly_score += 1
        # payload size consistency
        if report['payload_stats']['size_variation'] < 5:  # very consistent sizes
            anomaly_score += 1
        report['anomaly_score'] = anomaly_score
        report['risk_level'] = 'high' if anomaly_score >= 3 else 'medium' if anomaly_score >= 2 else 'low'
        return report
# usage
detector = LoRaWANCovertDetector()
# detector.analyze_lorawan_packet(time.time(), 0x12345678, 25, 1, 7, b'{"temp": 23.5}')
RF Analysis for Electromagnetic Channels
# LoRaWAN RF analysis for CloakLoRa-style channels
import time
import numpy as np
from scipy import signal
import matplotlib.pyplot as plt
class LoRaWANRFAnalyzer:
    def __init__(self):
        self.sample_rate = 1000000  # 1 MHz
        self.lora_bandwidth = 125000  # 125 kHz
        self.center_freq = 868100000  # 868.1 MHz
    def analyze_amplitude_modulation(self, rf_samples):
        """analyze rf samples for amplitude-based covert channels"""
        # extract amplitude envelope
        analytic_signal = signal.hilbert(rf_samples)
        amplitude_envelope = np.abs(analytic_signal)
        # detect amplitude variations
        amplitude_variation = np.std(amplitude_envelope)
        mean_amplitude = np.mean(amplitude_envelope)
        cv_amplitude = amplitude_variation / mean_amplitude
        # look for periodic amplitude patterns
        amplitude_fft = np.fft.fft(amplitude_envelope)
        amplitude_freqs = np.fft.fftfreq(len(amplitude_envelope), 1/self.sample_rate)
        # find dominant frequencies in amplitude variations
        amplitude_power = np.abs(amplitude_fft)**2
        peak_indices = signal.find_peaks(amplitude_power, height=np.max(amplitude_power)*0.1)[0]
        dominant_freqs = amplitude_freqs[peak_indices]
        analysis = {
            'amplitude_variation': amplitude_variation,
            'coefficient_of_variation': cv_amplitude,
            'dominant_frequencies': dominant_freqs[:5],  # top 5
            'covert_channel_likely': cv_amplitude > 0.1  # threshold for detection
        }
        return analysis
    def detect_timing_patterns(self, packet_timestamps):
        """detect covert timing patterns in packet transmissions"""
        if len(packet_timestamps) < 10:
            return {'insufficient_data': True}
        # calculate inter-packet intervals
        intervals = np.diff(packet_timestamps)
        # statistical analysis
        mean_interval = np.mean(intervals)
        std_interval = np.std(intervals)
        cv_interval = std_interval / mean_interval if mean_interval > 0 else 0
        # histogram analysis for bimodal detection
        hist, bin_edges = np.histogram(intervals, bins=20)
        # find peaks in histogram
        peaks, _ = signal.find_peaks(hist, height=np.max(hist)*0.3)
        # bimodal detection
        is_bimodal = len(peaks) >= 2
        if is_bimodal:
            # calculate ratio between dominant intervals
            peak_positions = bin_edges[peaks]
            if len(peak_positions) >= 2:
                interval_ratio = peak_positions[0] / peak_positions[1]
            else:
                interval_ratio = 1
        else:
            interval_ratio = 1
        analysis = {
            'mean_interval': mean_interval,
            'interval_cv': cv_interval,
            'is_bimodal': is_bimodal,
            'interval_ratio': interval_ratio,
            'num_peaks': len(peaks),
            'covert_timing_likely': is_bimodal and 1.5 < interval_ratio < 3.0
        }
        return analysis
    def analyze_lorawan_spectrum(self, rf_samples):
        """analyze spectrum for lorawan signal anomalies"""
        # compute spectrogram
        frequencies, times, spectrogram = signal.spectrogram(
            rf_samples,
            fs=self.sample_rate,
            window='hann',
            nperseg=1024,
            noverlap=512
        )
        # focus on the lorawan bandwidth (real-valued input, so only 0..bw/2 is present)
        lora_freq_mask = frequencies <= self.lora_bandwidth / 2
        lora_spectrum = spectrogram[lora_freq_mask, :]
        # analyze spectral features
        spectral_variation = np.std(lora_spectrum, axis=1)
        spectral_peaks = []
        for freq_bin in range(len(spectral_variation)):
            if spectral_variation[freq_bin] > np.mean(spectral_variation) + 2*np.std(spectral_variation):
                spectral_peaks.append(frequencies[lora_freq_mask][freq_bin])
        # chirp analysis
        chirp_linearity = self.analyze_chirp_linearity(lora_spectrum, times)
        analysis = {
            'spectral_peaks': spectral_peaks,
            'chirp_linearity': chirp_linearity,
            'spectrum_anomaly': len(spectral_peaks) > 5  # threshold
        }
        return analysis
    def analyze_chirp_linearity(self, spectrum, times):
        """analyze lorawan chirp linearity for modulation detection"""
        # track instantaneous frequency across time
        instantaneous_freqs = []
        for time_bin in range(spectrum.shape[1]):
            # find frequency with maximum power at this time
            max_power_idx = np.argmax(spectrum[:, time_bin])
            instantaneous_freqs.append(max_power_idx)
        # calculate linearity of frequency sweep
        if len(instantaneous_freqs) > 1:
            # fit linear regression to frequency vs time
            coefficients = np.polyfit(range(len(instantaneous_freqs)), instantaneous_freqs, 1)
            residuals = instantaneous_freqs - np.polyval(coefficients, range(len(instantaneous_freqs)))
            linearity_error = np.std(residuals)
            return {
                'linearity_error': linearity_error,
                'is_linear': linearity_error < 5,  # threshold
                'slope': coefficients[0]
            }
        return {'insufficient_data': True}
    def comprehensive_covert_analysis(self, rf_samples, packet_timestamps):
        """comprehensive analysis for all lorawan covert channel types"""
        results = {
            'timestamp': time.time(),
            'sample_count': len(rf_samples),
            'packet_count': len(packet_timestamps)
        }
        # amplitude modulation analysis
        amplitude_analysis = self.analyze_amplitude_modulation(rf_samples)
        results['amplitude_covert'] = amplitude_analysis
        # timing analysis
        timing_analysis = self.detect_timing_patterns(packet_timestamps)
        results['timing_covert'] = timing_analysis
        # spectrum analysis
        spectrum_analysis = self.analyze_lorawan_spectrum(rf_samples)
        results['spectrum_covert'] = spectrum_analysis
        # overall risk assessment
        risk_factors = 0
        if amplitude_analysis.get('covert_channel_likely', False):
            risk_factors += 2
        if timing_analysis.get('covert_timing_likely', False):
            risk_factors += 2
        if spectrum_analysis.get('spectrum_anomaly', False):
            risk_factors += 1
        results['overall_risk'] = 'high' if risk_factors >= 3 else 'medium' if risk_factors >= 2 else 'low'
        results['risk_score'] = risk_factors
        return results
# usage
# analyzer = LoRaWANRFAnalyzer()
# analysis = analyzer.comprehensive_covert_analysis(rf_data, packet_times)
Countermeasures
Network-Level Monitoring
# LoRaWAN network monitoring setup
# Install ChirpStack for LoRaWAN network analysis
docker-compose up chirpstack-network-server chirpstack-application-server
# Monitor LoRaWAN gateway traffic (Semtech UDP packet forwarder)
tcpdump -i any -w lorawan_traffic.pcap 'udp port 1700'
# Compute inter-packet timing deltas
tshark -r lorawan_traffic.pcap -T fields -e frame.time_epoch -e data.data \
  | awk '{print $1}' | sort -n | awk 'NR>1 {print $1-prev} {prev=$1}'
Payload Filtering
# LoRaWAN payload filtering and normalization
import json
import time
from collections import defaultdict
class LoRaWANPayloadFilter:
    def __init__(self):
        self.max_payload_size = 51  # restrict to the SF12 maximum for safety
        self.allowed_fports = [1, 2, 10, 20]  # whitelist approach
        self.entropy_threshold = 6.0
        # payload sanitization rules
        self.sanitization_rules = {
            'max_decimal_places': 2,
            'allowed_json_keys': ['temp', 'humidity', 'battery', 'lat', 'lon'],
            'max_string_length': 20
        }
    def filter_lorawan_packet(self, dev_addr, fport, payload_data):
        """filter and sanitize lorawan packets"""
        # check fport whitelist
        if fport not in self.allowed_fports:
            print(f"blocked packet from {dev_addr:08x}: invalid fport {fport}")
            return None
        # check payload size
        if len(payload_data) > self.max_payload_size:
            print(f"blocked packet from {dev_addr:08x}: oversized payload ({len(payload_data)} bytes)")
            return None
        # check payload entropy
        entropy = self.calculate_entropy(payload_data)
        if entropy > self.entropy_threshold:
            print(f"blocked packet from {dev_addr:08x}: high entropy payload ({entropy:.2f})")
            return None
        # sanitize payload content
        sanitized_payload = self.sanitize_payload(payload_data)
        if sanitized_payload != payload_data:
            print(f"sanitized payload from {dev_addr:08x}")
        return sanitized_payload
    def sanitize_payload(self, payload_data):
        """sanitize payload content"""
        # try json sanitization
        if payload_data.startswith(b'{'):
            return self.sanitize_json_payload(payload_data)
        # binary payload sanitization
        return self.sanitize_binary_payload(payload_data)
    def sanitize_json_payload(self, json_data):
        """sanitize json payload"""
        try:
            data = json.loads(json_data.decode())
            sanitized_data = {}
            for key, value in data.items():
                # check key whitelist
                if key not in self.sanitization_rules['allowed_json_keys']:
                    continue
                # sanitize values
                if isinstance(value, float):
                    # limit decimal places
                    value = round(value, self.sanitization_rules['max_decimal_places'])
                elif isinstance(value, str):
                    # limit string length
                    value = value[:self.sanitization_rules['max_string_length']]
                elif isinstance(value, int):
                    # keep integers as-is but limit range
                    value = max(-1000000, min(1000000, value))
                else:
                    # remove unsupported types
                    continue
                sanitized_data[key] = value
            return json.dumps(sanitized_data).encode()
        except Exception as e:
            print(f"json sanitization failed: {e}")
            return b'{"error": "invalid_json"}'
    def sanitize_binary_payload(self, binary_data):
        """sanitize binary payload"""
        # remove potential covert markers
        sanitized = binary_data.replace(b'\xaa\xbb', b'')
        sanitized = sanitized.replace(b'\xcc\xdd', b'')
        sanitized = sanitized.replace(b'\xee\xff', b'')
        # limit to first 32 bytes for safety
        return sanitized[:32]
    def calculate_entropy(self, data):
        """calculate shannon entropy"""
        from collections import Counter
        import math
        if not data:
            return 0
        counts = Counter(data)
        probs = [count/len(data) for count in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)
# LoRaWAN traffic shaping
class LoRaWANTrafficShaper:
    def __init__(self):
        # rate limiting per device
        self.device_limits = {
            'packets_per_hour': 10,
            'bytes_per_hour': 1000,
            'burst_limit': 3  # max packets in 5 minutes
        }
        self.device_usage = defaultdict(lambda: {
            'packets_this_hour': 0,
            'bytes_this_hour': 0,
            'last_hour_reset': time.time(),
            'recent_packets': []
        })
    def should_allow_packet(self, dev_addr, payload_size):
        """check if packet should be allowed based on rate limits"""
        current_time = time.time()
        usage = self.device_usage[dev_addr]
        # reset hourly counters
        if current_time - usage['last_hour_reset'] >= 3600:
            usage['packets_this_hour'] = 0
            usage['bytes_this_hour'] = 0
            usage['last_hour_reset'] = current_time
        # check hourly limits
        if usage['packets_this_hour'] >= self.device_limits['packets_per_hour']:
            print(f"rate limit exceeded for {dev_addr:08x}: packets per hour")
            return False
        if usage['bytes_this_hour'] + payload_size > self.device_limits['bytes_per_hour']:
            print(f"rate limit exceeded for {dev_addr:08x}: bytes per hour")
            return False
        # check burst limits
        recent_packets = [t for t in usage['recent_packets'] if current_time - t < 300]  # last 5 minutes
        if len(recent_packets) >= self.device_limits['burst_limit']:
            print(f"burst limit exceeded for {dev_addr:08x}")
            return False
        # update usage
        usage['packets_this_hour'] += 1
        usage['bytes_this_hour'] += payload_size
        usage['recent_packets'] = recent_packets + [current_time]
        return True
# usage
payload_filter = LoRaWANPayloadFilter()
traffic_shaper = LoRaWANTrafficShaper()
# sanitized = payload_filter.filter_lorawan_packet(0x12345678, 1, b'{"temp": 23.456789}')
# allowed = traffic_shaper.should_allow_packet(0x12345678, len(sanitized) if sanitized else 0)
Advantages and Limitations
Advantages
- Low power consumption: suitable for battery-operated devices
- Long-range communication: coverage up to several kilometers
- Infrequent transmission: natural timing variability provides cover
- Limited monitoring: LoRaWAN networks often have minimal security monitoring
- Multiple encoding vectors: payload, timing, FPort, RF modulation
Limitations
- Very low bandwidth: duty cycle limits severely restrict data rates
- Range limitations: covert RF techniques require proximity to the receiver
- Packet loss: long-range transmission is subject to interference and fading
- Duty cycle restrictions: regulatory limits on transmission frequency
- Limited deployment: LoRaWAN infrastructure is not universally available
Performance Characteristics
Capacity Analysis
| Technique | Capacity | Range | Stealth Level | Detection Difficulty |
|---|---|---|---|---|
| Payload embedding | 50-200 bytes/packet | network range | Medium | Medium |
| Timing channels | 1-2 bits/packet | network range | High | High |
| FPort encoding | 8 bits/packet | network range | Low | Low |
| RF amplitude modulation | ~38 bits/packet | 250 m | High | Very high |
| Combined approach | 250+ bits/packet | variable | Medium | Medium |
Duty Cycle Impact
# LoRaWAN duty cycle analysis
def analyze_duty_cycle_impact():
    """analyze how duty cycle affects covert channel capacity"""
    # eu868 band duty cycle limits
    duty_cycles = {
        'g1_bands': 0.01,   # 1% - most restrictive
        'g2_bands': 0.001,  # 0.1% - very restrictive
        'g3_bands': 0.1     # 10% - less restrictive
    }
    # typical lorawan parameters
    packet_duration = 0.5  # assumed average airtime per packet (SF-dependent; SF12 frames can exceed 1 s)
    packets_per_day = {}
    for band, duty_cycle in duty_cycles.items():
        # calculate max packets per day
        seconds_per_day = 86400
        max_air_time = seconds_per_day * duty_cycle
        max_packets = int(max_air_time / packet_duration)
        packets_per_day[band] = max_packets
        print(f"{band}: {duty_cycle*100}% duty cycle = {max_packets} packets/day")
        # covert channel capacity
        if max_packets > 0:
            # assume 50 bytes covert data per packet
            covert_bytes_per_day = max_packets * 50
            covert_bits_per_second = (covert_bytes_per_day * 8) / seconds_per_day
            print(f"  covert capacity: {covert_bytes_per_day} bytes/day ({covert_bits_per_second:.4f} bps)")
    return packets_per_day
# analyze_duty_cycle_impact()
Real-World Applications
IoT Surveillance
- Long-term monitoring with minimal detection risk
- Coordination between distributed sensor networks
- Backup communication for compromised primary channels
Industrial Espionage
- Exfiltration from smart factories and industrial IoT
- Monitoring production data and operational parameters
- Maintaining persistent access in air-gapped facilities
Research Applications
- LoRaWAN security assessment
- Covert channel capacity studies
- Electromagnetic emanation analysis
References
- LoRaWAN 1.0.4 Specification - LoRa Alliance
- RP002-1.0.3 LoRaWAN Regional Parameters - LoRa Alliance
- “CloakLoRa: A Covert Channel over LoRaWAN PHY” - academic research
- “Security Analysis of LoRaWAN Networks” - IoT security studies
- ChirpStack LoRaWAN Network Server documentation