lorawan covert channels

published: August 12, 2025

lorawan covert channels leverage frame structure exploitation and timing-based techniques in lpwan environments, with cloaklora achieving ~38 bits per packet at 250m range using amplitude modulation.

technical description

lorawan (long range wide area network) is a protocol designed for iot devices requiring long-range, low-power communication. the protocol’s flexibility in payload structure and infrequent transmission patterns make it suitable for covert channels.

lorawan covert channel vectors:

  1. frame payload manipulation: embedding data in application payloads
  2. fport field exploitation: using frame port numbers for encoding
  3. timing-based channels: exploiting transmission intervals
  4. mac command abuse: hiding data in mac layer commands
  5. electromagnetic emanations: side-channel communication via rf patterns

lorawan frame structure

physical layer frame

preamble (8 symbols) | sync word (2 symbols) | phy payload | crc (2 bytes)

mac frame structure

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|mtype|rfu|major|                  devaddr (4 bytes)           |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|     fcnt (2)  |  fctrl(1)     |    fport(1)   |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                 frm payload (0-n bytes)                       |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                        mic (4 bytes)                          |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

covert channel opportunities

fieldsizecovert potentialdetection difficulty
fport1 bytemedium - limited valueslow
frm payload0-242 byteshigh - application datamedium
fcnt2 byteslow - must incrementhigh
fopts0-15 byteshigh - mac commandshigh
timingvariablehigh - transmission intervalsvery high

implementation: cloaklora research

overview

cloaklora demonstrates lorawan electromagnetic covert channels:

  • capacity: ~38 bits per typical lora packet
  • range: 250m using amplitude modulation
  • technique: rf emanation manipulation
  • stealth: high - appears as normal lora transmission variations

amplitude modulation technique

# cloaklora-inspired amplitude modulation simulation
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal

class loracovertchannel:
    def __init__(self):
        # lora parameters
        self.bandwidth = 125000  # 125 khz
        self.spreading_factor = 7  # sf7
        self.coding_rate = 1  # 4/5
        self.carrier_freq = 868100000  # 868.1 mhz (eu band)

        # covert channel parameters
        self.amplitude_levels = [0.5, 0.7, 0.9, 1.0]  # 4 levels = 2 bits per symbol
        self.symbol_duration = self.calculate_symbol_duration()

    def calculate_symbol_duration(self):
        """calculate lora symbol duration"""
        return (2 ** self.spreading_factor) / self.bandwidth

    def encode_data_in_amplitude(self, binary_data):
        """encode binary data using amplitude modulation"""

        # split binary data into 2-bit chunks
        amplitude_sequence = []

        for i in range(0, len(binary_data), 2):
            chunk = binary_data[i:i+2].ljust(2, '0')  # pad if needed
            amplitude_index = int(chunk, 2)
            amplitude = self.amplitude_levels[amplitude_index]
            amplitude_sequence.append(amplitude)

        return amplitude_sequence

    def generate_modulated_signal(self, data, duration=1.0):
        """generate lora signal with amplitude modulation"""

        # encode data in amplitudes
        amplitude_sequence = self.encode_data_in_amplitude(data)

        # time vector
        sample_rate = self.bandwidth * 4  # oversample
        t = np.linspace(0, duration, int(sample_rate * duration))

        # generate base lora chirp
        chirp_signal = self.generate_lora_chirp(t)

        # apply amplitude modulation
        modulated_signal = np.zeros_like(chirp_signal)
        samples_per_symbol = len(t) // len(amplitude_sequence)

        for i, amplitude in enumerate(amplitude_sequence):
            start_idx = i * samples_per_symbol
            end_idx = min((i + 1) * samples_per_symbol, len(chirp_signal))

            if start_idx < len(chirp_signal):
                modulated_signal[start_idx:end_idx] = (
                    chirp_signal[start_idx:end_idx] * amplitude
                )

        return t, modulated_signal

    def generate_lora_chirp(self, t):
        """generate lora chirp signal"""

        # lora uses chirp spread spectrum
        # frequency sweeps from -bw/2 to +bw/2
        freq_start = -self.bandwidth / 2
        freq_end = self.bandwidth / 2

        # linear frequency modulation
        chirp = signal.chirp(t,
                           freq_start,
                           t[-1],
                           freq_end,
                           method='linear')

        return chirp

    def extract_amplitude_data(self, signal_samples, sample_rate):
        """extract covert data from amplitude variations"""

        # calculate envelope using hilbert transform
        analytic_signal = signal.hilbert(signal_samples)
        amplitude_envelope = np.abs(analytic_signal)

        # segment into symbols
        symbol_samples = int(sample_rate * self.symbol_duration)
        num_symbols = len(amplitude_envelope) // symbol_samples

        decoded_bits = ''

        for i in range(num_symbols):
            start_idx = i * symbol_samples
            end_idx = (i + 1) * symbol_samples

            # average amplitude for this symbol
            symbol_amplitude = np.mean(amplitude_envelope[start_idx:end_idx])

            # quantize to nearest amplitude level
            closest_level = min(range(len(self.amplitude_levels)),
                              key=lambda x: abs(self.amplitude_levels[x] - symbol_amplitude))

            # convert level index to 2-bit binary
            bits = format(closest_level, '02b')
            decoded_bits += bits

        return decoded_bits

    def simulate_covert_transmission(self, message):
        """simulate complete covert transmission"""

        print(f"encoding message: '{message}'")

        # convert to binary
        binary_data = ''.join(format(ord(c), '08b') for c in message)
        print(f"binary data: {binary_data} ({len(binary_data)} bits)")

        # generate modulated signal
        t, modulated_signal = self.generate_modulated_signal(binary_data, duration=2.0)

        # simulate transmission and reception
        # add noise
        noise_power = 0.01
        received_signal = modulated_signal + np.random.normal(0, noise_power, len(modulated_signal))

        # decode
        sample_rate = len(t) / (t[-1] - t[0])
        decoded_bits = self.extract_amplitude_data(received_signal, sample_rate)

        # convert back to text
        decoded_message = ''
        for i in range(0, len(decoded_bits), 8):
            byte_bits = decoded_bits[i:i+8]
            if len(byte_bits) == 8:
                decoded_message += chr(int(byte_bits, 2))

        print(f"decoded message: '{decoded_message}'")

        # calculate capacity
        transmission_time = len(t) / sample_rate
        capacity_bps = len(binary_data) / transmission_time

        print(f"transmission time: {transmission_time:.2f}s")
        print(f"capacity: {capacity_bps:.1f} bits/second")

        return t, modulated_signal, received_signal, decoded_message

