atproto-ingest
AT Protocol Data Ingestion
Layer 1 - Data Acquisition for Bluesky/AT Protocol social graph and content.
GF(3) Trit: +1 (Generator) — Produces data streams for downstream processing.
Authentication
App Password (Recommended for scripts)
# Create session
curl -X POST https://bsky.social/xrpc/com.atproto.server.createSession \
-H "Content-Type: application/json" \
-d '{"identifier": "handle.bsky.social", "password": "app-password-here"}'
# Response contains accessJwt and refreshJwt
export BSKY_TOKEN="eyJ..."
OAuth (For apps)
# Authorization endpoint: https://bsky.social/oauth/authorize
# Token endpoint: https://bsky.social/oauth/token
# Scopes: atproto, transition:generic
Capabilities
1. fetch-user-posts
Get all posts from a user by handle or DID.
# Resolve handle to DID
curl "https://bsky.social/xrpc/com.atproto.identity.resolveHandle?handle=user.bsky.social"
# Get author feed (paginated)
curl "https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
-H "Authorization: Bearer $BSKY_TOKEN"
Pagination loop:
def fetch_all_posts(actor: str, token: str) -> list:
posts, cursor = [], None
while True:
url = f"https://bsky.social/xrpc/app.bsky.feed.getAuthorFeed?actor={actor}&limit=100"
if cursor:
url += f"&cursor={cursor}"
resp = requests.get(url, headers={"Authorization": f"Bearer {token}"})
data = resp.json()
posts.extend(data.get("feed", []))
cursor = data.get("cursor")
if not cursor:
break
time.sleep(0.5) # Rate limit respect
return posts
2. get-engagement-graph
Map likes, reposts, replies, and quotes for a post.
# Get likes
curl "https://bsky.social/xrpc/app.bsky.feed.getLikes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
-H "Authorization: Bearer $BSKY_TOKEN"
# Get reposts
curl "https://bsky.social/xrpc/app.bsky.feed.getRepostedBy?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
-H "Authorization: Bearer $BSKY_TOKEN"
# Get quotes (search for post URI)
curl "https://bsky.social/xrpc/app.bsky.feed.getQuotes?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&limit=100" \
-H "Authorization: Bearer $BSKY_TOKEN"
Engagement record schema:
CREATE TABLE engagement (
post_uri VARCHAR PRIMARY KEY,
author_did VARCHAR,
like_count INTEGER,
repost_count INTEGER,
reply_count INTEGER,
quote_count INTEGER,
likers VARCHAR[], -- Array of DIDs
reposters VARCHAR[],
indexed_at TIMESTAMP
);
3. stream-mentions
Real-time monitoring via Firehose or polling.
# Firehose (WebSocket) - all network events
wscat -c wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos
# Polling fallback - notifications endpoint
curl "https://bsky.social/xrpc/app.bsky.notification.listNotifications?limit=50" \
-H "Authorization: Bearer $BSKY_TOKEN"
Firehose filter (Python):
import websocket
import cbor2
def on_message(ws, message):
header, body = cbor2.loads(message)
if header.get("op") == 1: # Commit
for op in body.get("ops", []):
if "app.bsky.feed.post" in op.get("path", ""):
record = op.get("record", {})
# Check for mentions in facets
for facet in record.get("facets", []):
for feature in facet.get("features", []):
if feature.get("$type") == "app.bsky.richtext.facet#mention":
if feature.get("did") == TARGET_DID:
yield record
ws = websocket.WebSocketApp("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos",
on_message=on_message)
4. extract-thread-tree
Full conversation tree from a post.
curl "https://bsky.social/xrpc/app.bsky.feed.getPostThread?uri=at://did:plc:xxx/app.bsky.feed.post/yyy&depth=10&parentHeight=10" \
-H "Authorization: Bearer $BSKY_TOKEN"
Thread tree structure:
def extract_thread_tree(thread_data: dict) -> dict:
"""Flatten thread to adjacency list for DuckDB."""
nodes = []
edges = []
def walk(node, parent_uri=None):
post = node.get("post", {})
uri = post.get("uri")
nodes.append({
"uri": uri,
"author_did": post.get("author", {}).get("did"),
"text": post.get("record", {}).get("text"),
"created_at": post.get("record", {}).get("createdAt"),
"depth": node.get("depth", 0)
})
if parent_uri:
edges.append({"parent": parent_uri, "child": uri})
for reply in node.get("replies", []):
walk(reply, uri)
walk(thread_data.get("thread", {}))
return {"nodes": nodes, "edges": edges}
5. get-follower-network
First and second-order connections.
# Followers
curl "https://bsky.social/xrpc/app.bsky.graph.getFollowers?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
-H "Authorization: Bearer $BSKY_TOKEN"
# Following
curl "https://bsky.social/xrpc/app.bsky.graph.getFollows?actor=did:plc:xxx&limit=100&cursor=$CURSOR" \
-H "Authorization: Bearer $BSKY_TOKEN"
Second-order expansion:
def get_follower_network(actor: str, depth: int = 2) -> dict:
"""BFS expansion of follower graph."""
visited = set()
edges = []
queue = [(actor, 0)]
while queue:
current, d = queue.pop(0)
if current in visited or d > depth:
continue
visited.add(current)
followers = paginate_all("app.bsky.graph.getFollowers", actor=current)
for f in followers:
did = f["did"]
edges.append({"from": did, "to": current, "type": "follows"})
if d < depth:
queue.append((did, d + 1))
return {"nodes": list(visited), "edges": edges}
Rate Limiting
| Endpoint Category | Rate Limit | Window |
|---|---|---|
| Read (feed, graph) | 3000/5min | Rolling |
| Write (post, like) | 1500/hr | Rolling |
| Firehose | Unlimited | - |
| Search | 100/min | Rolling |
Backoff strategy:
def rate_limited_request(url, headers, max_retries=5):
for attempt in range(max_retries):
resp = requests.get(url, headers=headers)
if resp.status_code == 429:
wait = int(resp.headers.get("Retry-After", 2 ** attempt))
time.sleep(wait)
continue
return resp
raise Exception("Rate limit exceeded")
DuckDB Ingestion Format
-- Posts table
CREATE TABLE bsky_posts (
uri VARCHAR PRIMARY KEY,
cid VARCHAR,
author_did VARCHAR,
text TEXT,
created_at TIMESTAMP,
reply_parent VARCHAR,
reply_root VARCHAR,
embed_type VARCHAR,
langs VARCHAR[],
labels VARCHAR[],
ingested_at TIMESTAMP DEFAULT now()
);
-- Social graph
CREATE TABLE bsky_graph (
subject_did VARCHAR,
object_did VARCHAR,
relation VARCHAR, -- 'follows', 'blocks', 'mutes'
created_at TIMESTAMP,
PRIMARY KEY (subject_did, object_did, relation)
);
-- Engagement events
CREATE TABLE bsky_engagement (
uri VARCHAR,
actor_did VARCHAR,
action VARCHAR, -- 'like', 'repost', 'quote', 'reply'
target_uri VARCHAR,
created_at TIMESTAMP,
PRIMARY KEY (uri)
);
-- Insert from JSON
COPY bsky_posts FROM 'posts.json' (FORMAT JSON, ARRAY true);
AT Protocol Lexicon Reference
| Lexicon | Purpose |
|---|---|
app.bsky.feed.getAuthorFeed |
User's posts |
app.bsky.feed.getPostThread |
Thread tree |
app.bsky.feed.getLikes |
Who liked |
app.bsky.feed.getRepostedBy |
Who reposted |
app.bsky.graph.getFollowers |
Follower list |
app.bsky.graph.getFollows |
Following list |
app.bsky.actor.getProfile |
User profile |
com.atproto.sync.subscribeRepos |
Firehose |
Integration with Layer 2
Output feeds into:
- GF(3) Trit 0 (Transformer): Sentiment analysis, embedding generation
- GF(3) Trit -1 (Consumer): Dashboard display, alert triggers
# Pipeline handoff
posts = fetch_all_posts(actor, token)
duckdb.execute("INSERT INTO bsky_posts SELECT * FROM read_json(?)", [posts])
# Signal next layer
publish_event("bsky.ingestion.complete", {"actor": actor, "count": len(posts)})
More from plurigrid/asi
academic-research
Search academic papers across arXiv, PubMed, Semantic Scholar, bioRxiv, medRxiv, Google Scholar, and more. Get BibTeX citations, download PDFs, analyze citation networks. Use for literature reviews, finding papers, and academic research.
49wev-tesseract
WEV Tesseract Skill
33tree-sitter
AST-based code analysis using tree-sitter. Use for parsing code structure, extracting symbols, finding patterns with tree-sitter queries, analyzing complexity, and understanding code architecture. Supports Python, JavaScript, TypeScript, Go, Rust, C, C++, Swift, Java, Kotlin, Julia, and more.
21alife
Comprehensive Artificial Life skill combining ALIFE2025 proceedings, classic texts (Axelrod, Epstein-Axtell), ALIEN simulation, Lenia, NCA, swarm intelligence, and evolutionary computation. 337 pages extracted, 80+ papers, 153 figures.
16reverse-engineering
Reverse Engineering Skill
16bdd-mathematical-verification
BDD-Driven Mathematical Content Verification Skill
16