prospect-research-integration
Installation
SKILL.md
Prospect Research Integration
You are an expert in building sales bots that automatically research and enrich prospect data from multiple sources. Your goal is to help developers create systems that gather intelligence to personalize outreach and qualify leads.
Why Research Integration Matters
The Data Gap
What you have:
- Name: John Smith
- Email: john@company.com
- Company: Acme Corp
What you need to personalize:
- Role and responsibilities
- Company initiatives
- Recent news/triggers
- Tech stack
- Pain points
- Budget authority
With Research Integration
Auto-enriched profile:
- Title: VP of Engineering
- Reports to: CTO
- Team size: 45 engineers
- Recent: Announced series B
- Tech: AWS, React, Python
- Hiring: 12 open roles
- Trigger: New product launch
Now you can personalize.
Data Source Integration
Source Configuration
class DataSourceManager:
    """Registry of prospect-data sources with cached, priority-based lookups."""

    def __init__(self):
        self.sources = {}          # source_id -> normalized source config
        self.priority_order = []   # reserved for explicit source ordering
        self.cache = ResearchCache()

    def register_source(self, source_id, source_config):
        """Register a data source.

        Expected source_config keys: "connector" (required), "data_types"
        (required), and optional "rate_limit", "cost_per_lookup" (legacy
        alias "cost"), and "reliability".
        """
        self.sources[source_id] = {
            "connector": source_config["connector"],
            "data_types": source_config["data_types"],
            "rate_limit": source_config.get("rate_limit"),
            # Bug fix: callers pass "cost_per_lookup", but only "cost" was
            # read before, so every source silently got a cost of 0.
            "cost_per_lookup": source_config.get(
                "cost_per_lookup", source_config.get("cost", 0)
            ),
            "reliability": source_config.get("reliability", 0.8),
        }

    def lookup(self, identifier, data_types_needed):
        """Lookup data from best available sources.

        Returns {"data": {...}, "sources_used": [...], "completeness": float}.
        """
        results = {}
        sources_used = []
        for data_type in data_types_needed:
            # NOTE(review): find_best_source / fetch_from_source are not
            # defined on this class — confirm they are provided elsewhere.
            source = self.find_best_source(data_type)
            if not source:
                continue
            # Serve from cache when a fresh entry exists.
            cached = self.cache.get(identifier, data_type, source)
            if cached and not cached.is_stale():
                results[data_type] = cached.data
            else:
                # Fetch fresh and cache the result.
                data = self.fetch_from_source(source, identifier, data_type)
                if data:
                    results[data_type] = data
                    self.cache.set(identifier, data_type, source, data)
            sources_used.append(source)
        return {
            "data": results,
            "sources_used": sources_used,
            # Bug fix: avoid ZeroDivisionError when nothing was requested;
            # an empty request is vacuously complete.
            "completeness": (
                len(results) / len(data_types_needed) if data_types_needed else 1.0
            ),
        }
# Source definitions: register each provider with the data types it can
# supply, its rate limit, and its per-lookup cost.
source_manager = DataSourceManager()

for _source_id, _source_config in (
    (
        "clearbit",
        {
            "connector": ClearbitConnector(),
            "data_types": ["company_info", "contact_info", "tech_stack"],
            "rate_limit": 100,
            "cost_per_lookup": 0.10,
        },
    ),
    (
        "linkedin_api",
        {
            "connector": LinkedInConnector(),
            "data_types": ["contact_info", "work_history", "connections"],
            "rate_limit": 50,
            "cost_per_lookup": 0.05,
        },
    ),
    (
        "news_api",
        {
            "connector": NewsAPIConnector(),
            "data_types": ["company_news", "press_releases"],
            "rate_limit": 1000,
            "cost_per_lookup": 0.01,
        },
    ),
):
    source_manager.register_source(_source_id, _source_config)
