social-media-api-integration

Installation
SKILL.md

Social Media API Integration

Build reliable integrations with social platform APIs for automated content distribution.

Platform API Overview

| Platform | API Style | Auth | Rate Limits | Key Constraint |
|----------|-----------|------|-------------|----------------|
| Bluesky | AT Protocol | App password / OAuth | 3000/5min | Decentralized, open protocol |
| Mastodon | REST | OAuth 2.0 | 300/5min per IP | Instance-specific endpoints |
| LinkedIn | REST | OAuth 2.0 | 100/day posts | Strict content policies |
| Dev.to | REST | API Key | 30/30s | Article-focused |
| Medium | REST | OAuth 2.0 + Bearer | 100/day | Import API only |
| RSS | Pull-based | None | N/A | Read-only syndication |

Authentication Patterns

OAuth 2.0 Flow (LinkedIn, Mastodon)

from authlib.integrations.httpx_client import AsyncOAuth2Client

class SocialOAuth:
    """Authorization-code OAuth 2.0 helper built on authlib's ``AsyncOAuth2Client``.

    The authorize/token endpoints below are placeholders
    (``platform.example.com``); substitute the real platform endpoints.

    Attributes:
        client: The underlying ``AsyncOAuth2Client``.
        state: The ``state`` value produced by the most recent
            ``get_auth_url`` call, or ``None`` if none has been made yet.
    """

    def __init__(self, client_id: str, client_secret: str, redirect_uri: str):
        self.client = AsyncOAuth2Client(
            client_id=client_id,
            client_secret=client_secret,
            redirect_uri=redirect_uri,
        )
        self.state: str | None = None

    def get_auth_url(self, scope: str) -> str:
        """Build the URL the user must visit to grant access.

        Args:
            scope: Scope string forwarded to the authorization endpoint.

        Returns:
            The authorization URL.
        """
        url, state = self.client.create_authorization_url(
            "https://platform.example.com/oauth/authorize",
            scope=scope,
        )
        # Keep the generated state instead of discarding it: the redirect
        # handler needs it to check that the callback's ``state`` parameter
        # matches the request that was actually issued.
        self.state = state
        return url

    async def exchange_code(self, code: str) -> dict:
        """Exchange an authorization code for a token.

        Args:
            code: The ``code`` value received on the redirect URI.

        Returns:
            Whatever ``fetch_token`` returns (annotated here as a dict).
        """
        token = await self.client.fetch_token(  # allow-secret
            "https://platform.example.com/oauth/token",
            code=code,
        )
        return token

API Key Authentication (Dev.to, Bluesky)

import httpx

class DevToClient:
    """Async Dev.to client that authenticates with an ``api-key`` header."""

    def __init__(self, api_key: str):  # allow-secret
        request_headers = {"api-key": api_key, "Accept": "application/json"}
        self.client = httpx.AsyncClient(
            base_url="https://dev.to/api",
            headers=request_headers,
        )

    async def create_article(self, title: str, body: str, tags: list[str], published: bool = False):
        """POST a new article to ``/articles``.

        Args:
            title: Article title.
            body: Article body, sent as ``body_markdown``.
            tags: Tags attached to the article.
            published: Sent as the ``published`` flag; defaults to ``False``.

        Returns:
            The object returned by ``self.client.post``; its status is not
            checked here.
        """
        article = {
            "title": title,
            "body_markdown": body,
            "tags": tags,
            "published": published,
        }
        response = await self.client.post("/articles", json={"article": article})
        return response

Multi-Platform Posting

Content Adapter Pattern

from dataclasses import dataclass
from abc import ABC, abstractmethod

@dataclass
class Post:
    """Platform-neutral description of one piece of content to publish."""

    title: str
    # Main text; the adapters in this document cut it to their own length limit.
    body: str
    # Optional link; the adapters in this document append it after the body text.
    url: str | None = None
    # Not read by any adapter shown here; presumably topic tags, confirm with callers.
    tags: list[str] | None = None
    # Not read by any adapter shown here; presumably an attached image location, confirm.
    image_url: str | None = None

class PlatformAdapter(ABC):
    """Abstract interface that each platform-specific publisher implements."""

    @abstractmethod
    async def publish(self, post: Post) -> str:
        """Publish and return the post URL.

        Args:
            post: The content to publish.

        Returns:
            A string identifying the published post (a URL per this
            contract).
        """

    @abstractmethod
    def format_content(self, post: Post) -> dict:
        """Format post for platform constraints.

        Args:
            post: The content to adapt.

        Returns:
            The request payload, as a dict, that ``publish`` sends to the
            platform.
        """