# usage
channel = loracovertchannel()
t, tx_signal, rx_signal, decoded = channel.simulate_covert_transmission("secret")

lorawan payload covert channel

# lorawan application payload covert channel
import struct
import json
from cryptography.fernet import fernet

class lorawanpayloadcovert:
    def __init__(self):
        # lorawan parameters
        self.max_payload_size = 242  # bytes (sf7)
        self.encryption_key = fernet.generate_key()
        self.cipher = fernet(self.encryption_key)

        # legitimate sensor data templates
        self.sensor_templates = {
            'temperature': {'temp': 23.5, 'humidity': 45.2},
            'motion': {'detected': false, 'count': 0},
            'battery': {'voltage': 3.7, 'percentage': 85},
            'gps': {'lat': 40.7128, 'lon': -74.0060, 'alt': 10}
        }

    def create_legitimate_payload(self, sensor_type='temperature'):
        """create legitimate-looking sensor payload"""

        if sensor_type in self.sensor_templates:
            data = self.sensor_templates[sensor_type].copy()

            # add realistic variations
            import random
            if sensor_type == 'temperature':
                data['temp'] += random.uniform(-2, 2)
                data['humidity'] += random.uniform(-5, 5)
            elif sensor_type == 'battery':
                data['voltage'] += random.uniform(-0.1, 0.1)
                data['percentage'] = max(0, min(100, data['percentage'] + random.randint(-2, 2)))

            return json.dumps(data).encode()

        return b'{"status": "ok"}'

    def embed_covert_data(self, legitimate_payload, covert_data):
        """embed covert data in lorawan payload"""

        # method 1: steganography in json values
        if legitimate_payload.startswith(b'{'):
            return self.embed_in_json(legitimate_payload, covert_data)

        # method 2: binary payload with hidden data
        return self.embed_in_binary(legitimate_payload, covert_data)

    def embed_in_json(self, json_payload, covert_data):
        """hide data in json floating point precision"""

        try:
            data = json.loads(json_payload.decode())

            # encrypt covert data
            encrypted_covert = self.cipher.encrypt(covert_data.encode())

            # convert to integer for embedding
            covert_int = int.from_bytes(encrypted_covert[:8], byteorder='big')  # limit size

            # embed in floating point precision
            for key, value in data.items():
                if isinstance(value, float):
                    # encode in least significant digits
                    precision_factor = 10000
                    covert_part = (covert_int % precision_factor) / precision_factor
                    data[key] = int(value) + covert_part
                    break

            return json.dumps(data).encode()

        except exception as e:
            print(f"json embedding failed: {e}")
            return json_payload

    def embed_in_binary(self, binary_payload, covert_data):
        """embed covert data in binary payload"""

        # encrypt covert data
        encrypted_covert = self.cipher.encrypt(covert_data.encode())

        # append to payload with marker
        marker = b'\xaa\xbb'  # covert data marker
        combined = binary_payload + marker + encrypted_covert

        # ensure within lorawan size limits
        if len(combined) <= self.max_payload_size:
            return combined
        else:
            # truncate covert data if necessary
            max_covert_size = self.max_payload_size - len(binary_payload) - len(marker)
            truncated_covert = encrypted_covert[:max_covert_size]
            return binary_payload + marker + truncated_covert

    def extract_covert_data(self, payload):
        """extract covert data from lorawan payload"""

        # try json extraction first
        if payload.startswith(b'{'):
            return self.extract_from_json(payload)

        # try binary extraction
        return self.extract_from_binary(payload)

    def extract_from_json(self, json_payload):
        """extract covert data from json payload"""

        try:
            data = json.loads(json_payload.decode())

            # look for embedded precision data
            for key, value in data.items():
                if isinstance(value, float):
                    # extract fractional part
                    fractional = value - int(value)
                    covert_int = int(fractional * 10000)

                    if covert_int > 0:
                        # convert back to bytes
                        covert_bytes = covert_int.to_bytes(8, byteorder='big')

                        try:
                            # decrypt
                            decrypted = self.cipher.decrypt(covert_bytes)
                            return decrypted.decode()
                        except:
                            continue

        except exception as e:
            print(f"json extraction failed: {e}")

        return none

    def extract_from_binary(self, binary_payload):
        """extract covert data from binary payload"""

        marker = b'\xaa\xbb'
        marker_pos = binary_payload.find(marker)

        if marker_pos != -1:
            covert_data = binary_payload[marker_pos + len(marker):]

            try:
                decrypted = self.cipher.decrypt(covert_data)
                return decrypted.decode()
            except:
                pass

        return none

    def create_covert_lorawan_frame(self, device_addr, frame_count, covert_message):
        """create complete lorawan frame with covert data"""

        # create legitimate payload
        legit_payload = self.create_legitimate_payload('temperature')

        # embed covert data
        covert_payload = self.embed_covert_data(legit_payload, covert_message)

        # build lorawan mac frame
        mhdr = 0x40  # confirmed data up
        dev_addr = struct.pack('<i', device_addr)  # little endian
        fctrl = 0x00  # no mac commands
        fcnt = struct.pack('<h', frame_count)  # frame counter
        fport = 1     # application port

        # frame header
        fhdr = dev_addr + fctrl.to_bytes(1, 'big') + fcnt + bytes([fport])

        # complete mac payload
        mac_payload = fhdr + covert_payload

        # calculate mic (simplified - would use aes-cmac in real implementation)
        mic = self.calculate_mic(mac_payload, frame_count)

        # complete frame
        lorawan_frame = bytes([mhdr]) + mac_payload + mic

        return lorawan_frame

    def calculate_mic(self, payload, frame_count):
        """calculate message integrity code (simplified)"""

        # in real lorawan, this would be aes-cmac
        # here we use simple checksum for demonstration
        checksum = sum(payload + frame_count.to_bytes(2, 'little'))
        return (checksum & 0xffffffff).to_bytes(4, 'big')