Company Research
def research_company(company_name, domain=None):
    """Gather comprehensive company intelligence.

    Args:
        company_name: Display name used for news, financial, and job lookups.
        domain: Optional website domain; preferred as the enrichment lookup
            key and required for tech-stack detection.

    Returns:
        dict with keys "basic_info", "financials", "tech_stack", "news",
        "hiring", and "social".
    """
    research = {
        "basic_info": {},
        "financials": {},
        "tech_stack": {},
        "news": [],
        "hiring": {},
        "social": {},
    }

    # Basic firmographics; prefer the domain as the lookup key when present.
    basic = source_manager.lookup(
        domain or company_name,
        ["company_size", "industry", "location", "founded"],
    )
    research["basic_info"] = basic["data"]

    # Financial signals (funding, revenue estimate, growth indicators).
    financials = gather_financial_signals(company_name)
    research["financials"] = {
        "funding": financials.get("recent_funding"),
        "revenue_estimate": financials.get("revenue"),
        "growth_signals": financials.get("growth_indicators"),
    }

    # Technology stack — detection needs a domain; previously this was
    # called unconditionally and could receive None.
    if domain:
        tech = detect_tech_stack(domain)
        research["tech_stack"] = {
            "detected": tech["technologies"],
            "categories": categorize_tech(tech["technologies"]),
            "relevant_to_us": filter_relevant_tech(tech["technologies"]),
        }
    else:
        research["tech_stack"] = {
            "detected": [],
            "categories": {},
            "relevant_to_us": [],
        }

    # Recent news, trimmed to the last 90 days.
    news = source_manager.lookup(company_name, ["company_news"])
    research["news"] = extract_relevant_news(news["data"], days=90)

    # Hiring signals derived from public job postings.
    hiring = scrape_job_postings(company_name, domain)
    research["hiring"] = {
        "total_openings": len(hiring),
        "by_department": group_by_department(hiring),
        "growth_areas": identify_growth_areas(hiring),
        "relevant_roles": filter_relevant_roles(hiring),
    }

    return research
def gather_financial_signals(company_name):
    """Collect funding and corporate-event intelligence for a company."""
    signals = {}

    # Most recent funding round, if Crunchbase knows about one.
    round_info = lookup_crunchbase(company_name)
    if round_info:
        signals["recent_funding"] = {
            "amount": round_info.amount,
            "round": round_info.round_type,
            "date": round_info.date,
            "investors": round_info.investors,
        }

    # IPO / acquisition signals surfaced from SEC filings.
    signals["corporate_events"] = search_sec_filings(company_name)
    return signals
Contact Research
def research_contact(email=None, name=None, company=None, linkedin_url=None):
    """Gather comprehensive contact intelligence.

    All arguments are optional; sections whose required identifier is
    missing are left empty rather than passing None to lookup helpers.

    Returns:
        dict with keys "professional", "social", "activity", "connections".
    """
    research = {
        "professional": {},
        "social": {},
        "activity": {},
        "connections": {},
    }

    # Professional info scraped from the LinkedIn profile, when available.
    if linkedin_url:
        linkedin_data = scrape_linkedin_profile(linkedin_url)
        research["professional"] = {
            "current_role": linkedin_data.get("title"),
            "tenure": calculate_tenure(linkedin_data.get("start_date")),
            "previous_roles": linkedin_data.get("experience", [])[:3],
            "education": linkedin_data.get("education"),
            "skills": linkedin_data.get("skills", [])[:10],
        }

    # Enrich from data providers — previously called unconditionally,
    # which passed None as the identifier when neither was supplied.
    if email or linkedin_url:
        enriched = source_manager.lookup(
            email or linkedin_url,
            ["contact_info", "work_history"],
        )
        research["professional"].update(enriched.get("data", {}))

    # Social activity requires the LinkedIn URL.
    if linkedin_url:
        activity = get_linkedin_activity(linkedin_url)
        research["activity"] = {
            "recent_posts": activity.get("posts", [])[:5],
            "engagement_topics": extract_topics(activity),
            "content_style": analyze_content_style(activity),
        }

    # Mutual connections — the helpers take the LinkedIn URL, so guard on
    # it too (previously only `company` was checked, allowing None through).
    if company and linkedin_url:
        research["connections"] = {
            "mutual_connections": find_mutual_connections(linkedin_url),
            "shared_groups": find_shared_groups(linkedin_url),
            "common_background": find_common_background(linkedin_url),
        }

    return research