class BlueskyAdapter(PlatformAdapter):
    """Publishes a ``Post`` to Bluesky as an ``app.bsky.feed.post`` record.

    ``self.client`` is not assigned anywhere in this class; it must be
    provided by code not shown here before ``publish`` is called.
    """

    # Length ceiling enforced by format_content (body and link combined).
    MAX_CHARS = 300

    async def publish(self, post: Post) -> str:
        """Create the record and return its ``uri``.

        Args:
            post: The content to publish.

        Returns:
            The ``uri`` entry of the create-record response.
        """
        content = self.format_content(post)
        # AT Protocol posting
        # NOTE(review): createRecord is presumably also expecting a `repo`
        # identifier and a `createdAt` field on the record; confirm whether
        # the client injects them.
        response = await self.client.post(
            "com.atproto.repo.createRecord",
            json=content,
        )
        # NOTE(review): the response is indexed directly here, whereas the
        # Mastodon adapter in this document calls `.json()` first; confirm
        # which shape `self.client.post` actually returns.
        return response["uri"]

    def format_content(self, post: Post) -> dict:
        """Build the create-record payload, keeping the text within MAX_CHARS.

        The link, when present, is appended after a blank line and kept
        whole; the body is shortened to leave room for it. Previously the
        body alone was cut to the limit and the link was added afterwards,
        so the final text could exceed the limit. If the link by itself is
        longer than the limit the result still exceeds it.

        Args:
            post: The content to adapt.

        Returns:
            A dict with ``collection`` and ``record`` keys.
        """
        limit = self.MAX_CHARS
        if post.url:
            suffix = f"\n\n{post.url}"
            # Reserve space for the suffix before cutting the body.
            room = max(limit - len(suffix), 0)
            text = f"{post.body[:room]}{suffix}"
        else:
            text = post.body[:limit]
        return {"collection": "app.bsky.feed.post", "record": {"text": text}}

class MastodonAdapter(PlatformAdapter):
    """Publishes a ``Post`` as a public Mastodon status.

    ``self.client`` is not assigned anywhere in this class; it must be
    provided by code not shown here before ``publish`` is called.
    """

    # Default status length; instances may configure a different value.
    MAX_CHARS = 500
    # Mastodon is documented to count every link as this many characters.
    LINK_COUNTED_AS = 23

    async def publish(self, post: Post) -> str:
        """Post the status and return its URL.

        Args:
            post: The content to publish.

        Returns:
            The ``url`` field of the JSON response.
        """
        content = self.format_content(post)
        response = await self.client.post("/api/v1/statuses", json=content)
        return response.json()["url"]

    def format_content(self, post: Post) -> dict:
        """Build the status payload, keeping body plus link within MAX_CHARS.

        Previously the body alone was cut to the limit and the link was
        appended afterwards, so the status could exceed the limit. The body
        is now shortened to leave room for the separator and the link. The
        reservation is conservative: it uses the larger of the link's real
        length and the length the server counts for a link.

        Args:
            post: The content to adapt.

        Returns:
            A dict with ``status`` and ``visibility`` keys.
        """
        limit = self.MAX_CHARS
        if post.url:
            separator = "\n\n"
            reserved = len(separator) + max(len(post.url), self.LINK_COUNTED_AS)
            room = max(limit - reserved, 0)
            text = f"{post.body[:room]}{separator}{post.url}"
        else:
            text = post.body[:limit]
        return {"status": text, "visibility": "public"}

Distribution Orchestrator

class ContentDistributor:
    """Fans one ``Post`` out to several platform adapters, one after another."""

    def __init__(self, adapters: dict[str, PlatformAdapter]):
        # Maps a platform name to the adapter that publishes to it.
        self.adapters = adapters

    async def distribute(self, post: Post, platforms: list[str] | None = None) -> dict[str, str]:
        """Publish ``post`` to each requested platform, isolating failures.

        Args:
            post: The content to publish.
            platforms: Platform names to target. ``None`` (or an empty
                list) targets every registered adapter.

        Returns:
            A dict keyed by platform name. The value is the string returned
            by the adapter's ``publish`` on success, or a string starting
            with ``"ERROR: "`` on failure.
        """
        targets = platforms or list(self.adapters)
        results: dict[str, str] = {}
        for platform in targets:
            # Look the adapter up without raising: an unknown name used to
            # raise KeyError outside the try block and abort the whole run,
            # losing the results already collected for other platforms.
            adapter = self.adapters.get(platform)
            if adapter is None:
                results[platform] = f"ERROR: unknown platform {platform!r}"
                continue
            try:
                results[platform] = await adapter.publish(post)
            except Exception as e:
                # Deliberate boundary: one platform failing must not stop
                # the remaining platforms from being attempted.
                results[platform] = f"ERROR: {e}"
        return results

Rate Limiting

Client-Side Rate Limiter

import asyncio
import time

