Analyzing Network Covert Channels in Malware

Overview

Malware uses covert channels to disguise command-and-control (C2) communication and data exfiltration as legitimate-looking network traffic. DNS tunneling encodes data in DNS queries and responses; it is used by tools such as iodine and dnscat2 and by malware families such as FrameworkPOS. ICMP tunneling hides data in echo request/reply payloads (icmpsh, ptunnel). HTTP covert channels embed C2 data in headers, cookies, or steganographic images. More generally, protocol abuse carries traffic over protocols that firewalls already allow. Published ML-based detectors report recall above 99% for DNS tunneling, though low-throughput exfiltration remains hard to catch. DNS-based campaigns reported in 2024, including TrkCdn and SecShow (Palo Alto Networks Unit 42) and Savvy Seahorse (Infoblox), show that the technique remains in active use.
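
To see why subdomain length and entropy are useful indicators, the sketch below packs a short byte string into DNS labels the way a tunneling client might. It is an illustration under assumptions, not the encoding of any specific tool: Base32 is assumed, and `encode_as_labels`, `shannon_entropy`, the sample payload, and the domain `tunnel.example` are all hypothetical.

```python
import base64
import math
from collections import Counter

MAX_LABEL_LEN = 63  # DNS limits each label to 63 bytes


def shannon_entropy(text):
    """Shannon entropy in bits per character."""
    if not text:
        return 0.0
    counts = Counter(text)
    total = len(text)
    return -sum((n / total) * math.log2(n / total) for n in counts.values())


def encode_as_labels(data, max_label_len=MAX_LABEL_LEN):
    """Base32-encode bytes and split the result into DNS-sized labels."""
    encoded = base64.b32encode(data).decode("ascii").rstrip("=").lower()
    return [encoded[i:i + max_label_len]
            for i in range(0, len(encoded), max_label_len)]


if __name__ == "__main__":
    payload = b"host=WS-0142;user=alice;os=win10;build=19045"  # made-up data
    subdomain = ".".join(encode_as_labels(payload))
    print(f"{subdomain}.tunnel.example")  # hypothetical tunnel domain
    print(f"encoded subdomain: {len(subdomain)} chars, "
          f"entropy {shannon_entropy(subdomain):.2f} bits/char")
    print(f"'www': entropy {shannon_entropy('www'):.2f} bits/char")
```

Because Base32 uses a 32-symbol alphabet, the entropy of an encoded subdomain is bounded by 5 bits per character, while hex encoding is bounded by 4. Any fixed entropy threshold should be chosen with the expected encoding in mind.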

Prerequisites

  • Python 3.9+ with scapy, dpkt, dnslib
  • Wireshark/tshark for PCAP analysis
  • Zeek (formerly Bro) for network monitoring
  • DNS query logging infrastructure
  • Understanding of DNS, ICMP, HTTP protocols at packet level

Practical Steps

Step 1: DNS Tunneling Detection

#!/usr/bin/env python3
"""Detect DNS tunneling and covert channels in network traffic."""
import sys
import math
from collections import Counter, defaultdict

try:
    from scapy.all import rdpcap, DNS, DNSQR, IP, ICMP
except ImportError:
    print("scapy is required: pip install scapy", file=sys.stderr)
    sys.exit(1)


def entropy(data):
    """Return the Shannon entropy of a string, in bits per character."""
    if not data:
        return 0.0
    freq = Counter(data)
    length = len(data)
    return -sum((c/length) * math.log2(c/length) for c in freq.values())


def analyze_dns_tunneling(pcap_path):
    """Detect DNS tunneling indicators in PCAP."""
    packets = rdpcap(pcap_path)
    domain_stats = defaultdict(lambda: {
        "queries": 0, "total_qname_len": 0, "subdomain_lengths": [],
        "query_types": Counter(), "unique_subdomains": set(),
    })

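    for pkt in packets:
        # Count queries only (qr == 0); responses echo the question section
        # and would otherwise double-count every lookup.
        if pkt.haslayer(DNS) and pkt.haslayer(DNSQR) and pkt[DNS].qr == 0:
            qname = pkt[DNSQR].qname.decode('utf-8', errors='replace').rstrip('.')
            qtype = pkt[DNSQR].qtype

            parts = qname.split('.')
            if len(parts) >= 3:
                # Naive split: treats the last two labels as the registered
                # domain, which is wrong for suffixes such as co.uk.
                base_domain = '.'.join(parts[-2:])
                subdomain = '.'.join(parts[:-2])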

                stats = domain_stats[base_domain]
                stats["queries"] += 1
                stats["total_qname_len"] += len(qname)
                stats["subdomain_lengths"].append(len(subdomain))
                stats["query_types"][qtype] += 1
                stats["unique_subdomains"].add(subdomain)

    # Score domains for tunneling indicators
    suspicious = []
    for domain, stats in domain_stats.items():
        if stats["queries"] < 5:
            continue

        avg_subdomain_len = (sum(stats["subdomain_lengths"]) /
                             len(stats["subdomain_lengths"]))
        unique_ratio = len(stats["unique_subdomains"]) / stats["queries"]

        # Calculate subdomain entropy
        all_subdomains = ''.join(stats["unique_subdomains"])
        sub_entropy = entropy(all_subdomains)

        score = 0
        reasons = []

        if avg_subdomain_len > 30:
            score += 30
            reasons.append(f"Long subdomains (avg {avg_subdomain_len:.0f} chars)")
        if unique_ratio > 0.9:
            score += 25
            reasons.append(f"High uniqueness ({unique_ratio:.2%})")
        if sub_entropy > 4.0:
            score += 25
            reasons.append(f"High entropy ({sub_entropy:.2f})")
        if stats["query_types"].get(16, 0) > 10:  # TXT records
            score += 20
            reasons.append(f"Many TXT queries ({stats['query_types'][16]})")

        if score >= 50:
            suspicious.append({
                "domain": domain,
                "score": score,
                "queries": stats["queries"],
                "avg_subdomain_length": round(avg_subdomain_len, 1),
                "unique_subdomains": len(stats["unique_subdomains"]),
                "subdomain_entropy": round(sub_entropy, 2),
                "reasons": reasons,
            })

    return sorted(suspicious, key=lambda x: -x["score"])


def analyze_icmp_tunneling(pcap_path):
    """Detect ICMP tunneling in PCAP."""
    packets = rdpcap(pcap_path)
    icmp_stats = defaultdict(lambda: {"count": 0, "payload_sizes": [], "payloads": []})

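    for pkt in packets:
        # Echo request (type 8) and echo reply (type 0) only; ICMP error
        # messages legitimately embed part of the offending packet.
        if pkt.haslayer(ICMP) and pkt.haslayer(IP) and pkt[ICMP].type in (0, 8):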
            src = pkt[IP].src
            dst = pkt[IP].dst
            key = f"{src}->{dst}"

            payload = bytes(pkt[ICMP].payload)
            icmp_stats[key]["count"] += 1
            icmp_stats[key]["payload_sizes"].append(len(payload))
            if len(payload) > 64:
                icmp_stats[key]["payloads"].append(payload[:100])

    suspicious = []
    for flow, stats in icmp_stats.items():
        if stats["count"] < 5:
            continue
        avg_size = sum(stats["payload_sizes"]) / len(stats["payload_sizes"])
        if avg_size > 64 or stats["count"] > 100:
            suspicious.append({
                "flow": flow,
                "packets": stats["count"],
                "avg_payload_size": round(avg_size, 1),
                "reason": "Large/frequent ICMP payloads suggest tunneling",
            })

    return suspicious


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print(f"Usage: {sys.argv[0]} <pcap_file>")
        sys.exit(1)

    print("[+] DNS Tunneling Analysis")
    dns_results = analyze_dns_tunneling(sys.argv[1])
    for r in dns_results:
        print(f"  {r['domain']} (score: {r['score']})")
        for reason in r['reasons']:
            print(f"    - {reason}")

    print("\n[+] ICMP Tunneling Analysis")
    icmp_results = analyze_icmp_tunneling(sys.argv[1])
    for r in icmp_results:
        print(f"  {r['flow']}: {r['reason']}")
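
Once the script above flags a domain, a rough size for the data it carried can be derived from the query names alone. The following is a minimal sketch, assuming the payload travels only in the subdomain labels and that the encoding density is known. `estimate_exfil_bytes` and its `bits_per_char` parameter are hypothetical helpers, not part of the script above.

```python
import math


def estimate_exfil_bytes(qnames, base_domain, bits_per_char=5.0):
    """Rough estimate of bytes carried in subdomains of base_domain.

    bits_per_char is an assumption about the encoding: 4 for hex,
    5 for Base32, 6 for Base64-style alphabets.
    """
    suffix = "." + base_domain.lower().rstrip(".")
    unique_subdomains = set()
    for qname in qnames:
        name = qname.lower().rstrip(".")
        if name.endswith(suffix):
            # Retransmitted queries repeat the same name,
            # so count each distinct subdomain once.
            unique_subdomains.add(name[:-len(suffix)])

    # Dots separate labels and carry no payload.
    payload_chars = sum(len(s.replace(".", "")) for s in unique_subdomains)
    return {
        "unique_queries": len(unique_subdomains),
        "payload_chars": payload_chars,
        "estimated_bytes": math.floor(payload_chars * bits_per_char / 8),
    }


if __name__ == "__main__":
    # Hypothetical query names for a flagged domain.
    sample = ["mfrggzdf.evil.test", "mztwq2lk.evil.test", "mfrggzdf.evil.test"]
    print(estimate_exfil_bytes(sample, "evil.test"))
```

The figure covers the query direction only and includes any session or sequence fields the tunnel adds, so it is better read as an order-of-magnitude estimate than as an exact byte count.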

Validation Criteria

  • DNS tunneling detected via entropy, subdomain length, and query volume analysis
  • ICMP covert channels identified through payload size anomalies
  • Tunneling domains distinguished from legitimate CDN/cloud traffic
  • Data exfiltration volume estimated from captured traffic
  • C2 communication patterns and beaconing intervals extracted
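
The beaconing criterion above can be approached by looking at how regular the gaps between packets are. The sketch below is one possible way to do this, assuming a list of timestamps in seconds for a single flow or domain (for example collected from scapy's `pkt.time`). `beacon_profile` and its thresholds `max_cv` and `min_events` are hypothetical and untuned.

```python
import statistics


def beacon_profile(timestamps, max_cv=0.2, min_events=5):
    """Summarise inter-arrival times and flag near-constant intervals.

    max_cv and min_events are illustrative thresholds, not tuned values.
    Returns None when there are too few events to judge.
    """
    times = sorted(timestamps)
    if len(times) < min_events:
        return None
    intervals = [b - a for a, b in zip(times, times[1:])]
    mean = statistics.fmean(intervals)
    stdev = statistics.pstdev(intervals)
    # Coefficient of variation: spread of the intervals relative to their mean.
    cv = stdev / mean if mean > 0 else float("inf")
    return {
        "events": len(times),
        "mean_interval": round(mean, 2),
        "stdev": round(stdev, 2),
        "cv": round(cv, 3),
        "likely_beacon": cv <= max_cv,
    }


if __name__ == "__main__":
    # Hypothetical timestamps: one query roughly every 60 seconds.
    print(beacon_profile([0.0, 60.2, 119.9, 180.1, 240.0, 300.3]))
```

A low coefficient of variation points to a fixed sleep timer. Implants that add jitter to their sleep interval will score higher, so the threshold is a trade-off between missed beacons and false positives from scheduled legitimate traffic.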
