youtube-thumbnails

Installation
SKILL.md

YouTube Thumbnail Downloader

Downloads the thumbnails of the top 10 most-viewed items in each category (regular videos, Shorts, live streams) from any YouTube channel.

Usage

/youtube-thumbnails @ChannelHandle
/youtube-thumbnails @ChannelHandle --skip-verify
/youtube-thumbnails https://youtube.com/@Handle
/youtube-thumbnails UCxxxxxxx

Instructions

When the user invokes this skill:

Step 1: Parse Arguments

Extract from the user's input:

  • Channel identifier (required): @handle, channel URL, or channel ID (UCxxxxxxx)
  • --skip-verify (optional): if present, skip Level 2 HTTP verification for faster execution

Step 2: Get API Key

Check the user's Claude memory for a YouTube Data API v3 key. If not found, ask the user:

"I need a YouTube Data API v3 key to fetch channel data. You can get one from the Google Cloud Console. Please paste your key."

Step 3: Write the Script

Write the following Python script to a temporary file in the current working directory named _yt_thumb_downloader.py:

#!/usr/bin/env python3
"""
YouTube Channel Thumbnail Downloader

Fetches top 10 most-viewed videos, Shorts, and live streams from a YouTube channel,
downloads their thumbnails, and generates an Obsidian-compatible markdown index.

Usage:
    python3 _yt_thumb_downloader.py @ChannelHandle --api-key YOUR_KEY
    python3 _yt_thumb_downloader.py "https://youtube.com/@Handle" --api-key YOUR_KEY
    python3 _yt_thumb_downloader.py UCxxxxxxx --api-key YOUR_KEY
"""

import argparse
import os
import re
import sys
import requests
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError


# ---------------------------------------------------------------------------
# Channel Resolution
# ---------------------------------------------------------------------------

def resolve_channel(youtube, channel_input: str) -> dict | None:
    """Resolve an @handle, channel URL, or channel ID to channel info.

    Args:
        youtube: Built YouTube Data API v3 client.
        channel_input: "@handle", a youtube.com channel URL, or a raw
            channel ID ("UC" followed by 22 characters).

    Returns:
        Dict with keys: id, title, uploads_playlist, subscriber_count,
        video_count — or None if the channel could not be resolved.
    """
    raw = channel_input.strip().rstrip("/")

    # Pull the handle or ID out of the common URL shapes, if a URL was given.
    for pattern in [
        r"youtube\.com/@([\w.-]+)",
        r"youtube\.com/channel/(UC[\w-]{22})",
        r"youtube\.com/c/([\w.-]+)",
    ]:
        m = re.search(pattern, raw)
        if m:
            raw = m.group(1)
            break

    # Direct channel ID — no handle lookup needed.
    if raw.startswith("UC") and len(raw) == 24:
        return _fetch_channel_by_id(youtube, raw)

    # Preferred path: forHandle costs 1 quota unit and is most reliable.
    handle = raw.lstrip("@")
    try:
        resp = youtube.channels().list(
            part="snippet,statistics,contentDetails",
            forHandle=handle,
        ).execute()
        if resp.get("items"):
            return _parse_channel(resp["items"][0])
    except HttpError:
        pass  # fall through to the search-based fallback

    # Fallback: search for the channel, then try each candidate in order.
    # FIX: the previous version returned after the first candidate even when
    # it failed to resolve, never trying the remaining search results.
    try:
        search_resp = youtube.search().list(
            part="snippet", q=handle, type="channel", maxResults=5
        ).execute()
        for item in search_resp.get("items", []):
            channel = _fetch_channel_by_id(youtube, item["snippet"]["channelId"])
            if channel:
                return channel
    except HttpError as e:
        print(f"  Search fallback failed: {e}")

    return None


def _fetch_channel_by_id(youtube, channel_id: str) -> dict | None:
    """Look up a single channel by its ID; None when the API finds no match."""
    resp = youtube.channels().list(
        part="snippet,statistics,contentDetails",
        id=channel_id,
    ).execute()
    items = resp.get("items")
    return _parse_channel(items[0]) if items else None


def _parse_channel(item: dict) -> dict:
    return {
        "id": item["id"],
        "title": item["snippet"]["title"],
        "uploads_playlist": item["contentDetails"]["relatedPlaylists"]["uploads"],
        "subscriber_count": int(item["statistics"].get("subscriberCount", 0)),
        "video_count": int(item["statistics"].get("videoCount", 0)),
    }


# ---------------------------------------------------------------------------
# Fetch All Videos from Uploads Playlist
# ---------------------------------------------------------------------------

