skills/mukul975/anthropic-cybersecurity-skills/building-automated-malware-submission-pipeline

building-automated-malware-submission-pipeline

SKILL.md

Building Automated Malware Submission Pipeline

When to Use

Use this skill when:

  • SOC teams face high volume of suspicious file alerts requiring sandbox analysis
  • Manual sandbox submission creates bottlenecks in alert triage workflow
  • Endpoint and email security tools quarantine files needing automated verdict determination
  • Incident response requires rapid malware family identification and IOC extraction

Do not use for analyzing live malware samples in production environments — always use isolated sandbox infrastructure.

Prerequisites

  • Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
  • VirusTotal API key (Enterprise for submission, free for lookup)
  • MalwareBazaar API access for known malware lookup
  • File collection mechanism: EDR quarantine API, email gateway export, network capture
  • Python 3.8+ with requests, vt-py, pefile libraries
  • Isolated analysis network with no production connectivity

Workflow

Step 1: Build File Collection Pipeline

Collect suspicious files from multiple sources:

import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime

class MalwareCollector:
    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        self.quarantine_dir = Path(quarantine_dir)
        self.quarantine_dir.mkdir(exist_ok=True)

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon"""
        headers = {"Authorization": f"Bearer {api_token}"}

        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50}
        )
        file_ids = response.json()["resources"]

        for file_id in file_ids:
            # Download quarantined file
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id}
            )
            file_data = dl_response.content
            sha256 = hashlib.sha256(file_data).hexdigest()

            filepath = self.quarantine_dir / f"{sha256}.sample"
            filepath.write_bytes(file_data)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine"""
        import email
        from email import policy

        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            msg = email.message_from_binary_file(
                eml_file.open("rb"), policy=policy.default
            )
            for attachment in msg.iter_attachments():
                content = attachment.get_content()
                if isinstance(content, str):
                    content = content.encode()
                sha256 = hashlib.sha256(content).hexdigest()
                filename = attachment.get_filename() or "unknown"

                filepath = self.quarantine_dir / f"{sha256}.sample"
                filepath.write_bytes(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": filename,
                    "sender": msg["From"],
                    "subject": msg["Subject"]
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 for a file"""
        with open(filepath, "rb") as f:
            content = f.read()
        return {
            "md5": hashlib.md5(content).hexdigest(),
            "sha1": hashlib.sha1(content).hexdigest(),
            "sha256": hashlib.sha256(content).hexdigest(),
            "size": len(content)
        }

Step 2: Pre-Screen with Hash Lookups

Check if the file is already known before sandbox submission:

import vt

class MalwarePreScreener:
    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal"""
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
            stats = file_obj.last_analysis_stats
            return {
                "found": True,
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total": sum(stats.values()),
                "threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "type": getattr(file_obj, "type_description", "Unknown")
            }
        except vt.APIError:
            return {"found": False}

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar"""
        response = requests.post(
            self.mb_api_url,
            data={"query": "get_info", "hash": sha256}
        )
        data = response.json()
        if data["query_status"] == "ok":
            entry = data["data"][0]
            return {
                "found": True,
                "signature": entry.get("signature", "Unknown"),
                "tags": entry.get("tags", []),
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown")
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks"""
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        verdict = "UNKNOWN"
        if vt_result["found"] and vt_result.get("malicious", 0) > 10:
            verdict = "KNOWN_MALICIOUS"
        elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN"
        }

    def close(self):
        self.vt_client.close()

Step 3: Submit to Sandbox for Dynamic Analysis

Cuckoo Sandbox Submission:

class SandboxSubmitter:
    def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
        self.cuckoo_url = cuckoo_url

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64"
                }
            )
        task_id = response.json()["task_id"]
        return task_id

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete"""
        import time
        elapsed = 0
        while elapsed < max_wait:
            response = requests.get(f"{self.cuckoo_url}/tasks/view/{task_id}")
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            elif status == "failed_analysis":
                return {"error": "Analysis failed"}
            time.sleep(poll_interval)
            elapsed += poll_interval
        return {"error": "Analysis timed out"}

    def get_report(self, task_id):
        """Retrieve analysis report"""
        response = requests.get(f"{self.cuckoo_url}/tasks/report/{task_id}")
        report = response.json()

        # Extract key indicators
        return {
            "task_id": task_id,
            "score": report.get("info", {}).get("score", 0),
            "signatures": [
                {"name": s["name"], "severity": s["severity"], "description": s["description"]}
                for s in report.get("signatures", [])
            ],
            "network": {
                "dns": [d["request"] for d in report.get("network", {}).get("dns", [])],
                "http": [
                    {"url": h["uri"], "method": h["method"]}
                    for h in report.get("network", {}).get("http", [])
                ],
                "hosts": report.get("network", {}).get("hosts", [])
            },
            "dropped_files": [
                {"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
                for f in report.get("dropped", [])
            ],
            "processes": [
                {"name": p["process_name"], "pid": p["pid"], "command_line": p.get("command_line", "")}
                for p in report.get("behavior", {}).get("processes", [])
            ],
            "registry_keys": [
                k for k in report.get("behavior", {}).get("summary", {}).get("regkey_written", [])
            ]
        }

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True
                }
            )
        return response.json()["data"]["webid"]

Step 4: Extract IOCs and Generate Verdict

class VerdictGenerator:
    def __init__(self):
        self.malicious_threshold = 7  # Cuckoo score threshold

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict"""
        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": []
        }

        # Extract IOCs from sandbox report
        if sandbox_report:
            iocs["domains"] = sandbox_report.get("network", {}).get("dns", [])
            iocs["ips"] = sandbox_report.get("network", {}).get("hosts", [])
            iocs["urls"] = [
                h["url"] for h in sandbox_report.get("network", {}).get("http", [])
            ]
            iocs["hashes"] = [
                f["sha256"] for f in sandbox_report.get("dropped_files", [])
            ]
            iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
            iocs["files_dropped"] = sandbox_report.get("dropped_files", [])

        # Determine verdict
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        sandbox_score = sandbox_report.get("score", 0) if sandbox_report else 0
        sig_count = len(sandbox_report.get("signatures", [])) if sandbox_report else 0

        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)

        if combined_score >= 100:
            verdict = "MALICIOUS"
            confidence = "HIGH"
        elif combined_score >= 50:
            verdict = "SUSPICIOUS"
            confidence = "MEDIUM"
        elif combined_score >= 20:
            verdict = "POTENTIALLY_UNWANTED"
            confidence = "LOW"
        else:
            verdict = "CLEAN"
            confidence = "HIGH"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": sandbox_report.get("signatures", []) if sandbox_report else []
        }