class RateLimiter:
    """Client-side sliding-window limiter for coroutines.

    Allows at most ``max_requests`` calls to ``acquire`` to complete within
    any ``window_seconds`` span; further callers wait until the oldest
    recorded call leaves the window.

    Attributes:
        max_requests: Maximum number of acquisitions per window.
        window: Window length in seconds.
        requests: Timestamps (``time.monotonic`` values) of acquisitions
            still inside the window, oldest first.
        lock: Serializes callers; it is held while waiting, so waiters are
            admitted one at a time.
    """

    def __init__(self, max_requests: int, window_seconds: float):
        """Create a limiter.

        Args:
            max_requests: Maximum acquisitions per window; must be >= 1.
            window_seconds: Window length in seconds.

        Raises:
            ValueError: If ``max_requests`` is less than 1. This previously
                surfaced later as an IndexError inside ``acquire``.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: list[float] = []
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Wait until a request slot is free, then record the acquisition."""
        async with self.lock:
            # Monotonic clock: wall-clock adjustments (NTP, DST, manual
            # changes) must not shorten or stretch the window.
            now = time.monotonic()
            self._drop_expired(now)
            # Re-check after every wake-up in case the sleep returned a
            # little early and the oldest entry is still inside the window.
            while len(self.requests) >= self.max_requests:
                await asyncio.sleep(self.requests[0] + self.window - now)
                now = time.monotonic()
                self._drop_expired(now)
            self.requests.append(now)

    def _drop_expired(self, now: float) -> None:
        """Forget acquisitions that are no longer inside the window."""
        self.requests = [t for t in self.requests if now - t < self.window]

Retry on Rate Limit

async def api_call_with_retry(func, *args, max_retries: int = 3):
    """Await ``func(*args)``, retrying when it fails with HTTP 429.

    Args:
        func: Async callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        max_retries: Maximum number of attempts.

    Returns:
        Whatever ``func`` returns on the first successful attempt.

    Raises:
        httpx.HTTPStatusError: Re-raised unchanged for any status other
            than 429.
        Exception: ``"Max retries exceeded"`` once every attempt has been
            rate-limited; the last 429 error is attached as ``__cause__``.
    """
    last_error = None
    for attempt in range(max_retries):
        try:
            return await func(*args)
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429:
                raise
            last_error = e
            if attempt == max_retries - 1:
                # No attempt left: sleeping would only delay the failure.
                break
            raw_delay = e.response.headers.get("Retry-After", 60)
            try:
                delay = max(float(raw_delay), 0.0)
            except (TypeError, ValueError):
                # Retry-After may also be an HTTP-date or malformed; fall
                # back to the default wait instead of crashing the loop.
                delay = 60.0
            await asyncio.sleep(delay)
    raise Exception("Max retries exceeded") from last_error

Scheduling

from datetime import datetime, timedelta

class PostScheduler:
    """In-memory queue that publishes posts at their scheduled time.

    Attributes:
        distributor: The ``ContentDistributor`` used to publish due posts.
        queue: ``(publish_at, post, platforms)`` tuples, kept sorted by
            ``publish_at`` (earliest first).
    """

    def __init__(self, distributor: ContentDistributor):
        self.distributor = distributor
        self.queue: list[tuple[datetime, Post, list[str]]] = []

    def schedule(self, post: Post, platforms: list[str], publish_at: datetime):
        """Queue ``post`` for publication on ``platforms`` at ``publish_at``.

        All scheduled times should be consistently naive or consistently
        timezone-aware: Python refuses to order a mix of the two, so the
        sort below would raise ``TypeError``.
        """
        self.queue.append((publish_at, post, platforms))
        # Stable sort: entries with equal times keep their insertion order.
        self.queue.sort(key=lambda x: x[0])

    async def run(self):
        """Publish queued posts in time order until the queue is empty.

        Sleeps until the earliest entry is due. An entry scheduled while
        this coroutine is sleeping is only noticed after that sleep ends.
        The result of ``distribute`` is not inspected here.
        """
        while self.queue:
            publish_at, post, platforms = self.queue[0]
            # Take "now" in the same timezone convention as the entry:
            # comparing a naive now() with an aware publish_at raises
            # TypeError. With tzinfo=None this is the original naive call.
            now = datetime.now(publish_at.tzinfo)
            if now >= publish_at:
                self.queue.pop(0)
                await self.distributor.distribute(post, platforms)
            else:
                await asyncio.sleep((publish_at - now).total_seconds())

Analytics Retrieval

@dataclass
class PostMetrics:
    """Engagement counters for a single published post."""

    # All five fields are integer counts; how each platform defines them is
    # not visible in this document.
    views: int
    likes: int
    shares: int
    comments: int
    clicks: int

async def aggregate_metrics(post_urls: dict[str, str]) -> dict[str, PostMetrics]:
    """Collect metrics for each published post, one platform at a time.

    Args:
        post_urls: Maps a platform name to the URL of the post on it.

    Returns:
        A dict mapping each platform name to the value returned by that
        platform adapter's ``get_metrics``.
    """
    # NOTE(review): `adapters` is a free name here and must be defined at
    # module level by code not shown; `get_metrics` is likewise not part of
    # the PlatformAdapter interface in this document. Confirm both.
    return {
        name: await adapters[name].get_metrics(post_url)
        for name, post_url in post_urls.items()
    }

Anti-Patterns

  • Posting identical content everywhere — Adapt format and tone per platform
  • Ignoring rate limits — Always implement client-side rate limiting
  • Storing tokens in code — Use environment variables or secret managers
  • No error handling on post failure — Queue for retry, log failures
  • Synchronous multi-platform posting — Use async/parallel posting with individual error handling
  • Hardcoded platform URLs — Instance URLs vary (Mastodon), use configuration
Related skills

More from 4444j99/a-i--skills

Installs
1
GitHub Stars
6
First Seen
Apr 19, 2026