def fetch_all_video_ids(youtube, uploads_playlist: str) -> list[str]:
    """Collect every video ID from the uploads playlist, following pagination."""
    ids: list[str] = []
    page_token = None

    while True:
        page = youtube.playlistItems().list(
            part="contentDetails",
            playlistId=uploads_playlist,
            maxResults=50,  # API maximum per page
            pageToken=page_token,
        ).execute()

        ids.extend(
            entry["contentDetails"]["videoId"] for entry in page.get("items", [])
        )

        page_token = page.get("nextPageToken")
        if not page_token:
            return ids


# ---------------------------------------------------------------------------
# Fetch Details & Classify
# ---------------------------------------------------------------------------

def parse_duration(iso_duration: str) -> int:
    """Parse an ISO 8601 duration (e.g. "PT4M13S", "P1DT2H") to total seconds.

    The YouTube API reports durations longer than 24 hours (common for the
    long live streams this script classifies) with a day component, e.g.
    "P1DT2H". The previous PT-only regex failed to match those and silently
    returned 0; days are now handled.

    Returns 0 for None, empty, or unparseable input.
    """
    m = re.match(
        r"P(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?",
        iso_duration or "",
    )
    if not m:
        return 0
    days, hours, mins, secs = (int(g or 0) for g in m.groups())
    return days * 86400 + hours * 3600 + mins * 60 + secs


def format_views(count: int) -> str:
    """Human-readable view count: 1.2M / 3.4K, or the plain number below 1000."""
    for threshold, suffix in ((1_000_000, "M"), (1_000, "K")):
        if count >= threshold:
            return f"{count / threshold:.1f}{suffix}"
    return str(count)


def verify_is_short(session: requests.Session, video_id: str) -> bool:
    """HTTP probe: True when the video really is a YouTube Short.

    A HEAD request to /shorts/{id} answers 200 for a Short and a 303
    redirect to /watch?v= otherwise; redirects are disabled so the status
    code alone decides.
    """
    try:
        response = session.head(
            f"https://www.youtube.com/shorts/{video_id}",
            allow_redirects=False,
            timeout=10,
        )
    except requests.RequestException:
        # Conservative on network failure: do not assume it is a Short.
        return False
    return response.status_code == 200


def verify_is_livestream(session: requests.Session, video_id: str) -> bool:
    """HTTP page check: True for a real live stream, False for a premiere.

    Genuine live streams show "Streamed live" on the watch page; premieres
    say "Premiered" instead.
    """
    try:
        page = session.get(
            f"https://www.youtube.com/watch?v={video_id}",
            headers={"Accept-Language": "en"},
            timeout=15,
        )
    except requests.RequestException:
        # Conservative on network failure: keep the Level 1 classification.
        return True
    return "Streamed live" in page.text


def fetch_and_classify(youtube, video_ids: list[str], skip_verify: bool = False) -> dict:
    """Batch-fetch video details and classify into videos, shorts, live_streams.

    Level 1: API-based classification (duration, liveStreamingDetails).
    Level 2: HTTP-based verification (unless skip_verify is True).

    Args:
        youtube: Built YouTube Data API v3 client.
        video_ids: All video IDs to classify.
        skip_verify: When True, skip the slower HTTP verification pass.

    Returns:
        Dict with keys: videos, shorts, live_streams — each a list of dicts
        sorted by view count descending, limited to top 10.
    """
    all_videos = []

    # --- Level 1: API-based candidate selection (API caps at 50 IDs/request) ---
    for i in range(0, len(video_ids), 50):
        batch = video_ids[i : i + 50]
        resp = youtube.videos().list(
            part="snippet,statistics,contentDetails,liveStreamingDetails",
            id=",".join(batch),
        ).execute()

        for item in resp.get("items", []):
            duration_sec = parse_duration(
                item.get("contentDetails", {}).get("duration", "")
            )
            view_count = int(item.get("statistics", {}).get("viewCount", 0))
            live_details = item.get("liveStreamingDetails", {})
            # Any live metadata (started or merely scheduled) marks a candidate.
            is_live = bool(
                live_details.get("actualStartTime")
                or live_details.get("scheduledStartTime")
            )

            # Level 1 classify (priority: live stream > short > regular).
            # Shorts are at most 3 minutes (180 s) long.
            if is_live:
                category = "live_streams"
            elif 0 < duration_sec <= 180:
                category = "shorts"
            else:
                category = "videos"

            all_videos.append({
                "id": item["id"],
                "title": item["snippet"]["title"],
                "view_count": view_count,
                "duration_sec": duration_sec,
                "category": category,
                "published_at": item["snippet"]["publishedAt"],
            })

    # --- Level 2: HTTP-based verification ---
    # FIX: the two verification loops were duplicated line-for-line; factored
    # into _verify_candidates. try/finally ensures the session is closed even
    # if a check raises.
    if not skip_verify:
        session = requests.Session()
        try:
            _verify_candidates(
                session, all_videos, "live_streams", "live stream", verify_is_livestream
            )
            _verify_candidates(
                session, all_videos, "shorts", "short", verify_is_short
            )
        finally:
            session.close()

    # Split, sort, and take top 10 per category
    result = {}
    for cat in ("videos", "shorts", "live_streams"):
        items = [v for v in all_videos if v["category"] == cat]
        items.sort(key=lambda v: v["view_count"], reverse=True)
        result[cat] = items[:10]

    return result