# usage
covert = lorawanpayloadcovert()
frame = covert.create_covert_lorawan_frame(0x12345678, 42, "hidden message")
print(f"lorawan frame: {frame.hex()}")

timing-based covert channels

transmission interval manipulation

# lorawan timing covert channel
import time
import random
import statistics

class lorawantimingchannel:
    def __init__(self):
        # lorawan duty cycle limits (eu868)
        self.duty_cycle_limit = 0.01  # 1% duty cycle
        self.min_interval = 60  # minimum seconds between transmissions

        # timing encoding
        self.short_interval = 60   # represents binary 0
        self.long_interval = 120   # represents binary 1
        self.sync_interval = 300   # synchronization pattern

    def encode_timing_message(self, binary_data):
        """encode binary message in transmission timing"""

        transmission_schedule = []
        current_time = time.time()

        # synchronization burst
        for _ in range(3):
            transmission_schedule.append(current_time)
            current_time += self.sync_interval

        # encode data bits
        for bit in binary_data:
            interval = self.long_interval if bit == '1' else self.short_interval

            # add random jitter to avoid detection (±10%)
            jitter = interval * random.uniform(-0.1, 0.1)
            actual_interval = interval + jitter

            current_time += actual_interval
            transmission_schedule.append(current_time)

        return transmission_schedule

    def decode_timing_message(self, transmission_times):
        """decode binary message from transmission timing"""

        if len(transmission_times) < 4:
            return none

        # calculate intervals
        intervals = []
        for i in range(1, len(transmission_times)):
            interval = transmission_times[i] - transmission_times[i-1]
            intervals.append(interval)

        # detect synchronization pattern
        sync_count = 0
        data_start = 0

        for i, interval in enumerate(intervals):
            if abs(interval - self.sync_interval) < 30:  # tolerance
                sync_count += 1
            else:
                if sync_count >= 2:  # found sync pattern
                    data_start = i + 1
                    break
                sync_count = 0

        if data_start == 0:
            print("no synchronization pattern found")
            return none

        # decode data intervals
        data_intervals = intervals[data_start:]
        decoded_bits = ''

        for interval in data_intervals:
            if abs(interval - self.short_interval) < abs(interval - self.long_interval):
                decoded_bits += '0'
            else:
                decoded_bits += '1'

        return decoded_bits

    def simulate_lorawan_timing_channel(self, message):
        """simulate complete timing channel transmission"""

        print(f"encoding message: '{message}'")

        # convert to binary
        binary_data = ''.join(format(ord(c), '08b') for c in message)
        print(f"binary data: {binary_data}")

        # generate transmission schedule
        schedule = self.encode_timing_message(binary_data)

        print(f"transmission schedule ({len(schedule)} packets):")
        for i, tx_time in enumerate(schedule):
            print(f"  packet {i}: {time.ctime(tx_time)}")

        # simulate reception with timing jitter
        received_times = []
        for tx_time in schedule:
            # add reception timing error (±2 seconds)
            rx_time = tx_time + random.uniform(-2, 2)
            received_times.append(rx_time)

        # decode received timing
        decoded_bits = self.decode_timing_message(received_times)

        if decoded_bits:
            # convert back to text
            decoded_message = ''
            for i in range(0, len(decoded_bits), 8):
                byte_bits = decoded_bits[i:i+8]
                if len(byte_bits) == 8:
                    decoded_message += chr(int(byte_bits, 2))

            print(f"decoded message: '{decoded_message}'")

            # calculate channel capacity
            total_time = schedule[-1] - schedule[0]
            capacity = len(binary_data) / total_time
            print(f"channel capacity: {capacity:.4f} bits/second")

        return decoded_bits