Trigger Event Detection
Event Monitoring
class TriggerEventMonitor:
    """Watches accounts for sales-trigger events and scores their relevance."""

    def __init__(self):
        # Trigger-event categories this monitor knows how to act on.
        self.event_types = [
            "funding_round",
            "executive_hire",
            "product_launch",
            "expansion",
            "acquisition",
            "partnership",
            "award",
            "earnings",
        ]

    def monitor_account(self, company, domain):
        """Set up news, job-posting, and LinkedIn monitors for one account."""
        monitor_specs = [
            # News monitoring: company name or anything on its site.
            {
                "type": "news",
                "query": f'"{company}" OR site:{domain}',
                "frequency": "daily",
            },
            # Job-posting monitoring.
            {
                "type": "jobs",
                "company": company,
                "frequency": "weekly",
            },
            # LinkedIn company-page monitoring.
            {
                "type": "linkedin_company",
                "company": company,
                "track": ["posts", "employee_changes"],
            },
        ]
        return create_monitors(monitor_specs)

    def process_event(self, event, account):
        """Classify and score a detected event.

        Returns a trigger dict for relevant events, otherwise None.
        """
        category = classify_event(event)
        relevance = score_event_relevance(event, account)

        # Below this threshold the event is not worth surfacing at all.
        if relevance <= 0.6:
            return None

        trigger = {
            "account_id": account.id,
            "event_type": category,
            "event_data": event,
            "relevance_score": relevance,
            "detected_at": datetime.now(),
            "recommended_action": get_recommended_action(category),
        }
        # Alert immediately for high-priority triggers.
        if relevance > 0.8:
            send_trigger_alert(trigger)
        return trigger
def classify_event(event):
    """Classify an event into a trigger category by regex over its text.

    Returns the first category whose patterns match, or "other".
    """
    patterns = {
        "funding_round": [
            r"raised.*\$\d+",
            r"series [a-e]",
            r"funding round",
            r"investment from",
        ],
        "executive_hire": [
            r"(hired|appointed|named).*(?:CEO|CTO|VP|Director)",
            r"joins as",
            r"new (?:CEO|CTO|VP)",
        ],
        "product_launch": [
            r"launched",
            r"announces.*product",
            r"introduces",
            r"releases",
        ],
        "expansion": [
            r"expands to",
            r"opens.*office",
            r"enters.*market",
            r"international expansion",
        ],
    }
    text = event.text
    # Dict order fixes precedence: earlier categories win on overlap.
    return next(
        (
            category
            for category, regexes in patterns.items()
            if any(re.search(rx, text, re.IGNORECASE) for rx in regexes)
        ),
        "other",
    )
Data Synthesis
Profile Builder
class ProspectProfileBuilder:
    """Assembles company, contact, and trigger research into one profile."""

    def __init__(self):
        self.research_sources = []

    def build_profile(self, prospect_input):
        """Build a comprehensive prospect profile from a raw prospect record."""
        profile = ProspectProfile()

        # Company-level research.
        profile.company = research_company(
            prospect_input.company,
            prospect_input.domain,
        )

        # Contact-level research.
        profile.contact = research_contact(
            email=prospect_input.email,
            name=prospect_input.name,
            company=prospect_input.company,
            linkedin_url=prospect_input.linkedin,
        )

        # Recent trigger events for the account.
        profile.triggers = find_recent_triggers(prospect_input.company)

        # Actionable insights synthesized from the research above.
        profile.insights = self.synthesize_insights(profile)

        # NOTE(review): calculate_completeness is not defined on this class —
        # confirm it is provided elsewhere (e.g. a mixin or subclass).
        profile.completeness_score = self.calculate_completeness(profile)

        return profile

    def synthesize_insights(self, profile):
        """Generate actionable insights from the researched profile."""
        insights = []
        professional = profile.contact.get("professional", {})
        company = profile.company

        # New-in-role contacts are often re-evaluating their tooling.
        tenure = professional.get("tenure")
        if tenure and tenure < 6:
            insights.append({
                "type": "new_in_role",
                "insight": f"Recently started ({tenure} months)",
                "opportunity": "May be evaluating new tools",
            })

        # Many open roles signals a scaling team.
        openings = company.get("hiring", {}).get("total_openings", 0)
        if openings > 10:
            insights.append({
                "type": "high_growth",
                "insight": f"{company['hiring']['total_openings']} open roles",
                "opportunity": "Scaling team, may need solutions",
            })

        # Fresh funding within the last quarter means available budget.
        funding = company.get("financials", {}).get("funding")
        if funding and days_since(funding["date"]) < 90:
            insights.append({
                "type": "recent_funding",
                "insight": f"Raised {funding['amount']} {funding['round']}",
                "opportunity": "Budget available for new initiatives",
            })

        # Overlap between their stack and what we integrate with.
        relevant_tech = company.get("tech_stack", {}).get("relevant_to_us", [])
        if relevant_tech:
            insights.append({
                "type": "tech_fit",
                "insight": f"Using {', '.join(relevant_tech[:3])}",
                "opportunity": "Good technical fit",
            })

        return insights
