deeplake-managed

Deeplake Managed Service SDK

Agent-friendly SDK for ingesting data into Deeplake managed tables. Use this skill when users want to store, ingest, or query data in Deeplake. Available in both Python and Node.js/TypeScript.


Prerequisites

Required services:

  • Deeplake API server running (default: https://api.deeplake.ai)

Python

Required Python packages:

  • requests (pip install requests)

Optional dependencies (per file type):

  • Video ingestion: ffmpeg (sudo apt-get install ffmpeg)
  • PDF ingestion: pymupdf (pip install pymupdf)
  • Thumbnail generation: Pillow (pip install Pillow)
  • COCO detection format: pycocotools, Pillow, numpy (pip install pycocotools Pillow numpy)
  • LeRobot frames format: pandas, numpy (pip install pandas numpy)

Python import (primary):

from deeplake import Client
# or equivalently:
from deeplake.managed import Client

# Async variant (requires aiohttp: pip install aiohttp):
from deeplake.managed import AsyncClient

Node.js / TypeScript

Required packages:

  • WASM module built from C++ (see memory notes for build commands)

Optional dependencies (per file type):

  • Video ingestion: ffmpeg (system binary)
  • PDF ingestion: pdfjs-dist (npm install pdfjs-dist)
  • Thumbnail generation: sharp (npm install sharp)
  • COCO detection format: no external deps (pure JS mask rendering)

TypeScript import:

import { ManagedClient } from '@deeplake/node';
// or from local build:
import { ManagedClient } from '/home/ubuntu/indra/typescript/node/dist';

Quick Reference

Python

from deeplake import Client

# Initialize -- token from DEEPLAKE_API_KEY env var, workspace defaults to "default"
client = Client()
client = Client(token="dl_xxx", workspace_id="my-workspace")

# Ingest files (FILE schema)
client.ingest("videos", {"path": ["video1.mp4", "video2.mp4"]}, schema={"path": "FILE"})

# Ingest structured data with indexes for search
client.ingest("embeddings", {
    "text": ["doc1", "doc2", "doc3"],
    "embedding": [[0.1, 0.2, ...], [0.3, 0.4, ...], [0.5, 0.6, ...]],
}, index=["embedding", "text"])

# Ingest from HuggingFace
client.ingest("cifar", {"_huggingface": "cifar10"})

# Ingest with format object (see formats.md for CocoPanoptic, Coco, LeRobot, custom)
client.ingest("table", format=my_format)

# Fluent query
results = client.table("videos").select("id", "text").where("file_id = $1", "abc").limit(10)()

# Raw SQL
results = client.query("SELECT * FROM videos LIMIT 10")

# Vector similarity search
results = client.query("""
    SELECT id, text, embedding <#> $1 AS similarity
    FROM embeddings ORDER BY similarity DESC LIMIT 10
""", (query_embedding,))

# Table management
client.list_tables()
client.drop_table("old_table")
client.create_index("embeddings", "embedding")

Node.js / TypeScript

import { ManagedClient } from '@deeplake/node';

const client = new ManagedClient({ token: 'dl_xxx', workspaceId: 'my-workspace' });

// Ingest files (FILE schema)
await client.ingest("videos", { path: ["video1.mp4"] }, { schema: { path: "FILE" } });

// Ingest structured data
await client.ingest("embeddings", {
    text: ["doc1", "doc2"],
    embedding: [[0.1, 0.2], [0.3, 0.4]],
});

// Ingest with format object (see formats.md)
await client.ingest("table", null, { format: myFormat });

// Fluent query (use .execute())
const results = await client.table("videos")
    .select("id", "text").where("file_id = $1", "abc").limit(10).execute();

// Raw SQL
const rows = await client.query("SELECT * FROM videos LIMIT 10");

// Table management
await client.listTables();
await client.dropTable("old_table");
await client.createIndex("embeddings", "embedding");

Architecture

Python:  Client(token, workspace_id)
Node.js: ManagedClient({ token, workspaceId })
  |-- .ingest(table, data)       -> creates PG table via API, opens al://{ws}/{table}
  |                                 via deeplake SDK (auto credential rotation)
  |-- .query(sql)                -> POST /workspaces/{id}/tables/query -> list[dict] / QueryRow[]
  |-- .table(table)...           -> fluent SQL builder -> list[dict] / QueryRow[]
  |-- .create_index(table, col)  -> CREATE INDEX USING deeplake_index (for search)
  |-- .open_table(table)         -> deeplake.open("al://{ws}/{table}") with auto creds
  |-- .list_tables()             -> GET /workspaces/{id}/tables -> list[str] / string[]
  `-- .drop_table(table)         -> DELETE /workspaces/{id}/tables/{name}
                    |
                    v
              REST API -> PostgreSQL + pg_deeplake
  - All DB operations go through the REST API (no direct PG connection)
  - Dataset access uses al:// paths with automatic credential resolution
  - Creds endpoint: GET /api/org/{workspace}/ds/{table}/creds
  - Vector similarity: embedding <#> query_vec
  - BM25 text search:  text <#> 'search query'
  - Hybrid search:     (embedding, text)::deeplake_hybrid_record

Client Initialization

Python

from deeplake import Client

client = Client(
    token: str = None,           # API token (falls back to DEEPLAKE_API_KEY env var)
    workspace_id: str = "default",  # Target workspace (default: "default")
    api_url: str = None,         # API URL (default: https://api.deeplake.ai)
)

Node.js / TypeScript

import { ManagedClient } from '@deeplake/node';

const client = new ManagedClient({
    token: string,               // API token (required)
    workspaceId?: string,        // Target workspace (default: "default")
    apiUrl?: string,             // API URL (default: https://api.deeplake.ai)
});

Token: In Python, the token falls back to the DEEPLAKE_API_KEY environment variable when token is not passed; in Node.js it must be passed explicitly. The token is a JWT, and org_id is extracted automatically from its claims. If the token doesn't contain an org_id claim, the client fetches it from the /me API endpoint instead.
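
The claim lookup described above can be sketched with the standard library alone. This is a minimal illustration, not the SDK's code: the helper names read_jwt_claims and org_id_from_token are hypothetical, the claim key is assumed to be "org_id" with "OrgID" as an alternative spelling (both spellings appear in this document), and the signature is deliberately not verified, so the result must not be used for authentication decisions.

```python
import base64
import json


def read_jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT without verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated segments")
    payload = parts[1]
    # JWT segments are base64url without padding; restore it before decoding.
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


def org_id_from_token(token: str):
    """Return the org id claim, or None so the caller can fall back to /me."""
    claims = read_jwt_claims(token)
    # Assumption: the claim is named "org_id" or "OrgID".
    return claims.get("org_id") or claims.get("OrgID")
```

A None result corresponds to the case where the client would need the /me endpoint.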

Backend endpoint: The client sets the C++ backend endpoint to api_url before each dataset open, not at initialization, so that al:// path resolution (credential fetching) goes through deeplake-api instead of the legacy controlplane. Setting it per open avoids clobbering global state when multiple clients use different API URLs. Python: deeplake.client.endpoint = api_url. Node.js: deeplakeSetEndpoint(apiUrl).

Connection lifecycle:

# Python: just create and use -- no connection to manage
client = Client()
client.ingest("table", {"path": ["file.txt"]}, schema={"path": "FILE"})
# No close() method -- client is stateless (REST API calls only)

Ingestion

Python: client.ingest()

result = client.ingest(
    table_name: str,                    # Table name (created if missing; appended to if it exists)
    data: dict[str, list] = None,       # Data dict (required unless format= is set).
                                        #   {"_huggingface": "name"} -> HuggingFace dataset
                                        #   schema has "FILE" cols -> file paths processed
                                        #   otherwise -> column data {col: [values]}
    *,
    format: Format = None,              # Format object (subclass of Format) with
                                        #   normalize() method. When set, data is ignored.
                                        #   e.g. CocoPanoptic(images_dir=..., ...)
    schema: dict[str, str] = None,      # Schema override {col: type}
                                        #   Use "FILE" for columns containing file paths
                                        #   See reference.md for all type names
    index: list[str] = None,            # Columns to create deeplake_index on after ingestion.
                                        #   Use for EMBEDDING (vector search) and TEXT (BM25) columns.
    on_progress: Callable = None,       # Progress callback(rows_written, total)
    chunk_size: int = 1000,             # Text chunk size (chars)
    chunk_overlap: int = 200,           # Text chunk overlap (chars)
) -> dict

Node.js: client.ingest()

const result = await client.ingest(
    tableName: string,                              // Table name
    data?: Record<string, unknown[]> | null,        // Data dict (or null when using format)
    options?: {
        format?: Format,                            // Format object with normalize()
        schema?: Record<string, string>,            // Schema override
        index?: string[],                           // Columns to create deeplake_index on
        onProgress?: (processed, total) => void,    // Progress callback
        chunkSize?: number,                         // Text chunk size (default 1000)
        chunkOverlap?: number,                      // Text chunk overlap (default 200)
    },
): Promise<IngestResult>

Table existence: If table_name already exists, ingest() appends data to the existing table — it does NOT drop and recreate it. To replace an existing table, call client.drop_table(table_name) first. The PG table schema must be compatible with the new data being appended.
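
The drop-then-ingest sequence for replacing a table can be wrapped in a small helper. This is a minimal sketch: replace_table is a hypothetical name, and client is any object exposing the drop_table(table_name, if_exists=...) and ingest(table_name, data, **kwargs) methods documented in this file.

```python
def replace_table(client, table_name, data=None, **ingest_kwargs):
    """Drop table_name if present, then ingest into a fresh table."""
    # if_exists=True is expected to make the drop harmless when the table is absent.
    client.drop_table(table_name, if_exists=True)
    # Extra keyword arguments (schema=, index=, format=, ...) pass through unchanged.
    return client.ingest(table_name, data, **ingest_kwargs)
```

For example, replace_table(client, "videos", {"path": ["cam1.mp4"]}, schema={"path": "FILE"}). The two calls are not atomic: if ingestion fails after the drop, the old table is already gone.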

Returns: {"table_name": "videos", "row_count": 150, "dataset_path": "al://workspace_id/videos"}

Both data and format: If both are provided, format takes precedence and data is ignored. If neither is provided, an IngestError is raised.

Thumbnails: When a format object declares image_columns() (columns with pg_schema type "IMAGE"), thumbnails are auto-generated at 4 sizes (32x32, 64x64, 128x128, 256x256) and stored in a shared thumbnails dataset at {root_path}/thumbnails. Requires Pillow (Python) or sharp (Node.js).

Chunking Strategy by File Type

| File Type | Extensions | Strategy | Columns Created |
| --- | --- | --- | --- |
| Video | .mp4, .mov, .avi, .mkv, .webm | 10-second segments + thumbnails | id, file_id, chunk_index, start_time, end_time, video_data, thumbnail, text |
| Image | .jpg, .jpeg, .png, .gif, .bmp, .webp | Single chunk | id, file_id, image, filename, text |
| PDF | .pdf | Page-by-page at 300 DPI | id, file_id, page_index, image, text |
| Text | .txt, .md, .csv, .json, .xml, .html | 1000 char chunks, 200 overlap | id, file_id, chunk_index, text |
| Other | * | Single binary chunk | id, file_id, data, filename |
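
The text strategy above (1000-character chunks with 200 characters of overlap) can be pictured as a sliding window over the file contents. The sketch below is an illustration under assumptions, not the SDK's chunker: the name chunk_text is hypothetical, and the boundary handling (plain character offsets, a shorter final chunk) is a guess at reasonable behavior.

```python
def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """Split text into windows of chunk_size chars that overlap by chunk_overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window has reached the end of the text.
        if start + chunk_size >= len(text):
            break
    return chunks
```

With the defaults, a 2500-character document yields three chunks starting at offsets 0, 800, and 1600. Lowering chunk_size and chunk_overlap in ingest() would shrink the windows in the same way.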

Key Examples

# Ingest files (FILE schema)
client.ingest("videos", {"path": ["cam1.mp4", "cam2.mp4"]}, schema={"path": "FILE"})
client.ingest("photos", {"path": ["img1.jpg"]}, schema={"path": "FILE"})
client.ingest("manuals", {"path": ["manual.pdf"]}, schema={"path": "FILE"})

# Ingest structured data (dict = column data, schema inferred)
client.ingest("vectors", {
    "text": ["Hello", "Goodbye"],
    "embedding": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
})

# Ingest with explicit schema
client.ingest("data", {"name": ["Alice", "Bob"], "age": [30, 25]},
              schema={"name": "TEXT", "age": "INT64"})

# Ingest from HuggingFace
client.ingest("mnist", {"_huggingface": "mnist"})

# Ingest with a format object (see formats.md for CocoPanoptic, Coco, LeRobot, custom)
client.ingest("table", format=my_format)

# Ingest with progress callback
def progress(rows_written, total):
    print(f"Written {rows_written} rows...")
client.ingest("docs", {"path": pdf_files}, schema={"path": "FILE"}, on_progress=progress)

For custom format classes, see formats.md. For more ingestion examples, see examples.md.


Training / Streaming

client.open_table()

Open a managed table as a deeplake.Dataset for direct access -- bypasses PostgreSQL and returns the native dataset object with built-in ML framework integration.

ds = client.open_table(table_name: str) -> deeplake.Dataset

When to use: Training loops, batch iteration, PyTorch/TensorFlow DataLoaders, async prefetch.

# Batch iteration
ds = client.open_table("videos")
for batch in ds.batches(32):
    train(batch)

# PyTorch DataLoader
from torch.utils.data import DataLoader
ds = client.open_table("training_data")
loader = DataLoader(ds.pytorch(), batch_size=32, shuffle=True, num_workers=4)

# TensorFlow tf.data.Dataset
import tensorflow as tf
ds = client.open_table("training_data")
tf_ds = ds.tensorflow().batch(32).prefetch(tf.data.AUTOTUNE)

Querying

Fluent Query API (Recommended)

client.table(table) returns a chainable ManagedQueryBuilder:

# Python: supports both .execute() and () to run the query
results = (
    client.table("videos")
        .select("id", "text", "start_time")
        .where("file_id = $1", "abc123")
        .where("start_time > $2", 60)
        .order_by("start_time ASC")
        .limit(10)
        .offset(0)
)()  # or .execute()

// Node.js: use .execute() only (no () shorthand)
const results = await client.table("videos")
    .select("id", "text", "start_time")
    .where("file_id = $1", "abc123")
    .where("start_time > $2", 60)
    .orderBy("start_time ASC")
    .limit(10)
    .offset(0)
    .execute();

| Method | Python | Node.js | Description |
| --- | --- | --- | --- |
| .select(*cols) | .select("id", "t") | .select("id", "t") | Set columns (default *) |
| .where(cond, *params) | .where("id=$1","x") | .where("id=$1","x") | Add WHERE (multiple = AND) |
| .order_by(clause) | .order_by("col") | .orderBy("col") | Add ORDER BY |
| .limit(n) | .limit(10) | .limit(10) | Set LIMIT |
| .offset(n) | .offset(20) | .offset(20) | Set OFFSET |
| Run query | .execute() or () | .execute() | Execute, return results |

How .where() parameters work: Each .where("col = $N", value) call adds an AND condition. The $1, $2, etc. placeholders are filled by the extra arguments, numbering across all .where() calls sequentially.
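
To make the placeholder numbering concrete, here is a small stand-alone builder that renders SQL and a parameter tuple the same way. It is a sketch, not the SDK's ManagedQueryBuilder: the class name QuerySketch and its build() method are hypothetical, and it only assembles strings without contacting any server.

```python
class QuerySketch:
    """Collects clauses and renders (sql, params) with shared $N numbering."""

    def __init__(self, table: str):
        self._table = table
        self._columns = ["*"]
        self._conditions = []
        self._params = []
        self._order_by = None
        self._limit = None
        self._offset = None

    def select(self, *cols):
        self._columns = list(cols) or ["*"]
        return self

    def where(self, condition, *params):
        # The caller writes $1, $2, ... counting across all where() calls,
        # so parameters are simply appended in call order.
        self._conditions.append(condition)
        self._params.extend(params)
        return self

    def order_by(self, clause):
        self._order_by = clause
        return self

    def limit(self, n):
        self._limit = int(n)
        return self

    def offset(self, n):
        self._offset = int(n)
        return self

    def build(self):
        sql = f"SELECT {', '.join(self._columns)} FROM {self._table}"
        if self._conditions:
            sql += " WHERE " + " AND ".join(self._conditions)
        if self._order_by:
            sql += f" ORDER BY {self._order_by}"
        if self._limit is not None:
            sql += f" LIMIT {self._limit}"
        if self._offset is not None:
            sql += f" OFFSET {self._offset}"
        return sql, tuple(self._params)
```

The second where() call uses $2 because "abc123" already occupies $1; the rendered pair has the same shape as the arguments to client.query(sql, params).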

Raw SQL: client.query()

# Python
rows = client.query(sql: str, params: tuple = None) -> list[dict]

# Examples
rows = client.query("SELECT * FROM videos LIMIT 10")
rows = client.query("SELECT * FROM documents WHERE file_id = $1", ("abc123",))

// Node.js
const rows = await client.query(sql: string, params?: unknown[]) -> Promise<QueryRow[]>

// Examples
const rows = await client.query("SELECT * FROM videos LIMIT 10");
const docs = await client.query("SELECT * FROM documents WHERE file_id = $1", ["abc123"]);

Queries are sent to the API via POST /workspaces/{id}/tables/query. Use $1, $2, ... for parameterized queries.
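
For result sets too large for one response but not worth opening as a dataset, a LIMIT/OFFSET loop over client.query() is one option. The sketch below is an assumption-laden illustration: iter_rows is a hypothetical helper, it assumes the table has an id column to give a stable order (file-ingested tables list one above), and it validates identifiers locally because table and column names cannot be bound as $N parameters.

```python
import re

_IDENT = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def iter_rows(client, table, page_size=100, order_by="id"):
    """Yield rows from table one page at a time using LIMIT/OFFSET."""
    # Identifiers are interpolated into the SQL text, so reject anything unusual.
    for name in (table, order_by):
        if not _IDENT.fullmatch(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    page_size = int(page_size)
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    offset = 0
    while True:
        rows = client.query(
            f"SELECT * FROM {table} ORDER BY {order_by} "
            f"LIMIT {page_size} OFFSET {offset}"
        )
        yield from rows
        # A short page means the table is exhausted.
        if len(rows) < page_size:
            return
        offset += page_size
```

Usage: for row in iter_rows(client, "videos", page_size=500): .... For training-scale iteration, client.open_table() is the documented route.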

For pg_deeplake SQL features (vector search, BM25, hybrid search, indexes), see reference.md.


Table Management

# Python
tables = client.list_tables() -> list[str]
client.drop_table(table_name: str, if_exists: bool = True) -> None
client.create_index(table_name: str, column: str) -> None

// Node.js
const tables = await client.listTables();
await client.dropTable(tableName: string, ifExists?: boolean); // default true
await client.createIndex(tableName: string, column: string);

Index Creation

create_index() / createIndex() creates a deeplake_index on a column. Use it for:

  • EMBEDDING columns — enables vector cosine similarity search via <#>
  • TEXT columns — enables BM25 text search via <#>

The method executes CREATE INDEX IF NOT EXISTS ... USING deeplake_index (column) and is a no-op if the index already exists.

# Python: standalone
client.create_index("embeddings", "embedding")  # vector index
client.create_index("documents", "text")         # text index

# Python: during ingestion (creates indexes after data is committed)
client.ingest("search_index", {
    "text": documents,
    "embedding": embeddings,
}, index=["embedding", "text"])

// Node.js: standalone
await client.createIndex("embeddings", "embedding");
await client.createIndex("documents", "text");

// Node.js: during ingestion
await client.ingest("search_index", {
    text: documents,
    embedding: embeddings,
}, { index: ["embedding", "text"] });
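
As a mental model for what an indexed EMBEDDING column ranks, the sketch below computes cosine similarity in plain Python and sorts rows by it, highest first. It is only a local illustration under the assumption, taken from this section, that <#> on embeddings yields a cosine similarity; the function names are hypothetical, and the server performs the equivalent ranking with the index rather than a full scan.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # convention for zero vectors in this sketch
    return dot / (norm_a * norm_b)


def top_k(query_embedding, rows, k=10):
    """Return the k rows most similar to query_embedding, best first."""
    scored = [(cosine_similarity(query_embedding, r["embedding"]), r) for r in rows]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [row for _, row in scored[:k]]
```

This mirrors the shape of ORDER BY similarity DESC LIMIT k in the SQL examples.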

Error Handling

Both Python and Node.js share the same error hierarchy:

ManagedServiceError          # Base class for all errors
├── AuthError                # Token invalid/expired
│   └── TokenError           # Token parsing failed
├── CredentialsError         # DB credentials fetch failed
├── IngestError              # File ingestion failed
├── TableError               # Table operation failed
└── WorkspaceError           # Workspace not found or inaccessible

# Python imports
from deeplake.managed import (
    ManagedServiceError, AuthError, CredentialsError,
    IngestError, TableError, TokenError, WorkspaceError,
)

// Node.js imports
import {
    ManagedServiceError, AuthError, CredentialsError,
    IngestError, TableError, TokenError, WorkspaceError,
} from '@deeplake/node';

| Error | Cause | Solution |
| --- | --- | --- |
| AuthError: Token required | No token provided | Pass token= to Client() or set DEEPLAKE_API_KEY env var |
| AuthError: Token does not contain org_id | Token missing OrgID claim | Ensure token has OrgID claim or API /me is accessible |
| IngestError: File not found | Invalid file path | Check file exists at given path |
| TableError: table creation failed | API table creation failed | Check API server is running and workspace is accessible |
| WorkspaceError: No storage path | API returned no path | Check workspace exists and has storage configured |
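
Because TokenError subclasses AuthError, and everything subclasses ManagedServiceError, the order of checks matters when mapping an exception to a remedy. The sketch below shows one way to do that. The exception classes here are local stand-ins that mirror the hierarchy above so the snippet runs without the SDK; in real code they would come from the deeplake.managed import shown earlier. The function remediation_hint is hypothetical, and the hint strings are condensed from this document.

```python
# Stand-ins mirroring the documented hierarchy (replace with the SDK imports).
class ManagedServiceError(Exception): pass
class AuthError(ManagedServiceError): pass
class TokenError(AuthError): pass
class CredentialsError(ManagedServiceError): pass
class IngestError(ManagedServiceError): pass
class TableError(ManagedServiceError): pass
class WorkspaceError(ManagedServiceError): pass

# Most specific classes first, so a subclass is never shadowed by its parent.
_HINTS = [
    (TokenError, "Token parsing failed: check the token is a well-formed JWT"),
    (AuthError, "Pass token= to Client() or set DEEPLAKE_API_KEY"),
    (CredentialsError, "Credentials fetch failed: check workspace access"),
    (IngestError, "Check the file exists at the given path"),
    (TableError, "Check the API server is running and the workspace is accessible"),
    (WorkspaceError, "Check the workspace exists and has storage configured"),
    (ManagedServiceError, "Check the API server is running at the configured api_url"),
]


def remediation_hint(exc: BaseException):
    """Return a short hint for a managed-service error, or None for other errors."""
    for error_type, hint in _HINTS:
        if isinstance(exc, error_type):
            return hint
    return None
```

A typical call site would wrap client.ingest() or client.query() in try/except ManagedServiceError and log remediation_hint(err) alongside the original message.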

For troubleshooting details, see reference.md.


Agent Decision Trees

Decision: How to Initialize Client

Need to create a Client
|
|-- Python?
|   |-- DEEPLAKE_API_KEY env var is set?
|   |   `-- client = Client()                          # defaults: token from env, workspace="default"
|   |-- Have explicit token?
|   |   `-- client = Client(token="dl_xxx")            # workspace defaults to "default"
|   |-- Need specific workspace?
|   |   `-- client = Client(workspace_id="my-ws")      # token from env
|   `-- Need custom API URL?
|       `-- client = Client(api_url="http://custom:8080")
|
`-- Node.js?
    `-- const client = new ManagedClient({
           token: process.env.DEEPLAKE_API_KEY!,
           workspaceId: "my-ws",        // optional, default "default"
           apiUrl: "http://custom:8080", // optional
        });

Decision: How to Ingest Data

User wants to ingest data
|
|-- Is it local files? -> use FILE schema
|   |-- Python:
|   |   `-- client.ingest("table", {"path": ["f1.mp4", "f2.mp4"]},
|   |          schema={"path": "FILE"})
|   `-- Node.js:
|       `-- await client.ingest("table", { path: ["f1.mp4"] },
|              { schema: { path: "FILE" } })
|
|-- Is it structured data (dict/lists)? -> pass a dict directly
|   |-- Python:
|   |   `-- client.ingest("table", {"col1": [...], "col2": [...]})
|   `-- Node.js:
|       `-- await client.ingest("table", { col1: [...], col2: [...] })
|
|-- Is it a HuggingFace dataset? -> use _huggingface key
|   `-- client.ingest("table", {"_huggingface": "dataset_name"})
|
|-- Is it a LeRobot robotics dataset? -> 3-table design (tasks + frames + episodes)
|   |-- from deeplake.managed.formats import LeRobotTasks, LeRobotFrames, LeRobotEpisodes
|   |-- client.ingest("tasks", format=LeRobotTasks(dataset_dir))
|   |-- client.ingest("frames", format=LeRobotFrames(dataset_dir, chunk_start=0, chunk_end=3))
|   `-- client.ingest("episodes", format=LeRobotEpisodes(dataset_dir, chunk_start=0, chunk_end=3))
|   Note: chunk_end is inclusive. Episodes requires git lfs. See examples.md Workflow 7.
|
|-- Is it a domain-specific format (COCO, etc.)? -> use format object
|   |-- Python:  client.ingest("table", format=my_format)
|   `-- Node.js: await client.ingest("table", null, { format: myFormat })
|   See formats.md for built-in formats: CocoPanoptic, Coco, LeRobot, custom
|
|-- Need custom chunking for text?
|   `-- client.ingest("table", {"path": ["doc.txt"]},
|          schema={"path": "FILE"},
|          chunk_size=500, chunk_overlap=100)
|
|-- Need explicit schema?
|   `-- client.ingest("table", {...}, schema={
|          "name": "TEXT",
|          "count": "INT64",
|          "vector": "EMBEDDING",
|      })
|
`-- Need indexes for search performance?
    `-- client.ingest("table", {...}, index=["embedding", "text"])
       Or standalone: client.create_index("table", "embedding")

Decision: How to Query Data

User wants to query data
|
|-- Simple SELECT (small result)?
|   |-- Python fluent: client.table("table").select("id", "text").limit(100)()
|   |-- Python raw:    client.query("SELECT * FROM table LIMIT 100")
|   |-- Node fluent:   await client.table("table").select("id", "text").limit(100).execute()
|   `-- Node raw:      await client.query("SELECT * FROM table LIMIT 100")
|
|-- Large result set (streaming)?
|   `-- Use client.open_table("table") for direct dataset access
|      with batch iteration, PyTorch/TF DataLoaders, etc.
|
|-- Need semantic/vector search?
|   `-- client.query("""
|          SELECT *, embedding <#> $1 AS score
|          FROM table ORDER BY score DESC LIMIT 10
|      """, (query_embedding,))
|
|-- Need text search?
|   `-- client.query("""
|          SELECT * FROM table
|          WHERE text @> $1
|      """, ("keyword",))
|
`-- Need hybrid search (vector + text)?
    `-- client.query("""
           SELECT *, (embedding, text)::deeplake_hybrid_record <#>
           deeplake_hybrid_record($1, $2, 0.7, 0.3) AS score
           FROM table ORDER BY score DESC LIMIT 10
       """, (query_emb, "search text"))

Decision: User Wants to Train on Data

User wants to train / iterate over data
|
|-- Fast native batch iteration (RECOMMENDED for large datasets)?
|   |-- ds = client.open_table("table")
|   |   for batch in ds.batches(256):  # dict of numpy arrays
|   |       states = torch.tensor(np.stack([batch[c] for c in cols], axis=1))
|   `-- For column subsets, use query first:
|       view = ds.query("SELECT col1, col2 WHERE episode_index < 100")
|       for batch in view.batches(256): ...
|
|-- Need PyTorch DataLoader (small datasets or need shuffle)?
|   `-- ds = client.open_table("table")
|      loader = DataLoader(ds.pytorch(), batch_size=32, shuffle=True)
|      NOTE: Slower on large remote datasets — prefer ds.batches() above
|
|-- Need TensorFlow tf.data?
|   `-- ds = client.open_table("table")
|      tf_ds = ds.tensorflow().batch(32).prefetch(tf.data.AUTOTUNE)
|
`-- Training on LeRobot data?
    |-- Behavior cloning (state->action):
    |   ds = client.open_table("droid_frames")
    |   view = ds.query("SELECT state_x, ..., action_x, ... WHERE episode_index < 100")
    |   for batch in view.batches(256):
    |       states = torch.tensor(np.stack([batch[c] for c in STATE_COLS], axis=1))
    |       actions = torch.tensor(np.stack([batch[c] for c in ACTION_COLS], axis=1))
    |       # train(model, states, actions)
    |
    `-- Video-conditioned training:
        ds = client.open_table("droid_episodes")
        # Access video bytes via ds[i]["exterior_1_video"], etc.

Decision: Error Recovery

Operation failed with error
|
|-- AuthError?
|   |-- "Token required" -> Set DEEPLAKE_API_KEY env var or pass token= to Client()
|   |-- "Token does not contain org_id" -> Ensure token has OrgID claim
|   `-- "Token expired" -> Get new token
|
|-- IngestError?
|   |-- "data must be a dict" -> Pass a dict, not list/str/int
|   |-- "data must not be empty" -> Dict must have at least one key
|   |-- "File not found" -> Check file path exists
|   |-- "ffmpeg not found" -> Install ffmpeg for video processing
|   `-- "fitz not found" / "pdfjs-dist not found" -> Install pymupdf (Python) or pdfjs-dist (Node.js)
|
|-- TableError?
|   |-- "create_deeplake_table failed" -> Check pg_deeplake extension
|   |-- "Table already exists" -> Use drop_table() first or different name
|   `-- "Index creation failed" -> Check column exists and is EMBEDDING or TEXT type
|
|-- Thumbnail generation failed? (logged as warning, non-fatal)
|   |-- Python: Install Pillow (`pip install Pillow`)
|   `-- Node.js: Install sharp (`npm install sharp`)
|
`-- ManagedServiceError?
    `-- Check API server is running at the configured api_url

Supporting Files

  • reference.md -- pg_deeplake SQL reference (vector search, BM25, hybrid search, indexes), data types, limits, performance tuning, troubleshooting
  • examples.md -- Complete end-to-end workflow examples and detailed ingestion examples
  • formats.md -- Format base class, custom format classes, normalize()/schema()/image_columns() rules