building-threat-intelligence-platform
SKILL.md
Building Threat Intelligence Platform
Overview
Building a Threat Intelligence Platform (TIP) involves deploying and integrating multiple CTI tools into a unified system for collecting, analyzing, enriching, and disseminating threat intelligence. This skill covers designing TIP architecture using open-source tools (MISP, OpenCTI, TheHive, Cortex), configuring feed ingestion pipelines, establishing enrichment workflows, implementing STIX/TAXII interoperability, and building analyst dashboards for CTI operations.
Prerequisites
- Docker and Docker Compose for deploying platform components
- Python 3.9+ with
pymisp,pycti,thehive4pylibraries - Elasticsearch/OpenSearch cluster for data storage
- Redis and RabbitMQ for message queuing
- Understanding of STIX 2.1 data model and TAXII 2.1 transport
- API keys for enrichment services (VirusTotal, Shodan, AbuseIPDB)
Key Concepts
TIP Architecture Components
- Collection Layer: Feed ingestion from OSINT, commercial, and internal sources
- Storage Layer: Elasticsearch/OpenSearch for indexed CTI data with STIX 2.1 schema
- Analysis Layer: OpenCTI for knowledge graph analysis and MISP for IOC correlation
- Enrichment Layer: Cortex analyzers for automated IOC enrichment
- Response Layer: TheHive for case management and incident response integration
- Sharing Layer: TAXII server for outbound intelligence sharing
Platform Integration Points
- MISP <-> OpenCTI: Bidirectional sync via OpenCTI MISP connector
- OpenCTI <-> TheHive: Alert/case creation from high-confidence indicators
- TheHive <-> Cortex: Automated analysis and enrichment of case observables
- All <-> SIEM: Real-time IOC push to Splunk/Elastic via API or Kafka
Practical Steps
Step 1: Deploy Platform with Docker Compose
version: '3.8'
services:
# --- Storage Layer ---
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms2g -Xmx2g"
ports:
- "9200:9200"
volumes:
- es-data:/usr/share/elasticsearch/data
redis:
image: redis:7
ports:
- "6379:6379"
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672"
- "15672:15672"
minio:
image: minio/minio
command: server /data --console-address ":9001"
ports:
- "9000:9000"
- "9001:9001"
# --- MISP ---
misp:
image: ghcr.io/misp/misp-docker/misp-core:latest
ports:
- "8443:443"
environment:
- MISP_ADMIN_EMAIL=admin@tip.local
- MISP_BASEURL=https://localhost:8443
volumes:
- misp-data:/var/www/MISP/app/files
# --- OpenCTI ---
opencti:
image: opencti/platform:6.4.4
environment:
- APP__PORT=8080
- APP__ADMIN__EMAIL=admin@tip.local
- APP__ADMIN__PASSWORD=TIPAdminPassword
- APP__ADMIN__TOKEN=tip-opencti-token-uuid
- ELASTICSEARCH__URL=http://elasticsearch:9200
- MINIO__ENDPOINT=minio
- RABBITMQ__HOSTNAME=rabbitmq
- REDIS__HOSTNAME=redis
ports:
- "8080:8080"
depends_on:
- elasticsearch
- redis
- rabbitmq
- minio
# --- TheHive ---
thehive:
image: strangebee/thehive:5.3
environment:
- TH_CORTEX_URL=http://cortex:9001
ports:
- "9000:9000"
depends_on:
- elasticsearch
# --- Cortex ---
cortex:
image: thehiveproject/cortex:3.1.8
ports:
- "9001:9001"
depends_on:
- elasticsearch
volumes:
es-data:
misp-data:
Step 2: Configure Feed Ingestion Pipeline
from pymisp import PyMISP
from pycti import OpenCTIApiClient
import json
class TIPFeedManager:
"""Manage threat intelligence feed ingestion across platform components."""
def __init__(self, misp_url, misp_key, opencti_url, opencti_token):
self.misp = PyMISP(misp_url, misp_key, ssl=False)
self.opencti = OpenCTIApiClient(opencti_url, opencti_token)
def configure_osint_feeds(self):
"""Enable default OSINT feeds in MISP."""
osint_feeds = [
{"name": "CIRCL OSINT", "id": 1},
{"name": "Botvrij.eu", "id": 2},
{"name": "abuse.ch URLhaus", "id": 5},
{"name": "abuse.ch Feodo Tracker", "id": 6},
]
for feed in osint_feeds:
try:
self.misp.enable_feed(feed["id"])
self.misp.fetch_feed(feed["id"])
print(f"[+] Enabled feed: {feed['name']}")
except Exception as e:
print(f"[-] Failed: {feed['name']}: {e}")
def configure_opencti_connectors(self):
"""List and verify OpenCTI connector status."""
connectors = self.opencti.connector.list()
for conn in connectors:
print(
f" Connector: {conn['name']} - "
f"Active: {conn['active']} - "
f"Type: {conn['connector_type']}"
)
def sync_misp_to_opencti(self):
"""Verify MISP-OpenCTI sync is operational."""
# OpenCTI MISP connector handles this automatically
# Check connector status
connectors = self.opencti.connector.list()
misp_connector = [
c for c in connectors if "misp" in c["name"].lower()
]
if misp_connector:
print(f"[+] MISP connector active: {misp_connector[0]['active']}")
else:
print("[-] MISP connector not found - configure in Docker Compose")
Step 3: Build Enrichment Pipeline with Cortex
import requests
class CortexEnrichment:
"""Integrate Cortex analyzers for automated enrichment."""
def __init__(self, cortex_url, cortex_key):
self.url = cortex_url
self.headers = {"Authorization": f"Bearer {cortex_key}"}
def list_analyzers(self):
"""List available Cortex analyzers."""
resp = requests.get(
f"{self.url}/api/analyzer",
headers=self.headers,
timeout=30,
)
if resp.status_code == 200:
analyzers = resp.json()
for a in analyzers:
print(f" {a['name']}: {a.get('description', '')[:60]}")
return analyzers
return []
def analyze_observable(self, observable_type, observable_value, analyzer_id):
"""Submit an observable for analysis."""
job = {
"data": observable_value,
"dataType": observable_type,
"tlp": 2,
"message": "TIP automated enrichment",
}
resp = requests.post(
f"{self.url}/api/analyzer/{analyzer_id}/run",
json=job,
headers=self.headers,
timeout=30,
)
if resp.status_code == 200:
return resp.json()
return None
def get_job_report(self, job_id):
"""Get the report for a completed analysis job."""
resp = requests.get(
f"{self.url}/api/job/{job_id}/report",
headers=self.headers,
timeout=60,
)
if resp.status_code == 200:
return resp.json()
return None
Step 4: Implement Analyst Dashboard Metrics
class TIPMetrics:
"""Collect platform metrics for analyst dashboards."""
def __init__(self, misp, opencti):
self.misp = misp
self.opencti = opencti
def get_platform_stats(self):
"""Collect statistics across all platform components."""
stats = {}
# MISP stats
misp_stats = self.misp.get_server_statistics()
stats["misp"] = {
"total_events": misp_stats.get("event_count", 0),
"total_attributes": misp_stats.get("attribute_count", 0),
"active_feeds": len([
f for f in self.misp.feeds()
if f.get("Feed", {}).get("enabled")
]),
}
# OpenCTI stats via GraphQL
stats["opencti"] = {
"total_indicators": self.opencti.indicator.list(
first=0, withPagination=True
).get("pagination", {}).get("globalCount", 0),
"total_reports": self.opencti.report.list(
first=0, withPagination=True
).get("pagination", {}).get("globalCount", 0),
}
return stats
Validation Criteria
- All platform components (MISP, OpenCTI, TheHive, Cortex) deployed and accessible
- MISP-OpenCTI bidirectional sync operational
- At least 3 OSINT feeds ingesting data
- Cortex analyzers configured and returning enrichment results
- Platform metrics dashboard showing real-time statistics
- STIX/TAXII export functional for intelligence sharing
References
Weekly Installs
4
Repository
mukul975/anthro…y-skillsGitHub Stars
1.3K
First Seen
2 days ago
Security Audits
Installed on
amp4
cline4
opencode4
cursor4
kimi-cli4
codex4