def _verify_candidates(session, all_videos, category, label, check):
    """Run an HTTP *check* over every candidate currently in *category*.

    Candidates that fail the check are demoted to "videos" in place;
    progress is printed every 10 items and at the end.
    """
    candidates = [v for v in all_videos if v["category"] == category]
    if not candidates:
        return
    print(f"\n  Verifying {len(candidates)} {label} candidates...")
    confirmed = 0
    for idx, video in enumerate(candidates, 1):
        if check(session, video["id"]):
            confirmed += 1
        else:
            video["category"] = "videos"
        if idx % 10 == 0 or idx == len(candidates):
            print(f"    {idx}/{len(candidates)} checked, {confirmed} confirmed...")


# ---------------------------------------------------------------------------
# Download Thumbnails
# ---------------------------------------------------------------------------

def download_thumbnail(video_id: str, title: str, rank: int, output_dir: str) -> str | None:
    """Download the highest-quality thumbnail available for one video.

    Tries resolutions from maxres down to mq. A tiny body (<= 1000 bytes)
    is YouTube's gray "not available" placeholder and is rejected.

    Args:
        video_id: YouTube video ID.
        title: Video title, sanitized into the filename.
        rank: 1-based rank; zero-padded filename prefix keeps sort order.
        output_dir: Directory the JPEG is written into (must already exist).

    Returns:
        The saved filename (not the full path), or None if every URL failed.
    """
    urls = [
        f"https://img.youtube.com/vi/{video_id}/maxresdefault.jpg",
        f"https://img.youtube.com/vi/{video_id}/sddefault.jpg",
        f"https://img.youtube.com/vi/{video_id}/hqdefault.jpg",
        f"https://img.youtube.com/vi/{video_id}/mqdefault.jpg",
    ]

    # Filesystem-safe name: keep alphanumerics/space/-/_ from the first
    # 50 characters, then collapse whitespace runs into single underscores.
    safe_title = "".join(c for c in title[:50] if c.isalnum() or c in " -_").strip()
    safe_title = re.sub(r"\s+", "_", safe_title)
    filename = f"{rank:02d}_{safe_title}_{video_id}.jpg"
    filepath = os.path.join(output_dir, filename)

    for url in urls:
        try:
            resp = requests.get(url, timeout=10)
            if resp.status_code == 200 and len(resp.content) > 1000:
                with open(filepath, "wb") as f:
                    f.write(resp.content)
                return filename
        # FIX: narrowed from bare `except Exception` — network and file
        # errors fall through to the next (lower-res) URL; genuine bugs
        # (e.g. TypeError) are no longer swallowed.
        except (requests.RequestException, OSError):
            continue

    return None


def download_category_thumbnails(category_items: list[dict], output_dir: str) -> list[dict]:
    """Download thumbnails for a list of categorized videos.

    Mutates each entry in place by adding a 'filename' key (None on
    failure) and returns the same list.
    """
    os.makedirs(output_dir, exist_ok=True)
    for rank, entry in enumerate(category_items, start=1):
        saved = download_thumbnail(entry["id"], entry["title"], rank, output_dir)
        entry["filename"] = saved
        print(f"  [{'OK' if saved else 'FAILED'}] {rank:2d}. {entry['title'][:60]}")
    return category_items


# ---------------------------------------------------------------------------
# Generate index.md
# ---------------------------------------------------------------------------

