YouTube Trending Scanner

Scan what's trending right now in any YouTube niche -- find breakout videos, rising channels, and emerging topics.

Usage

/youtube-trending-scanner "meditation"
/youtube-trending-scanner "AI tools" --days 14
/youtube-trending-scanner "home cooking" --days 30

Instructions

When the user invokes this skill:

Step 1: Parse Arguments

Extract:

  • Niche/keyword (required): The niche to scan
  • --days N (optional): Time window to scan (default: 14, max: 30)

Step 2: Get API Key

Check Claude memory for a YouTube Data API v3 key. If not found, ask:

"I need a YouTube Data API v3 key. You can get one from the Google Cloud Console. Please paste your key."

Step 3: Write the Script

Write the following Python script to /tmp/_yt_trending_scanner_XXXX.py (where XXXX is a random suffix, e.g. $(openssl rand -hex 4)):

#!/usr/bin/env python3
"""
YouTube Trending Scanner

Scans recent videos in a niche to identify trending topics,
velocity outliers, and rising channels.
"""

import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone, timedelta
from collections import Counter

try:
    from googleapiclient.discovery import build
    from googleapiclient.errors import HttpError
except ImportError:
    print("ERROR: google-api-python-client not installed. Run: pip3 install google-api-python-client")
    sys.exit(1)


def parse_duration(iso_duration: str) -> int:
    """Convert an ISO 8601 duration to seconds, e.g. "PT1H2M3S" -> 3723.
    Day-scale durations ("P1DT...") are rare for videos and parse as 0 here."""
    m = re.match(r"PT(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?", iso_duration or "")
    if not m:
        return 0
    return int(m.group(1) or 0) * 3600 + int(m.group(2) or 0) * 60 + int(m.group(3) or 0)


def format_number(n: int) -> str:
    if n >= 1_000_000: return f"{n/1_000_000:.1f}M"
    if n >= 1_000: return f"{n/1_000:.1f}K"
    return str(n)


def days_since(published_at: str) -> float:
    try:
        pub = datetime.fromisoformat(published_at.replace("Z", "+00:00"))
        delta = datetime.now(timezone.utc) - pub
        return max(delta.total_seconds() / 86400, 0.1)
    except Exception:
        return 1


def search_recent(youtube, query: str, days: int, order: str, max_results: int = 50) -> list:
    """Search for recent videos in a time window."""
    after = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%dT00:00:00Z")
    video_ids = []
    next_page = None
    remaining = max_results

    while remaining > 0:
        batch = min(remaining, 50)
        try:
            resp = youtube.search().list(
                part="snippet",
                q=query,
                type="video",
                order=order,
                publishedAfter=after,
                maxResults=batch,
                pageToken=next_page,
            ).execute()
        except HttpError as e:
            print(f"  Search error: {e}")
            break

        for item in resp.get("items", []):
            video_ids.append(item["id"]["videoId"])

        remaining -= batch
        next_page = resp.get("nextPageToken")
        if not next_page:
            break

    return video_ids


def fetch_video_details(youtube, video_ids: list) -> list:
    videos = []
    for i in range(0, len(video_ids), 50):
        batch = video_ids[i:i+50]
        try:
            resp = youtube.videos().list(
                part="snippet,statistics,contentDetails",
                id=",".join(batch),
            ).execute()
            videos.extend(resp.get("items", []))
        except HttpError as e:
            print(f"  Video details error: {e}")
    return videos


def fetch_channel_details(youtube, channel_ids: list) -> dict:
    channels = {}
    unique_ids = list(set(channel_ids))
    for i in range(0, len(unique_ids), 50):
        batch = unique_ids[i:i+50]
        try:
            resp = youtube.channels().list(
                part="snippet,statistics",
                id=",".join(batch),
            ).execute()
            for item in resp.get("items", []):
                channels[item["id"]] = {
                    "title": item["snippet"]["title"],
                    "subscribers": int(item["statistics"].get("subscriberCount", 0)),
                    "total_views": int(item["statistics"].get("viewCount", 0)),
                    "video_count": int(item["statistics"].get("videoCount", 0)),
                    "created": item["snippet"].get("publishedAt", ""),
                }
        except HttpError as e:
            print(f"  Channel details error: {e}")
    return channels


def main():
    parser = argparse.ArgumentParser(description="Scan YouTube trends in a niche")
    parser.add_argument("niche", help="Niche/keyword to scan")
    parser.add_argument("--days", type=int, default=14, help="Time window in days (default: 14)")
    parser.add_argument("--output-dir", default=None)
    args = parser.parse_args()

    api_key = os.environ.get("YT_API_KEY")
    if not api_key:
        print("ERROR: YT_API_KEY environment variable not set.")
        sys.exit(1)

    days = max(1, min(args.days, 30))  # clamp to the documented 1-30 day window
    youtube = build("youtube", "v3", developerKey=api_key)
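    # Quota accounting (YouTube Data API v3): search.list costs 100 units per
    # call; videos.list and channels.list cost 1 unit per call.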
    quota_used = 0

    print(f"Scanning trends in: {args.niche}")
    print(f"Time window: last {days} days\n")

    # --- Search recent by relevance ---
    print("Searching recent videos (by relevance)...")
    relevance_ids = search_recent(youtube, args.niche, days, "relevance", 50)
    quota_used += 100
    print(f"  Found {len(relevance_ids)}")

    # --- Search recent by view count ---
    print("Searching recent videos (by view count)...")
    viewcount_ids = search_recent(youtube, args.niche, days, "viewCount", 50)
    quota_used += 100
    print(f"  Found {len(viewcount_ids)}")

    # --- Search recent by date ---
    print("Searching newest uploads...")
    date_ids = search_recent(youtube, args.niche, days, "date", 50)
    quota_used += 100
    print(f"  Found {len(date_ids)}")

    # --- Also search with broader window for baseline ---
    print("Fetching baseline (last 90 days by viewCount)...")
    baseline_after = (datetime.now(timezone.utc) - timedelta(days=90)).strftime("%Y-%m-%dT00:00:00Z")
    try:
        baseline_resp = youtube.search().list(
            part="snippet", q=args.niche, type="video",
            order="viewCount", publishedAfter=baseline_after, maxResults=50,
        ).execute()
        baseline_ids = [item["id"]["videoId"] for item in baseline_resp.get("items", [])]
        quota_used += 100
    except HttpError:
        baseline_ids = []
    print(f"  Found {len(baseline_ids)} baseline videos")

    # Combine all
    all_ids = list(dict.fromkeys(relevance_ids + viewcount_ids + date_ids + baseline_ids))
    print(f"\nTotal unique videos: {len(all_ids)}")

    # --- Fetch details ---
    print("Fetching video details...")
    raw_videos = fetch_video_details(youtube, all_ids)
    quota_used += (len(all_ids) + 49) // 50
    print(f"  Got {len(raw_videos)} videos")

    channel_ids = [v["snippet"]["channelId"] for v in raw_videos]
    print("Fetching channel details...")
    channels = fetch_channel_details(youtube, channel_ids)
    quota_used += (len(set(channel_ids)) + 49) // 50
    print(f"  Got {len(channels)} channels")

    # --- Process ---
    videos_data = []
    for v in raw_videos:
        views = int(v.get("statistics", {}).get("viewCount", 0))
        likes = int(v.get("statistics", {}).get("likeCount", 0))
        comments = int(v.get("statistics", {}).get("commentCount", 0))
        duration = parse_duration(v.get("contentDetails", {}).get("duration", ""))
        channel_id = v["snippet"]["channelId"]
        ch = channels.get(channel_id, {})
        subs = ch.get("subscribers", 0)
        age_days = days_since(v["snippet"]["publishedAt"])
        # Velocity: views per day since publish; days_since() already floors age at 0.1 days.
        velocity = round(views / max(age_days, 0.1))

        # View-to-subscriber ratio: > 1 means the video out-performed the channel's size.
        # Hidden subscriber counts surface as 0 subs and yield a ratio of 0.
        vs_ratio = round(views / max(subs, 1), 2) if subs > 0 else 0

        videos_data.append({
            "video_id": v["id"],
            "title": v["snippet"]["title"],
            "channel_id": channel_id,
            "channel_name": v["snippet"]["channelTitle"],
            "channel_subs": subs,
            "published_at": v["snippet"]["publishedAt"],
            "age_days": round(age_days, 1),
            "views": views,
            "likes": likes,
            "comments": comments,
            "duration_sec": duration,
            "velocity": velocity,
            "vs_ratio": vs_ratio,
            "engagement_rate": round((likes + comments) / max(views, 1) * 100, 2),
            "tags": v.get("snippet", {}).get("tags", []),
            "is_recent": age_days <= days,
        })

    # Split into recent and baseline
    recent = [v for v in videos_data if v["is_recent"]]
    baseline = [v for v in videos_data if not v["is_recent"]]

    # --- Find velocity outliers ---
    if recent:
        velocities = [v["velocity"] for v in recent]
        avg_velocity = sum(velocities) / len(velocities)
        median_velocity = sorted(velocities)[len(velocities) // 2]  # upper median for even-length lists

        # Flag outliers: recent videos pulling more than 3x the mean views/day.
        velocity_outliers = [v for v in recent if v["velocity"] > avg_velocity * 3]
        velocity_outliers.sort(key=lambda x: x["velocity"], reverse=True)
    else:
        avg_velocity = 0
        median_velocity = 0
        velocity_outliers = []

    # --- Find rising channels (small channels with high views) ---
    rising_channels = []
    channel_videos = {}
    for v in recent:
        cid = v["channel_id"]
        if cid not in channel_videos:
            channel_videos[cid] = []
        channel_videos[cid].append(v)

    for cid, vids in channel_videos.items():
        ch = channels.get(cid, {})
        subs = ch.get("subscribers", 0)
        # "Rising" heuristic: under 100K subs with either an average view/sub
        # ratio above 1.0 or recent views exceeding twice the subscriber count.
        if subs < 100000:
            total_recent_views = sum(v["views"] for v in vids)
            avg_vs_ratio = sum(v["vs_ratio"] for v in vids) / len(vids)
            if avg_vs_ratio > 1.0 or total_recent_views > subs * 2:
                rising_channels.append({
                    "channel_id": cid,
                    "channel_name": ch.get("title", "Unknown"),
                    "subscribers": subs,
                    "recent_videos": len(vids),
                    "total_recent_views": total_recent_views,
                    "avg_vs_ratio": round(avg_vs_ratio, 2),
                    "top_video": max(vids, key=lambda x: x["views"])["title"],
                })

    rising_channels.sort(key=lambda x: x["avg_vs_ratio"], reverse=True)

    # --- Trending topics (word frequency in recent titles) ---
    word_counter = Counter()
    bigram_counter = Counter()
    stop_words = {"the", "a", "an", "is", "it", "in", "on", "at", "to", "for", "of", "and", "or", "but", "with", "you", "your", "my", "this", "that", "i", "me", "we", "how", "what", "why", "do", "does", "can", "will", "be", "are", "was", "not", "no", "so", "if", "its", "just", "like", "get", "new", "one"}

    for v in recent:
        words = re.findall(r'[a-zA-Z]{3,}', v["title"].lower())
        filtered = [w for w in words if w not in stop_words]
        for w in filtered:
            word_counter[w] += 1
        for i in range(len(filtered) - 1):
            bigram_counter[f"{filtered[i]} {filtered[i+1]}"] += 1

    # --- Publishing velocity ---
    recent_per_day = len(recent) / max(days, 1)
    # Baseline videos fall in the (days, 90]-day window, so normalize by its length.
    baseline_per_day = len(baseline) / max(90 - days, 1) if baseline else 0

    # --- Format distribution ---
    format_counter = Counter()
    for v in recent:
        d = v["duration_sec"]
        if d <= 60: fmt = "Short (<1 min)"
        elif d <= 300: fmt = "Short-form (1-5 min)"
        elif d <= 900: fmt = "Medium (5-15 min)"
        elif d <= 1800: fmt = "Standard (15-30 min)"
        else: fmt = "Long-form (30+ min)"
        format_counter[fmt] += 1

    # --- Build output ---
    output = {
        "niche": args.niche,
        "time_window_days": days,
        "analyzed_at": datetime.now(timezone.utc).isoformat(),
        "summary": {
            "total_recent_videos": len(recent),
            "total_baseline_videos": len(baseline),
            "recent_publishing_rate": round(recent_per_day, 2),
            "baseline_publishing_rate": round(baseline_per_day, 2),
            "avg_velocity": round(avg_velocity),
            "median_velocity": round(median_velocity),
            "unique_channels_recent": len(channel_videos),
        },
        "velocity_outliers": [
            {
                "title": v["title"],
                "video_id": v["video_id"],
                "views": v["views"],
                "velocity": v["velocity"],
                "age_days": v["age_days"],
                "channel_name": v["channel_name"],
                "channel_subs": v["channel_subs"],
                "vs_ratio": v["vs_ratio"],
            }
            for v in velocity_outliers[:15]
        ],
        "rising_channels": rising_channels[:10],
        "trending_topics": {
            "words": word_counter.most_common(30),
            "bigrams": bigram_counter.most_common(20),
        },
        "format_distribution": dict(format_counter.most_common()),
        "top_recent_videos": [
            {
                "title": v["title"],
                "video_id": v["video_id"],
                "views": v["views"],
                "velocity": v["velocity"],
                "channel_name": v["channel_name"],
                "channel_subs": v["channel_subs"],
                "age_days": v["age_days"],
                "engagement_rate": v["engagement_rate"],
            }
            for v in sorted(recent, key=lambda x: x["views"], reverse=True)[:20]
        ],
        "all_recent_videos": recent,
        "quota_used": {"total_estimated": quota_used},
    }

    # --- Save ---
    safe_niche = re.sub(r'[^a-zA-Z0-9_-]', '_', args.niche)[:100]
    date_str = datetime.now().strftime("%Y%m%d")
    output_dir = args.output_dir or f"yt_trending_{safe_niche}_{date_str}"
    os.makedirs(output_dir, exist_ok=True)

    output_file = os.path.join(output_dir, "trending_data.json")
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            json.dump(output, f, indent=2, ensure_ascii=False)
    except (IOError, OSError) as e:
        print(f"ERROR: Could not write output file: {e}")
        sys.exit(1)

    print(f"\nData saved to: {output_file}")
    print(f"Recent videos: {len(recent)}")
    print(f"Velocity outliers: {len(velocity_outliers)}")
    print(f"Rising channels: {len(rising_channels)}")
    print(f"Estimated quota used: ~{quota_used} units")


if __name__ == "__main__":
    main()

Step 4: Install Dependencies

pip3 install google-api-python-client

Step 5: Run the Script

YT_API_KEY=API_KEY python3 /tmp/_yt_trending_scanner_XXXX.py "NICHE" [--days N]
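
For example, with a placeholder key and a hypothetical random suffix:

YT_API_KEY="YOUR_KEY" python3 /tmp/_yt_trending_scanner_a1b2c3d4.py "meditation" --days 14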

Step 6: Clean Up

rm -f /tmp/_yt_trending_scanner_XXXX.py

Step 7: Read the Data

Read the generated trending_data.json file.
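
For orientation, a minimal sketch of pulling the headline numbers out of the JSON (key names match the output dict the script builds; adjust the path to the run's output directory):

import json

# Assumes the working directory is the run's output folder (yt_trending_<niche>_<date>).
with open("trending_data.json", encoding="utf-8") as f:
    data = json.load(f)

s = data["summary"]
print(f"Recent videos: {s['total_recent_videos']} "
      f"({s['recent_publishing_rate']}/day vs {s['baseline_publishing_rate']}/day baseline)")
print(f"Velocity: avg {s['avg_velocity']} views/day, median {s['median_velocity']}")
for v in data["velocity_outliers"][:5]:
    print(f"  {v['title']} ({v['velocity']} views/day, {v['channel_subs']} subs)")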

Step 8: Generate the Trend Report

Write a report to the output directory as trending_report.md:

# Trending Scanner: [Niche]
*Scanned [date] | Last [N] days | [N] videos analyzed*

## Hot Right Now
Overall trend assessment: is this niche heating up, stable, or cooling down?
Compare recent_publishing_rate against baseline_publishing_rate from the summary.

## Breakout Videos (Velocity Outliers)
| # | Title | Views | Velocity (views/day) | Channel | Channel Size | Age |
|---|-------|-------|---------------------|---------|--------------|-----|
These videos are getting disproportionate views. What do they have in common?

## Trending Topics
Words and phrases appearing frequently in recent high-performing content.
Topic clusters and emerging themes.

## Rising Channels
Small channels getting unusual traction right now.
| Channel | Subs | Recent Videos | Recent Views | Avg View/Sub Ratio |
|---------|------|---------------|--------------|-------------------|

## Format Trends
What formats are being used? Which are performing best?
Shorts vs long-form breakdown.

## Content Velocity
- Current niche publishing rate vs baseline
- Is competition increasing or decreasing?
- Saturation signals

## Timely Content Recommendations
3-5 specific video ideas based on current trends:
- What to make THIS WEEK
- Why (data backing)
- Format and angle recommendation

## Trend Assessment
- Growing / Stable / Declining
- First-mover opportunities
- Risks and considerations

## Quota Usage
| Operation | Units |
|-----------|-------|
Report the script's quota_used estimate (search.list = 100 units per call; videos.list and channels.list = 1 unit per call).

Step 9: Report Completion

Tell the user:

  • Output folder path
  • Trend assessment (growing/stable/declining)
  • Top breakout video highlight
  • Number of rising channels found
  • Quota consumed