# YouTube Topic Researcher

Research any topic or niche across YouTube to find what's working, identify content gaps, and generate data-driven video ideas.

## Usage

```
/youtube-topic-researcher air fryer recipes
/youtube-topic-researcher "Python automation"
/youtube-topic-researcher meditation for beginners
/youtube-topic-researcher --topic "home gym setup" --max-results 75
```

## Instructions

When the user invokes this skill:

### Step 1: Parse Arguments

Extract from the user's input:

- Topic/keyword (required): the search term to research
- `--max-results N` (optional): number of videos to analyze (default: 50, max: 100)
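
A minimal sketch of one way to do this parsing (`parse_invocation` is a hypothetical helper, not part of the script in Step 3; it assumes shell-style tokenization):

```python
import shlex

def parse_invocation(raw: str) -> tuple[str, int]:
    """Hypothetical helper: split a raw invocation into (topic, max_results)."""
    tokens = shlex.split(raw)  # respects quoted phrases like "Python automation"
    topic_parts, max_results = [], 50  # 50 is the documented default
    i = 0
    while i < len(tokens):
        if tokens[i] == "--max-results" and i + 1 < len(tokens):
            max_results = min(int(tokens[i + 1]), 100)  # hard cap at 100
            i += 2
        elif tokens[i] == "--topic" and i + 1 < len(tokens):
            topic_parts.append(tokens[i + 1])
            i += 2
        else:
            topic_parts.append(tokens[i])
            i += 1
    return " ".join(topic_parts), max_results
```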

### Step 2: Get API Key

Check the user's Claude memory for a YouTube Data API v3 key. If not found, ask the user:

"I need a YouTube Data API v3 key to research this topic. You can get one from the Google Cloud Console. Please paste your key."

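Optionally, sanity-check the key before the full run with a cheap request (a sketch; `i18nLanguages.list` is a low-cost endpoint that fails fast on a bad key):

```python
import os

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Probe the key: i18nLanguages.list is one of the cheapest endpoints and
# raises HttpError if the key is invalid or the API is not enabled.
try:
    yt = build("youtube", "v3", developerKey=os.environ["YT_API_KEY"])
    yt.i18nLanguages().list(part="snippet").execute()
    print("API key looks valid")
except HttpError as e:
    print(f"API key rejected: {e}")
```
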
### Step 3: Write the Script

Write the following Python script to a temporary file at `/tmp/_yt_topic_researcher_XXXX.py`, where `XXXX` is a random suffix (e.g., generated via `$(openssl rand -hex 4)`):
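
If shelling out to `openssl` is inconvenient, the file can equally be created from Python itself (a sketch using the standard-library `tempfile` module; `SCRIPT_SOURCE` stands in for the script text below):

```python
import tempfile

# mkstemp creates the file with a random suffix and returns (fd, path)
fd, script_path = tempfile.mkstemp(
    prefix="_yt_topic_researcher_", suffix=".py", dir="/tmp"
)
with open(fd, "w", encoding="utf-8") as f:
    f.write(SCRIPT_SOURCE)  # placeholder for the script below
print(script_path)
```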

```python
#!/usr/bin/env python3
"""
YouTube Topic Researcher

Searches YouTube for a topic/keyword, fetches video details and channel info,
and outputs structured JSON data for analysis.

Usage:
    YT_API_KEY=YOUR_KEY python3 /tmp/_yt_topic_researcher_XXXX.py "air fryer recipes"
    YT_API_KEY=YOUR_KEY python3 /tmp/_yt_topic_researcher_XXXX.py "Python automation" --max-results 75
"""

import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
from collections import Counter
from statistics import median

try:
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
except ImportError:
    print("ERROR: google-api-python-client not installed. Run: pip3 install google-api-python-client")
    sys.exit(1)


def parse_duration(iso_duration: str) -> int:
    """Parse ISO 8601 duration to seconds."""
    m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso_duration or "")
    if not m:
        return 0
    return int(m.group(1) or 0) * 3600 + int(m.group(2) or 0) * 60 + int(m.group(3) or 0)


def format_number(n: int) -> str:
    if n >= 1_000_000:
        return f"{n/1_000_000:.1f}M"
    if n >= 1_000:
        return f"{n/1_000:.1f}K"
    return str(n)


def days_ago(published_at: str) -> int:
    try:
        pub = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
        return (datetime.now(timezone.utc) - pub).days
    except Exception:
        return 0


def search_videos(youtube, query: str, max_results: int, order: str = "relevance") -> list:
    """Search YouTube for videos matching query."""
    video_ids = []
    next_page = None
    remaining = max_results

    while remaining > 0:
        batch_size = min(remaining, 50)
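        # search.list costs 100 quota units per call regardless of page size,
        # so each extra page is comparatively expensive.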
        try:
            resp = youtube.search().list(
                part="snippet",
                q=query,
                type="video",
                order=order,
                maxResults=batch_size,
                pageToken=next_page,
            ).execute()
        except HttpError as e:
            print(f"  Search error: {e}")
            break

        for item in resp.get("items", []):
            video_ids.append(item["id"]["videoId"])

        remaining -= batch_size
        next_page = resp.get("nextPageToken")
        if not next_page:
            break

    return video_ids


def fetch_video_details(youtube, video_ids: list) -> list:
    """Batch fetch video details."""
    videos = []
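    # videos.list accepts at most 50 IDs per request (1 quota unit per call),
    # so fetch details in batches of 50.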
    for i in range(0, len(video_ids), 50):
        batch = video_ids[i:i+50]
        try:
            resp = youtube.videos().list(
                part="snippet,statistics,contentDetails",
                id=",".join(batch),
            ).execute()
            videos.extend(resp.get("items", []))
        except HttpError as e:
            print(f"  Video details error: {e}")
    return videos


def fetch_channel_details(youtube, channel_ids: list) -> dict:
    """Batch fetch channel details. Returns dict keyed by channel ID."""
    channels = {}
    unique_ids = list(set(channel_ids))
    for i in range(0, len(unique_ids), 50):
        batch = unique_ids[i:i+50]
        try:
            resp = youtube.channels().list(
                part="snippet,statistics",
                id=",".join(batch),
            ).execute()
            for item in resp.get("items", []):
                channels[item["id"]] = {
                    "title": item["snippet"]["title"],
                    "subscribers": int(item["statistics"].get("subscriberCount", 0)),
                    "total_views": int(item["statistics"].get("viewCount", 0)),
                    "video_count": int(item["statistics"].get("videoCount", 0)),
                    "created": item["snippet"].get("publishedAt", ""),
                }
        except HttpError as e:
            print(f"  Channel details error: {e}")
    return channels


def extract_tags(videos_data: list) -> list:
    """Extract and count all tags across videos."""
    tag_counter = Counter()
    for v in videos_data:
        tags = v.get("tags", [])
        for tag in tags:
            tag_counter[tag.lower()] += 1
    return tag_counter.most_common(50)


def classify_format(duration_sec: int) -> str:
    """Classify video format by duration."""
    if duration_sec <= 60:
        return "Short"
    elif duration_sec <= 180:
        return "Short-form (1-3 min)"
    elif duration_sec <= 600:
        return "Medium (3-10 min)"
    elif duration_sec <= 1200:
        return "Standard (10-20 min)"
    elif duration_sec <= 2400:
        return "Long-form (20-40 min)"
    else:
        return "Deep-dive (40+ min)"


def analyze_title_patterns(titles: list) -> dict:
    """Analyze common patterns in video titles."""
    patterns = {
        "has_number": 0,
        "is_question": 0,
        "has_how_to": 0,
        "has_brackets": 0,
        "has_caps_word": 0,
        "has_emoji": 0,
        "has_year": 0,
        "has_list_format": 0,
        "has_vs": 0,
        "has_review": 0,
        "avg_length": 0,
        "common_words": [],
    }

    word_counter = Counter()
    stop_words = {"the", "a", "an", "is", "it", "in", "on", "at", "to", "for", "of", "and", "or", "but", "with", "you", "your", "my", "this", "that", "i", "me", "we", "how", "what", "why", "do", "does", "can", "will", "be", "are", "was", "not", "no", "so", "if"}

    for title in titles:
        if re.search(r'\d', title):
            patterns["has_number"] += 1
        if title.rstrip().endswith("?"):
            patterns["is_question"] += 1
        if re.search(r'how\s+to', title, re.I):
            patterns["has_how_to"] += 1
        if re.search(r'[\[\(]', title):
            patterns["has_brackets"] += 1
        if re.search(r'\b[A-Z]{2,}\b', title):
            patterns["has_caps_word"] += 1
        if re.search(r'[^\w\s,.\-!?\'\"()\[\]:;/\\@#$%^&*+=~`|<>]', title):
            patterns["has_emoji"] += 1
        if re.search(r'20[12]\d', title):
            patterns["has_year"] += 1
        if re.search(r'^\d+\s', title):
            patterns["has_list_format"] += 1
        if re.search(r'\bvs\.?\b', title, re.I):
            patterns["has_vs"] += 1
        if re.search(r'\breview\b', title, re.I):
            patterns["has_review"] += 1

        words = re.findall(r'[a-zA-Z]{3,}', title.lower())
        for w in words:
            if w not in stop_words:
                word_counter[w] += 1

    total = len(titles) or 1
    patterns["avg_length"] = round(sum(len(t) for t in titles) / total, 1)
    patterns["common_words"] = word_counter.most_common(20)

    # Convert counts to percentages
    for key in ["has_number", "is_question", "has_how_to", "has_brackets", "has_caps_word", "has_emoji", "has_year", "has_list_format", "has_vs", "has_review"]:
        patterns[key] = round(patterns[key] / total * 100, 1)

    return patterns


def main():
    parser = argparse.ArgumentParser(description="Research a YouTube topic")
    parser.add_argument("topic", help="Topic or keyword to research")
    parser.add_argument("--max-results", type=int, default=50, help="Max videos to analyze (default: 50)")
    parser.add_argument("--output-dir", default=None, help="Output directory")
    args = parser.parse_args()

    api_key = os.environ.get("YT_API_KEY")
    if not api_key:
        print("ERROR: YT_API_KEY environment variable not set.")
        sys.exit(1)

    max_results = min(args.max_results, 100)
    youtube = build("youtube", "v3", developerKey=api_key)

    print(f"Researching topic: {args.topic}")
    print(f"Target: {max_results} videos\n")

    # --- Search by relevance ---
    print("Searching by relevance...")
    relevance_ids = search_videos(youtube, args.topic, max_results, order="relevance")
    print(f"  Found {len(relevance_ids)} videos by relevance")

    # --- Search by view count ---
    print("Searching by view count...")
    viewcount_ids = search_videos(youtube, args.topic, max_results, order="viewCount")
    print(f"  Found {len(viewcount_ids)} videos by view count")

    # --- Search by date (recent) ---
    print("Searching recent uploads...")
    recent_ids = search_videos(youtube, args.topic, 25, order="date")
    print(f"  Found {len(recent_ids)} recent videos")

    # Combine and deduplicate
    all_ids = list(dict.fromkeys(relevance_ids + viewcount_ids + recent_ids))
    print(f"\nTotal unique videos: {len(all_ids)}")

    # --- Fetch video details ---
    print("Fetching video details...")
    raw_videos = fetch_video_details(youtube, all_ids)
    print(f"  Got details for {len(raw_videos)} videos")

    # --- Process video data ---
    channel_ids = [v["snippet"]["channelId"] for v in raw_videos]

    # --- Fetch channel details ---
    print("Fetching channel details...")
    channels = fetch_channel_details(youtube, channel_ids)
    print(f"  Got details for {len(channels)} channels")

    # --- Build processed video list ---
    videos_data = []
    for v in raw_videos:
        duration_sec = parse_duration(v.get("contentDetails", {}).get("duration", ""))
        views = int(v.get("statistics", {}).get("viewCount", 0))
        likes = int(v.get("statistics", {}).get("likeCount", 0))
        comments = int(v.get("statistics", {}).get("commentCount", 0))
        channel_id = v["snippet"]["channelId"]
        channel_info = channels.get(channel_id, {})
        channel_subs = channel_info.get("subscribers", 0)
        age_days = days_ago(v["snippet"]["publishedAt"])

        # velocity: views per day since publish (age floored at 1 day)
        velocity = round(views / max(age_days, 1), 1)
        # outlier_score: views relative to channel size; >1 means the video
        # out-performed the channel's subscriber base
        outlier_score = round(views / channel_subs, 2) if channel_subs > 0 else 0

        videos_data.append({
            "video_id": v["id"],
            "title": v["snippet"]["title"],
            "channel_id": channel_id,
            "channel_name": v["snippet"]["channelTitle"],
            "channel_subs": channel_subs,
            "published_at": v["snippet"]["publishedAt"],
            "age_days": age_days,
            "views": views,
            "likes": likes,
            "comments": comments,
            "duration_sec": duration_sec,
            "format": classify_format(duration_sec),
            "tags": v.get("snippet", {}).get("tags", []),
            "description_preview": v["snippet"].get("description", "")[:200],
            "velocity": velocity,
            "outlier_score": outlier_score,
            "engagement_rate": round((likes + comments) / max(views, 1) * 100, 2),
            "like_rate": round(likes / max(views, 1) * 100, 2),
        })

    videos_data.sort(key=lambda x: x["views"], reverse=True)

    # --- Compute aggregate stats ---
    views_list = [v["views"] for v in videos_data]
    total_views = sum(views_list)
    avg_views = round(total_views / len(views_list)) if views_list else 0
    median_views = int(median(views_list)) if views_list else 0

    format_counts = Counter(v["format"] for v in videos_data)
    durations = [v["duration_sec"] for v in videos_data if v["duration_sec"] > 0]
    avg_duration = round(sum(durations) / len(durations)) if durations else 0

    titles = [v["title"] for v in videos_data]
    title_patterns = analyze_title_patterns(titles)
    tag_cloud = extract_tags(videos_data)

    channel_counter = Counter(v["channel_id"] for v in videos_data)
    unique_channels = len(channel_counter)
    top_channels = channel_counter.most_common(10)

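    # Outliers: videos earning more than 5x the sample median views.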
    outlier_threshold = median_views * 5
    outliers = [v for v in videos_data if v["views"] > outlier_threshold]

    channel_sizes = {"micro (<10K)": 0, "small (10K-100K)": 0, "medium (100K-1M)": 0, "large (1M+)": 0}
    for v in videos_data:
        subs = v["channel_subs"]
        if subs < 10000:
            channel_sizes["micro (<10K)"] += 1
        elif subs < 100000:
            channel_sizes["small (10K-100K)"] += 1
        elif subs < 1000000:
            channel_sizes["medium (100K-1M)"] += 1
        else:
            channel_sizes["large (1M+)"] += 1

    age_buckets = {"last_7_days": 0, "last_30_days": 0, "last_90_days": 0, "last_year": 0, "older": 0}
    for v in videos_data:
        age = v["age_days"]
        if age <= 7:
            age_buckets["last_7_days"] += 1
        elif age <= 30:
            age_buckets["last_30_days"] += 1
        elif age <= 90:
            age_buckets["last_90_days"] += 1
        elif age <= 365:
            age_buckets["last_year"] += 1
        else:
            age_buckets["older"] += 1

    # --- Estimate quota usage ---
    # search.list costs 100 units per call; videos.list and channels.list cost 1 each.
    search_calls = 2 * ((max_results + 49) // 50) + 1  # two paginated searches + one page of recents
    video_detail_calls = (len(all_ids) + 49) // 50
    channel_detail_calls = (len(set(channel_ids)) + 49) // 50

    # --- Build output ---
    output = {
        "topic": args.topic,
        "analyzed_at": datetime.now(timezone.utc).isoformat(),
        "total_videos_analyzed": len(videos_data),
        "summary": {
            "total_views": total_views,
            "avg_views": avg_views,
            "median_views": median_views,
            "avg_duration_sec": avg_duration,
            "avg_engagement_rate": round(sum(v["engagement_rate"] for v in videos_data) / len(videos_data), 2) if videos_data else 0,
            "unique_channels": unique_channels,
        },
        "format_distribution": dict(format_counts.most_common()),
        "channel_size_distribution": channel_sizes,
        "age_distribution": age_buckets,
        "title_patterns": title_patterns,
        "tag_cloud": tag_cloud,
        "top_channels": [
            {
                "channel_id": cid,
                "channel_name": channels.get(cid, {}).get("title", "Unknown"),
                "subscribers": channels.get(cid, {}).get("subscribers", 0),
                "videos_in_results": count,
            }
            for cid, count in top_channels
        ],
        "outlier_videos": [
            {
                "title": v["title"],
                "video_id": v["video_id"],
                "views": v["views"],
                "channel_name": v["channel_name"],
                "channel_subs": v["channel_subs"],
                "outlier_score": v["outlier_score"],
                "age_days": v["age_days"],
            }
            for v in outliers[:10]
        ],
        "top_videos": [
            {
                "title": v["title"],
                "video_id": v["video_id"],
                "views": v["views"],
                "likes": v["likes"],
                "comments": v["comments"],
                "channel_name": v["channel_name"],
                "channel_subs": v["channel_subs"],
                "duration_sec": v["duration_sec"],
                "format": v["format"],
                "engagement_rate": v["engagement_rate"],
                "velocity": v["velocity"],
                "age_days": v["age_days"],
                "published_at": v["published_at"],
            }
            for v in videos_data[:20]
        ],
        "all_videos": videos_data,
        "quota_used": {
            "search_calls": search_calls,
            "search_units": search_calls * 100,
            "video_detail_calls": video_detail_calls,
            "video_detail_units": video_detail_calls,
            "channel_detail_calls": channel_detail_calls,
            "channel_detail_units": channel_detail_calls,
            "total_estimated": search_calls * 100 + video_detail_calls + channel_detail_calls,
        },
    }

    # --- Save output ---
    safe_topic = re.sub(r'[^a-zA-Z0-9_-]', '_', args.topic)[:100]
    date_str = datetime.now().strftime("%Y%m%d")
    output_dir = args.output_dir or f"yt_topic_{safe_topic}_{date_str}"
    os.makedirs(output_dir, exist_ok=True)

    output_file = os.path.join(output_dir, "topic_research_data.json")
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
    except (IOError, OSError) as e:
        print(f"ERROR: Could not write output file: {e}")
        sys.exit(1)

    print(f"\nData saved to: {output_file}")
    print(f"Videos analyzed: {len(videos_data)}")
    print(f"Unique channels: {unique_channels}")
    print(f"Estimated quota used: ~{output['quota_used']['total_estimated']} units")

    return output_file


if __name__ == "__main__":
    main()
```

### Step 4: Install Dependencies

```bash
pip3 install google-api-python-client
```

Suppress the install output unless there's an error.

### Step 5: Run the Script

```bash
YT_API_KEY=API_KEY python3 /tmp/_yt_topic_researcher_XXXX.py "TOPIC" [--max-results N]
```

Replace:

- `TOPIC` with the user's topic/keyword
- `API_KEY` with the key from Step 2
- `_yt_topic_researcher_XXXX.py` with the actual temp file name from Step 3
- Add `--max-results N` only if the user specified it

### Step 6: Clean Up

Delete the temporary script file:

```bash
rm /tmp/_yt_topic_researcher_XXXX.py
```

Replace `_yt_topic_researcher_XXXX.py` with the actual temp file name from Step 3. Keep the JSON output: it is read in Step 7 and delivered to the user alongside the report in Step 9.

### Step 7: Read the Data

Read the generated `topic_research_data.json` file from the output directory.
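
For example (a sketch — the directory name depends on the topic and date, so substitute the path the script actually printed):

```python
import json

# Hypothetical path; the script prints the real one as "Data saved to: ..."
with open("yt_topic_air_fryer_recipes_20250101/topic_research_data.json",
          encoding="utf-8") as f:
    data = json.load(f)

print(data["total_videos_analyzed"], data["summary"]["median_views"])
```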

### Step 8: Generate the Analysis Report

Using the JSON data, write a comprehensive markdown report to `yt_topic_[topic]_[YYYYMMDD]/topic_research_report.md` with the following sections:

Report Structure:

```markdown
# Topic Research: [Topic]
*Analyzed [date] | [N] videos across [N] channels*

## Executive Summary
- 3-4 bullet points: Is this niche worth entering? Key findings at a glance.
- Overall assessment: Saturated / Growing / Underserved / Emerging

## Market Overview
| Metric | Value |
|--------|-------|
| Videos Analyzed | |
| Total Views (sample) | |
| Average Views | |
| Median Views | |
| Avg Engagement Rate | |
| Unique Channels | |
| Avg Video Duration | |

## Performance Benchmarks
- What view count = "good" in this niche (based on percentiles)
- 25th / 50th / 75th / 90th percentile views
- Engagement rate benchmarks

## Content Format Analysis
Table showing format breakdown (Short, Medium, Long-form, etc.) with avg views per format.
Which format performs best? Which is most common?

## Channel Landscape
- Channel size distribution (micro/small/medium/large)
- Top channels dominating the results
- Is this a "winner take all" niche or distributed?
- Opportunities for small channels

## Title Patterns That Work
- Data from title_patterns analysis
- Most common words/phrases
- Title formulas used by top performers
- What distinguishes high-performing titles

## Tag Cloud & SEO
- Top tags used
- Tag clusters (groups of related tags)
- Missing tag opportunities

## Outlier Videos (Breakout Hits)
Table of outlier videos with views, channel size, outlier score.
What do these have in common? Why did they break out?

## Content Freshness
- Age distribution of top results
- Is YouTube favoring new or evergreen content for this topic?
- Recency signals

## Content Gaps & Opportunities
Based on all the data:
- Subtopics underrepresented in results
- Formats not being used effectively
- Angle/perspective gaps
- Audience segments not being served

## Video Ideas (Data-Backed)
3-5 specific video ideas with:
- Suggested title
- Why this would work (data backing)
- Target format and duration
- Key tags to use

## Saturation Assessment
- Competition density score (unique channels / total videos)
- Big channel dominance percentage
- Recent content velocity
- Final verdict: Is this niche worth entering?

## Quota Usage
| Operation | Units |
|-----------|-------|
| search.list | |
| videos.list | |
| channels.list | |
| **Total** | |
```
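
Note that several report numbers (the percentile benchmarks, average views per format, and the competition density score) are not precomputed in the JSON and must be derived from `all_videos`. A minimal sketch of those derivations, assuming `data` is loaded as in Step 7 and using nearest-rank percentiles:

```python
from collections import defaultdict

videos = data["all_videos"]
views = sorted(v["views"] for v in videos)

# Nearest-rank percentile for the Performance Benchmarks section
def percentile(sorted_vals, p):
    if not sorted_vals:
        return 0
    return sorted_vals[round(p / 100 * (len(sorted_vals) - 1))]

benchmarks = {p: percentile(views, p) for p in (25, 50, 75, 90)}

# Average views per format for the Content Format Analysis section
by_format = defaultdict(list)
for v in videos:
    by_format[v["format"]].append(v["views"])
avg_by_format = {fmt: sum(vs) // len(vs) for fmt, vs in by_format.items()}

# Competition density for the Saturation Assessment: near 1.0 means results
# are spread across many channels; low values mean a few channels dominate.
density = data["summary"]["unique_channels"] / max(data["total_videos_analyzed"], 1)

print(benchmarks, avg_by_format, round(density, 2))
```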

### Step 9: Report Completion

Tell the user:

- Output folder path with both files (data JSON + report MD)
- Key finding highlight (the most interesting insight)
- Saturation assessment summary
- Number of video ideas generated
- Quota units consumed