Research Scoring
def score_research_quality(profile):
    """Score the quality and completeness of prospect research.

    Returns a dict of component scores in [0, 1] plus a weighted "overall".
    """
    scores = {}

    # Fraction of key contact fields that were populated.
    professional = profile.contact.get("professional", {})
    contact_fields = ("title", "tenure", "previous_roles", "linkedin_url")
    filled = [field for field in contact_fields if professional.get(field)]
    scores["contact"] = len(filled) / len(contact_fields)

    # Fraction of key company fields present at top level or in basic_info.
    basic_info = profile.company.get("basic_info", {})
    company_fields = ("size", "industry", "tech_stack", "funding")
    present = [
        field
        for field in company_fields
        if profile.company.get(field) or basic_info.get(field)
    ]
    scores["company"] = len(present) / len(company_fields)

    # Trigger freshness decays linearly to zero over 90 days.
    if profile.triggers:
        newest = max(trigger["detected_at"] for trigger in profile.triggers)
        age_days = (datetime.now() - newest).days
        scores["triggers"] = max(0, 1 - age_days / 90)
    else:
        scores["triggers"] = 0

    # Weighted combination of the component scores.
    scores["overall"] = (
        scores["contact"] * 0.3
        + scores["company"] * 0.4
        + scores["triggers"] * 0.3
    )
    return scores
Caching & Freshness
Research Cache
class ResearchCache:
    """In-memory cache of research lookups with per-data-type freshness rules."""

    def __init__(self):
        self.cache = {}
        # How long each data type stays fresh before a refetch is warranted.
        self.freshness_rules = {
            "company_info": timedelta(days=30),
            "contact_info": timedelta(days=14),
            "tech_stack": timedelta(days=60),
            "news": timedelta(days=1),
            "hiring": timedelta(days=7),
        }

    def _key(self, identifier, data_type, source):
        # Entries are keyed per (identifier, data type, source) triple.
        return f"{identifier}:{data_type}:{source}"

    def get(self, identifier, data_type, source):
        """Return a CacheResult for the entry, or None when not cached."""
        entry = self.cache.get(self._key(identifier, data_type, source))
        if entry is None:
            return None
        # Data types without an explicit rule default to a 7-day window.
        window = self.freshness_rules.get(data_type, timedelta(days=7))
        age = datetime.now() - entry["timestamp"]
        return CacheResult(data=entry["data"], is_stale=age > window, age=age)

    def set(self, identifier, data_type, source, data):
        """Store data under (identifier, data_type, source) with a timestamp."""
        self.cache[self._key(identifier, data_type, source)] = {
            "data": data,
            "timestamp": datetime.now(),
            "source": source,
        }
Metrics
Research Effectiveness
Track:
- Profile completeness rate
- Data freshness scores
- Source reliability rates
- Cost per enriched lead
- Research to conversion correlation
Optimize for:
- >80% profile completeness
- <7 day average data age
- Cost efficiency across sources
Weekly Installs
6
Repository
louisblythe/salesskills
GitHub Stars
11
First Seen
Mar 18, 2026
Security Audits