Step 5: Push Results to SIEM

def push_to_splunk(verdict_result, splunk_url, splunk_token):
    """Send malware analysis verdict to Splunk HEC"""
    import json

    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]]
        }
    }

    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json"
        },
        json=event,
        verify=False
    )
    return response.status_code == 200

def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure"""
    for ip in iocs.get("ips", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "ip", "value": ip, "action": "block", "source": "malware_pipeline"}
        )
    for domain in iocs.get("domains", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "domain", "value": domain, "action": "sinkhole", "source": "malware_pipeline"}
        )

Step 6: Orchestrate the Full Pipeline

def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline"""
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    submitter = SandboxSubmitter(config["cuckoo_url"])
    generator = VerdictGenerator()

    # Step 1: Hash and pre-screen
    hashes = collector.compute_hashes(sample_path)
    pre_screen = screener.pre_screen(hashes["sha256"])

    # Step 2: Submit to sandbox if unknown
    sandbox_report = None
    if pre_screen["needs_sandbox"]:
        task_id = submitter.submit_to_cuckoo(sample_path)
        sandbox_report = submitter.wait_for_analysis(task_id)

    # Step 3: Generate verdict
    verdict = generator.generate_verdict(pre_screen, sandbox_report)
    verdict["sha256"] = hashes["sha256"]
    verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

    # Step 4: Push to SIEM
    push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

    # Step 5: Block if malicious
    if verdict["verdict"] == "MALICIOUS":
        push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])

    screener.close()
    return verdict

Key Concepts

Term Definition
Dynamic Analysis Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes)
Static Analysis Examining malware without execution (hash lookup, string analysis, PE header inspection)
Sandbox Evasion Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis
IOC Extraction Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports
Multi-AV Scanning Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection
Verdict Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean

Tools & Systems

  • Cuckoo Sandbox: Open-source automated malware analysis platform with behavioral analysis and network capture
  • Joe Sandbox: Commercial sandbox with deep behavioral analysis, YARA matching, and MITRE ATT&CK mapping
  • Any.Run: Interactive sandbox service allowing real-time manipulation during analysis for debugging evasive malware
  • VirusTotal: Multi-engine scanning service providing 70+ AV results and behavioral analysis reports
  • CAPE Sandbox: Community-maintained Cuckoo fork with enhanced payload extraction and configuration dumping

Common Scenarios

  • Email Attachment Triage: Auto-submit quarantined email attachments, generate verdict in <5 minutes
  • EDR Quarantine Processing: Batch-process files quarantined by endpoint security for detailed analysis
  • Incident Investigation: Submit suspicious binaries found during IR for malware family identification and IOC extraction
  • Threat Intel Enrichment: Analyze samples from threat feeds to extract C2 infrastructure and update blocking
  • Zero-Day Detection: Sandbox catches novel malware missed by signature-based AV through behavioral analysis

Output Format

MALWARE ANALYSIS REPORT — Pipeline Submission
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Sample:       invoice_march.docx
SHA256:       a1b2c3d4e5f6a7b8...
File Type:    Microsoft Word Document (macro-enabled)

Pre-Screening:
  VirusTotal:    34/72 malicious (Emotet.Downloader)
  MalwareBazaar: Tags: emotet, macro, downloader

Sandbox Analysis (Cuckoo):
  Score:         9.2/10 (MALICIOUS)
  Signatures:
    - Macro executes PowerShell download cradle (severity: 8)
    - Process injection into explorer.exe (severity: 9)
    - Connects to known Emotet C2 server (severity: 9)

Extracted IOCs:
  C2 IPs:       185.234.218[.]50:8080, 45.77.123[.]45:443
  Domains:       update-service[.]evil[.]com
  Dropped Files: payload.dll (SHA256: b2c3d4e5...)
  Registry:      HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update

VERDICT: MALICIOUS (Emotet Downloader) — Confidence: HIGH
ACTIONS:
  [DONE] IOCs pushed to Splunk threat intel
  [DONE] C2 IPs blocked on firewall
  [DONE] Domain sinkholed on DNS
  [DONE] Hash blocked on endpoint
Weekly Installs
3
GitHub Stars
1.3K
First Seen
1 day ago
Installed on
opencode3
github-copilot3
codex3
kimi-cli3
gemini-cli3
cursor3