# usage
timing_channel = lorawantimingchannel()
result = timing_channel.simulate_lorawan_timing_channel("hi")

detection methods

lorawan traffic analysis

# lorawan covert channel detection
import numpy as np
from collections import defaultdict
import matplotlib.pyplot as plt

class lorawancovertdetector:
    def __init__(self):
        self.device_stats = defaultdict(lambda: {
            'transmissions': [],
            'payload_sizes': [],
            'intervals': [],
            'fports': [],
            'sf_usage': defaultdict(int),
            'first_seen': none,
            'last_seen': none
        })

    def analyze_lorawan_packet(self, timestamp, dev_addr, payload_size,
                              fport, spreading_factor, payload_data):
        """analyze individual lorawan packet"""

        stats = self.device_stats[dev_addr]

        # update timing statistics
        stats['transmissions'].append(timestamp)
        stats['payload_sizes'].append(payload_size)
        stats['fports'].append(fport)
        stats['sf_usage'][spreading_factor] += 1

        if stats['first_seen'] is none:
            stats['first_seen'] = timestamp
        stats['last_seen'] = timestamp

        # calculate intervals
        if len(stats['transmissions']) > 1:
            interval = timestamp - stats['transmissions'][-2]
            stats['intervals'].append(interval)

        # analyze payload for anomalies
        self.analyze_payload_anomalies(dev_addr, payload_data, fport)

        # check for timing anomalies
        if len(stats['intervals']) >= 10:
            self.check_timing_anomalies(dev_addr)

    def analyze_payload_anomalies(self, dev_addr, payload_data, fport):
        """analyze payload for covert channel indicators"""

        anomalies = []

        # check payload entropy
        if payload_data:
            entropy = self.calculate_entropy(payload_data)
            if entropy > 7.5:  # high entropy suggests encryption/encoding
                anomalies.append(f'high payload entropy: {entropy:.2f}')

        # check for binary markers
        if b'\xaa\xbb' in payload_data:
            anomalies.append('suspicious binary marker detected')

        # check json payload anomalies
        if payload_data.startswith(b'{'):
            json_anomalies = self.analyze_json_payload(payload_data)
            anomalies.extend(json_anomalies)

        # unusual fport usage
        if fport > 200:
            anomalies.append(f'unusual fport value: {fport}')

        if anomalies:
            print(f"payload anomalies from device {dev_addr:08x}:")
            for anomaly in anomalies:
                print(f"  {anomaly}")

    def analyze_json_payload(self, json_data):
        """analyze json payload for steganography"""

        anomalies = []

        try:
            import json
            data = json.loads(json_data.decode())

            # check for unusual precision in float values
            for key, value in data.items():
                if isinstance(value, float):
                    # check decimal places
                    decimal_str = str(value).split('.')[1] if '.' in str(value) else ''
                    if len(decimal_str) > 6:  # excessive precision
                        anomalies.append(f'excessive precision in {key}: {value}')

                    # check for non-random decimal patterns
                    if self.has_pattern_in_decimals(decimal_str):
                        anomalies.append(f'patterned decimals in {key}: {value}')

        except exception as e:
            anomalies.append(f'json parsing error: {e}')

        return anomalies

    def check_timing_anomalies(self, dev_addr):
        """check for timing-based covert channels"""

        stats = self.device_stats[dev_addr]
        intervals = stats['intervals'][-50:]  # recent intervals

        if len(intervals) < 10:
            return

        anomalies = []

        # statistical analysis
        mean_interval = statistics.mean(intervals)
        median_interval = statistics.median(intervals)
        stdev_interval = statistics.stdev(intervals)

        # coefficient of variation
        cv = stdev_interval / mean_interval if mean_interval > 0 else 0

        # check for bimodal distribution (timing channel signature)
        interval_counts = defaultdict(int)
        for interval in intervals:
            # bin intervals to nearest 30 seconds
            binned = round(interval / 30) * 30
            interval_counts[binned] += 1

        # look for two dominant intervals
        sorted_counts = sorted(interval_counts.items(), key=lambda x: x[1], reverse=true)
        if (len(sorted_counts) >= 2 and
            sorted_counts[0][1] + sorted_counts[1][1] > len(intervals) * 0.8):

            ratio = sorted_counts[0][0] / sorted_counts[1][0] if sorted_counts[1][0] > 0 else 0
            if 1.5 < ratio < 3.0:  # common encoding ratios
                anomalies.append(f'bimodal timing pattern: {sorted_counts[0][0]}s/{sorted_counts[1][0]}s')

        # check for regular patterns
        if cv < 0.2:  # very regular timing
            anomalies.append(f'unusually regular timing (cv={cv:.3f})')

        # duty cycle analysis
        total_time = stats['last_seen'] - stats['first_seen']
        if total_time > 0:
            transmission_rate = len(stats['transmissions']) / (total_time / 3600)  # per hour
            if transmission_rate > 60:  # more than 1 per minute average
                anomalies.append(f'high transmission rate: {transmission_rate:.1f}/hour')

        if anomalies:
            print(f"timing anomalies from device {dev_addr:08x}:")
            for anomaly in anomalies:
                print(f"  {anomaly}")

    def calculate_entropy(self, data):
        """calculate shannon entropy of data"""
        from collections import counter
        import math

        if not data:
            return 0

        counts = counter(data)
        probs = [count/len(data) for count in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)

    def has_pattern_in_decimals(self, decimal_str):
        """check for patterns in decimal places"""

        if len(decimal_str) < 4:
            return false

        # check for repeated sequences
        for length in range(1, len(decimal_str) // 2 + 1):
            pattern = decimal_str[:length]
            repetitions = len(decimal_str) // length

            if pattern * repetitions == decimal_str[:len(pattern) * repetitions]:
                if repetitions > 2:  # pattern repeats more than twice
                    return true

        return false

    def generate_detection_report(self, dev_addr):
        """generate comprehensive detection report for device"""

        stats = self.device_stats[dev_addr]

        if not stats['transmissions']:
            return none

        report = {
            'device_address': f'{dev_addr:08x}',
            'observation_period': {
                'start': stats['first_seen'],
                'end': stats['last_seen'],
                'duration_hours': (stats['last_seen'] - stats['first_seen']) / 3600
            },
            'transmission_stats': {
                'total_packets': len(stats['transmissions']),
                'avg_interval': statistics.mean(stats['intervals']) if stats['intervals'] else 0,
                'median_interval': statistics.median(stats['intervals']) if stats['intervals'] else 0,
                'interval_stdev': statistics.stdev(stats['intervals']) if len(stats['intervals']) > 1 else 0
            },
            'payload_stats': {
                'avg_size': statistics.mean(stats['payload_sizes']) if stats['payload_sizes'] else 0,
                'size_variation': statistics.stdev(stats['payload_sizes']) if len(stats['payload_sizes']) > 1 else 0,
                'unique_fports': len(set(stats['fports']))
            },
            'spreading_factor_usage': dict(stats['sf_usage'])
        }

        # calculate anomaly score
        anomaly_score = 0

        # timing regularity
        if report['transmission_stats']['interval_stdev'] > 0:
            cv = (report['transmission_stats']['interval_stdev'] /
                 report['transmission_stats']['avg_interval'])
            if cv < 0.2:
                anomaly_score += 2

        # high transmission rate
        if report['observation_period']['duration_hours'] > 0:
            rate = (report['transmission_stats']['total_packets'] /
                   report['observation_period']['duration_hours'])
            if rate > 10:  # more than 10 per hour
                anomaly_score += 1

        # payload size consistency
        if report['payload_stats']['size_variation'] < 5:  # very consistent sizes
            anomaly_score += 1

        report['anomaly_score'] = anomaly_score
        report['risk_level'] = 'high' if anomaly_score >= 3 else 'medium' if anomaly_score >= 2 else 'low'

        return report

# usage
detector = lorawancovertdetector()
# detector.analyze_lorawan_packet(time.time(), 0x12345678, 25, 1, 7, b'{"temp": 23.5}')

rf analysis for electromagnetic channels

# lorawan rf analysis for cloaklora-style channels
import numpy as np
from scipy import signal, fft
import matplotlib.pyplot as plt

class lorawanrfanalyzer:
    def __init__(self):
        self.sample_rate = 1000000  # 1 mhz
        self.lora_bandwidth = 125000  # 125 khz
        self.center_freq = 868100000  # 868.1 mhz

    def analyze_amplitude_modulation(self, rf_samples):
        """analyze rf samples for amplitude-based covert channels"""

        # extract amplitude envelope
        analytic_signal = signal.hilbert(rf_samples)
        amplitude_envelope = np.abs(analytic_signal)

        # detect amplitude variations
        amplitude_variation = np.std(amplitude_envelope)
        mean_amplitude = np.mean(amplitude_envelope)
        cv_amplitude = amplitude_variation / mean_amplitude

        # look for periodic amplitude patterns
        amplitude_fft = np.fft.fft(amplitude_envelope)
        amplitude_freqs = np.fft.fftfreq(len(amplitude_envelope), 1/self.sample_rate)

        # find dominant frequencies in amplitude variations
        amplitude_power = np.abs(amplitude_fft)**2
        peak_indices = signal.find_peaks(amplitude_power, height=np.max(amplitude_power)*0.1)[0]

        dominant_freqs = amplitude_freqs[peak_indices]

        analysis = {
            'amplitude_variation': amplitude_variation,
            'coefficient_of_variation': cv_amplitude,
            'dominant_frequencies': dominant_freqs[:5],  # top 5
            'covert_channel_likely': cv_amplitude > 0.1  # threshold for detection
        }

        return analysis

    def detect_timing_patterns(self, packet_timestamps):
        """detect covert timing patterns in packet transmissions"""

        if len(packet_timestamps) < 10:
            return {'insufficient_data': true}

        # calculate inter-packet intervals
        intervals = np.diff(packet_timestamps)

        # statistical analysis
        mean_interval = np.mean(intervals)
        std_interval = np.std(intervals)
        cv_interval = std_interval / mean_interval if mean_interval > 0 else 0

        # histogram analysis for bimodal detection
        hist, bin_edges = np.histogram(intervals, bins=20)

        # find peaks in histogram
        peaks, _ = signal.find_peaks(hist, height=np.max(hist)*0.3)

        # bimodal detection
        is_bimodal = len(peaks) >= 2

        if is_bimodal:
            # calculate ratio between dominant intervals
            peak_positions = bin_edges[peaks]
            if len(peak_positions) >= 2:
                interval_ratio = peak_positions[0] / peak_positions[1]
            else:
                interval_ratio = 1
        else:
            interval_ratio = 1

        analysis = {
            'mean_interval': mean_interval,
            'interval_cv': cv_interval,
            'is_bimodal': is_bimodal,
            'interval_ratio': interval_ratio,
            'num_peaks': len(peaks),
            'covert_timing_likely': is_bimodal and 1.5 < interval_ratio < 3.0
        }

        return analysis

    def analyze_lorawan_spectrum(self, rf_samples):
        """analyze spectrum for lorawan signal anomalies"""

        # compute spectrogram
        frequencies, times, spectrogram = signal.spectrogram(
            rf_samples,
            fs=self.sample_rate,
            window='hann',
            nperseg=1024,
            noverlap=512
        )

        # focus on lorawan bandwidth
        lora_freq_mask = (frequencies >= -self.lora_bandwidth/2) & (frequencies <= self.lora_bandwidth/2)
        lora_spectrum = spectrogram[lora_freq_mask, :]

        # analyze spectral features
        spectral_variation = np.std(lora_spectrum, axis=1)
        spectral_peaks = []

        for freq_bin in range(len(spectral_variation)):
            if spectral_variation[freq_bin] > np.mean(spectral_variation) + 2*np.std(spectral_variation):
                spectral_peaks.append(frequencies[lora_freq_mask][freq_bin])

        # chirp analysis
        chirp_linearity = self.analyze_chirp_linearity(lora_spectrum, times)

        analysis = {
            'spectral_peaks': spectral_peaks,
            'chirp_linearity': chirp_linearity,
            'spectrum_anomaly': len(spectral_peaks) > 5  # threshold
        }

        return analysis

    def analyze_chirp_linearity(self, spectrum, times):
        """analyze lorawan chirp linearity for modulation detection"""

        # track instantaneous frequency across time
        instantaneous_freqs = []

        for time_bin in range(spectrum.shape[1]):
            # find frequency with maximum power at this time
            max_power_idx = np.argmax(spectrum[:, time_bin])
            instantaneous_freqs.append(max_power_idx)

        # calculate linearity of frequency sweep
        if len(instantaneous_freqs) > 1:
            # fit linear regression to frequency vs time
            coefficients = np.polyfit(range(len(instantaneous_freqs)), instantaneous_freqs, 1)
            residuals = instantaneous_freqs - np.polyval(coefficients, range(len(instantaneous_freqs)))
            linearity_error = np.std(residuals)

            return {
                'linearity_error': linearity_error,
                'is_linear': linearity_error < 5,  # threshold
                'slope': coefficients[0]
            }

        return {'insufficient_data': true}

    def comprehensive_covert_analysis(self, rf_samples, packet_timestamps):
        """comprehensive analysis for all lorawan covert channel types"""

        results = {
            'timestamp': time.time(),
            'sample_count': len(rf_samples),
            'packet_count': len(packet_timestamps)
        }

        # amplitude modulation analysis
        amplitude_analysis = self.analyze_amplitude_modulation(rf_samples)
        results['amplitude_covert'] = amplitude_analysis

        # timing analysis
        timing_analysis = self.detect_timing_patterns(packet_timestamps)
        results['timing_covert'] = timing_analysis

        # spectrum analysis
        spectrum_analysis = self.analyze_lorawan_spectrum(rf_samples)
        results['spectrum_covert'] = spectrum_analysis

        # overall risk assessment
        risk_factors = 0

        if amplitude_analysis.get('covert_channel_likely', false):
            risk_factors += 2

        if timing_analysis.get('covert_timing_likely', false):
            risk_factors += 2

        if spectrum_analysis.get('spectrum_anomaly', false):
            risk_factors += 1

        results['overall_risk'] = 'high' if risk_factors >= 3 else 'medium' if risk_factors >= 2 else 'low'
        results['risk_score'] = risk_factors

        return results

# usage
# analyzer = lorawanrfanalyzer()
# analysis = analyzer.comprehensive_covert_analysis(rf_data, packet_times)

countermeasures

network-level monitoring

# lorawan network monitoring setup
# install chirpstack for lorawan network analysis
docker-compose up chirpstack-network-server chirpstack-application-server

# monitor lorawan traffic
tcpdump -i any -w lorawan_traffic.pcap 'port 1700'

# analyze packet timing
tshark -r lorawan_traffic.pcap -t fields -e frame.time_epoch -e data.data \
  | awk '{print $1}' | sort -n | awk 'nr>1 {print $1-prev} {prev=$1}'

payload filtering

# lorawan payload filtering and normalization
import json
import re
from cryptography.fernet import fernet

class lorawanpayloadfilter:
    def __init__(self):
        self.max_payload_size = 51  # restrict to sf12 size for security
        self.allowed_fports = [1, 2, 10, 20]  # whitelist approach
        self.entropy_threshold = 6.0

        # payload sanitization rules
        self.sanitization_rules = {
            'max_decimal_places': 2,
            'allowed_json_keys': ['temp', 'humidity', 'battery', 'lat', 'lon'],
            'max_string_length': 20
        }

    def filter_lorawan_packet(self, dev_addr, fport, payload_data):
        """filter and sanitize lorawan packets"""

        # check fport whitelist
        if fport not in self.allowed_fports:
            print(f"blocked packet from {dev_addr:08x}: invalid fport {fport}")
            return none

        # check payload size
        if len(payload_data) > self.max_payload_size:
            print(f"blocked packet from {dev_addr:08x}: oversized payload ({len(payload_data)} bytes)")
            return none

        # check payload entropy
        entropy = self.calculate_entropy(payload_data)
        if entropy > self.entropy_threshold:
            print(f"blocked packet from {dev_addr:08x}: high entropy payload ({entropy:.2f})")
            return none

        # sanitize payload content
        sanitized_payload = self.sanitize_payload(payload_data)

        if sanitized_payload != payload_data:
            print(f"sanitized payload from {dev_addr:08x}")

        return sanitized_payload

    def sanitize_payload(self, payload_data):
        """sanitize payload content"""

        # try json sanitization
        if payload_data.startswith(b'{'):
            return self.sanitize_json_payload(payload_data)

        # binary payload sanitization
        return self.sanitize_binary_payload(payload_data)

    def sanitize_json_payload(self, json_data):
        """sanitize json payload"""

        try:
            data = json.loads(json_data.decode())
            sanitized_data = {}

            for key, value in data.items():
                # check key whitelist
                if key not in self.sanitization_rules['allowed_json_keys']:
                    continue

                # sanitize values
                if isinstance(value, float):
                    # limit decimal places
                    value = round(value, self.sanitization_rules['max_decimal_places'])
                elif isinstance(value, str):
                    # limit string length
                    value = value[:self.sanitization_rules['max_string_length']]
                elif isinstance(value, int):
                    # keep integers as-is but limit range
                    value = max(-1000000, min(1000000, value))
                else:
                    # remove unsupported types
                    continue

                sanitized_data[key] = value

            return json.dumps(sanitized_data).encode()

        except exception as e:
            print(f"json sanitization failed: {e}")
            return b'{"error": "invalid_json"}'

    def sanitize_binary_payload(self, binary_data):
        """sanitize binary payload"""

        # remove potential covert markers
        sanitized = binary_data.replace(b'\xaa\xbb', b'')
        sanitized = sanitized.replace(b'\xcc\xdd', b'')
        sanitized = sanitized.replace(b'\xee\xff', b'')

        # limit to first 32 bytes for safety
        return sanitized[:32]

    def calculate_entropy(self, data):
        """calculate shannon entropy"""
        from collections import counter
        import math

        if not data:
            return 0

        counts = counter(data)
        probs = [count/len(data) for count in counts.values()]
        return -sum(p * math.log2(p) for p in probs if p > 0)

# lorawan traffic shaping
class lorawantrafficshaper:
    def __init__(self):
        # rate limiting per device
        self.device_limits = {
            'packets_per_hour': 10,
            'bytes_per_hour': 1000,
            'burst_limit': 3  # max packets in 5 minutes
        }

        self.device_usage = defaultdict(lambda: {
            'packets_this_hour': 0,
            'bytes_this_hour': 0,
            'last_hour_reset': time.time(),
            'recent_packets': []
        })

    def should_allow_packet(self, dev_addr, payload_size):
        """check if packet should be allowed based on rate limits"""

        current_time = time.time()
        usage = self.device_usage[dev_addr]

        # reset hourly counters
        if current_time - usage['last_hour_reset'] >= 3600:
            usage['packets_this_hour'] = 0
            usage['bytes_this_hour'] = 0
            usage['last_hour_reset'] = current_time

        # check hourly limits
        if usage['packets_this_hour'] >= self.device_limits['packets_per_hour']:
            print(f"rate limit exceeded for {dev_addr:08x}: packets per hour")
            return false

        if usage['bytes_this_hour'] + payload_size > self.device_limits['bytes_per_hour']:
            print(f"rate limit exceeded for {dev_addr:08x}: bytes per hour")
            return false

        # check burst limits
        recent_packets = [t for t in usage['recent_packets'] if current_time - t < 300]  # last 5 minutes
        if len(recent_packets) >= self.device_limits['burst_limit']:
            print(f"burst limit exceeded for {dev_addr:08x}")
            return false

        # update usage
        usage['packets_this_hour'] += 1
        usage['bytes_this_hour'] += payload_size
        usage['recent_packets'] = recent_packets + [current_time]

        return true

# usage
payload_filter = lorawanpayloadfilter()
traffic_shaper = lorawantrafficshaper()

# sanitized = payload_filter.filter_lorawan_packet(0x12345678, 1, b'{"temp": 23.456789}')
# allowed = traffic_shaper.should_allow_packet(0x12345678, len(sanitized) if sanitized else 0)

advantages and limitations

advantages

  • low power consumption: suitable for battery-operated devices
  • long range communication: coverage up to several kilometers
  • infrequent transmission: natural timing variability provides cover
  • limited monitoring: lorawan networks often have minimal security monitoring
  • multiple encoding vectors: payload, timing, fport, rf modulation

limitations

  • very low bandwidth: typical duty cycle limits severely restrict data rates
  • range limitations: covert rf techniques require proximity to receiver
  • packet loss: long-range transmission subject to interference and fading
  • duty cycle restrictions: regulatory limits on transmission frequency
  • limited deployment: lorawan infrastructure not universally available

performance characteristics

capacity analysis

techniquecapacityrangestealth leveldetection difficulty
payload embedding50-200 bytes/packetnetwork rangemediummedium
timing channels1-2 bits/packetnetwork rangehighhigh
fport encoding8 bits/packetnetwork rangelowlow
rf amplitude mod~38 bits/packet250mhighvery high
combined approach250+ bits/packetvariablemediummedium

duty cycle impact

# lorawan duty cycle analysis
def analyze_duty_cycle_impact():
    """analyze how duty cycle affects covert channel capacity"""

    # eu868 band duty cycle limits
    duty_cycles = {
        'g1_bands': 0.01,   # 1% - most restrictive
        'g2_bands': 0.001,  # 0.1% - very restrictive
        'g3_bands': 0.1     # 10% - less restrictive
    }

    # typical lorawan parameters
    packet_duration = 0.5  # 500ms for sf12
    packets_per_day = {}

    for band, duty_cycle in duty_cycles.items():
        # calculate max packets per day
        seconds_per_day = 86400
        max_air_time = seconds_per_day * duty_cycle
        max_packets = int(max_air_time / packet_duration)

        packets_per_day[band] = max_packets

        print(f"{band}: {duty_cycle*100}% duty cycle = {max_packets} packets/day")

        # covert channel capacity
        if max_packets > 0:
            # assume 50 bytes covert data per packet
            covert_bytes_per_day = max_packets * 50
            covert_bits_per_second = (covert_bytes_per_day * 8) / seconds_per_day

            print(f"  covert capacity: {covert_bytes_per_day} bytes/day ({covert_bits_per_second:.4f} bps)")

    return packets_per_day

# analyze_duty_cycle_impact()

real-world applications

iot surveillance

  • long-term monitoring with minimal detection risk
  • coordination between distributed sensor networks
  • backup communication for compromised primary channels

industrial espionage

  • exfiltration from smart factories and industrial iot
  • monitoring production data and operational parameters
  • maintaining persistent access in air-gapped facilities

research applications

  • lorawan security assessment
  • covert channel capacity studies
  • electromagnetic emanation analysis

references

  • lorawan 1.0.4 specification - lora alliance
  • rp002-1.0.3 lorawan regional parameters - lora alliance
  • “cloaklora: a covert channel over lorawan phy” - academic research
  • “security analysis of lorawan networks” - iot security studies
  • chirpstack lorawan network server documentation
on this page