def generate_index(channel: dict, classified: dict, base_dir: str):
    """Write an Obsidian-compatible index.md with embedded thumbnails.

    Returns the path of the written index file.
    """
    # Header plus the breakdown table scaffold.
    lines = [
        f"# {channel['title']} - Top Thumbnails\n",
        f"**Channel:** [youtube.com/channel/{channel['id']}](https://youtube.com/channel/{channel['id']})",
        f"**Subscribers:** {format_views(channel['subscriber_count'])}",
        f"**Total videos:** {channel['video_count']}\n",
        "## Content Breakdown\n",
        "| Category | Total Found | Top Shown |",
        "|----------|------------|-----------|",
    ]
    for key, label in (
        ("videos", "Regular Videos"),
        ("shorts", "Shorts"),
        ("live_streams", "Live Streams"),
    ):
        n = len(classified[key])
        lines.append(f"| {label} | {n} | {min(n, 10)} |")
    lines.append("")

    # One section per non-empty category, ranked entries with thumbnails.
    for key, heading, folder in (
        ("videos", "Top Videos", "videos"),
        ("shorts", "Top Shorts", "shorts"),
        ("live_streams", "Top Live Streams", "live_streams"),
    ):
        entries = classified[key]
        if not entries:
            continue

        lines.append(f"## {heading}\n")
        for rank, entry in enumerate(entries, start=1):
            watch_url = f"https://youtube.com/watch?v={entry['id']}"
            lines.append(f"### {rank}. [{entry['title']}]({watch_url})")
            lines.append(f"**Views:** {format_views(entry['view_count'])}\n")
            if entry.get("filename"):
                lines.append(f"![]({folder}/{entry['filename']})\n")
            else:
                lines.append("*Thumbnail not available*\n")

    index_path = os.path.join(base_dir, "index.md")
    with open(index_path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))

    return index_path


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def main():
    """CLI entry point: resolve channel, classify uploads, download thumbnails."""
    parser = argparse.ArgumentParser(
        description="Download top thumbnails from a YouTube channel"
    )
    parser.add_argument(
        "channel",
        help="@handle, channel URL, or channel ID (UCxxxxxxx)",
    )
    parser.add_argument(
        "--api-key",
        required=True,
        help="YouTube Data API v3 key",
    )
    parser.add_argument(
        "--output-dir",
        default=None,
        help="Output directory (default: Channel_Name/)",
    )
    parser.add_argument(
        "--skip-verify",
        action="store_true",
        help="Skip Level 2 HTTP verification (faster, but may misclassify premieres/shorts)",
    )
    args = parser.parse_args()

    youtube = build("youtube", "v3", developerKey=args.api_key)

    # Step 1: Resolve channel
    print(f"Resolving channel: {args.channel}")
    channel = resolve_channel(youtube, args.channel)
    if not channel:
        print("ERROR: Could not resolve channel. Check the handle/URL/ID.")
        sys.exit(1)
    print(f"  Found: {channel['title']} ({channel['video_count']} videos)")

    # Derive a filesystem-safe default output directory from the title.
    safe_name = "".join(c for c in channel["title"] if c.isalnum() or c in " -_").strip()
    safe_name = re.sub(r"\s+", "_", safe_name)
    base_dir = args.output_dir or safe_name
    os.makedirs(base_dir, exist_ok=True)

    # Step 2: Fetch all video IDs
    print("\nFetching uploads playlist...")
    video_ids = fetch_all_video_ids(youtube, channel["uploads_playlist"])
    print(f"  Found {len(video_ids)} uploads")

    if not video_ids:
        print("No videos found. Exiting.")
        sys.exit(0)

    # Step 3: Fetch details & classify
    print("\nFetching video details and classifying...")
    classified = fetch_and_classify(youtube, video_ids, skip_verify=args.skip_verify)
    for key, label in (("videos", "Videos"), ("shorts", "Shorts"), ("live_streams", "Live streams")):
        found = len(classified[key])
        print(f"  {label}: {found} (showing top {min(found, 10)})")

    # Step 4: Download thumbnails, one subfolder per category
    for key, label, folder in (
        ("videos", "Videos", "videos"),
        ("shorts", "Shorts", "shorts"),
        ("live_streams", "Live Streams", "live_streams"),
    ):
        entries = classified[key]
        if not entries:
            print(f"\n{label}: none found, skipping")
            continue
        print(f"\nDownloading {label} thumbnails...")
        download_category_thumbnails(entries, os.path.join(base_dir, folder))

    # Step 5: Generate index.md
    print("\nGenerating index.md...")
    index_path = generate_index(channel, classified, base_dir)
    print(f"  Written to {index_path}")

    print(f"\nDone! Output in: {base_dir}/")


if __name__ == "__main__":
    main()

Step 4: Install Dependencies

pip3 install google-api-python-client requests

Suppress output unless there's an error. These are likely already installed but run it to be safe.

Step 5: Run the Script

python3 _yt_thumb_downloader.py CHANNEL_ARG --api-key API_KEY [--skip-verify]

Replace:

  • CHANNEL_ARG with the channel handle/URL/ID from the user's input
  • API_KEY with the key from Step 2
  • Add --skip-verify only if the user included it

The script outputs to ./Channel_Name/ in the current working directory by default.

Step 6: Clean Up

Delete the temporary script file:

rm _yt_thumb_downloader.py

Step 7: Report Results

Tell the user:

  • Output folder path
  • Number of thumbnails downloaded per category (videos, shorts, live streams)
  • Path to the index.md file
  • Suggest opening index.md in Obsidian to see all thumbnails with embedded images
Related skills

More from nikhilbhansali/youtube-data-skills

Installs
2
First Seen
Mar 30, 2026