# Deepgram SDK Patterns

## Overview
Production patterns for the Deepgram speech-to-text SDK (deepgram-sdk). Covers pre-recorded transcription, live streaming, speaker diarization, and multi-language support with proper error handling.
## Prerequisites

- `pip install deepgram-sdk` (Python) or `npm install @deepgram/sdk` (Node.js)
- `DEEPGRAM_API_KEY` environment variable
- Audio files or microphone access
## Instructions

### Step 1: Client Initialization
Python:

```python
import os

from deepgram import DeepgramClient, PrerecordedOptions, LiveOptions


def get_deepgram_client() -> DeepgramClient:
    return DeepgramClient(os.environ["DEEPGRAM_API_KEY"])
```

TypeScript:

```typescript
import { createClient, DeepgramClient } from '@deepgram/sdk';

const deepgram = createClient(process.env.DEEPGRAM_API_KEY!);
```
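A missing key otherwise only surfaces as a 401 on the first API call. As a minimal sketch, a small guard (`require_api_key` is a hypothetical helper, not part of the SDK) can fail fast at startup instead:

```python
import os


def require_api_key() -> str:
    # Illustrative helper, not part of the Deepgram SDK:
    # fail fast with a clear message instead of a 401 on the first request.
    key = os.environ.get("DEEPGRAM_API_KEY", "")
    if not key:
        raise RuntimeError(
            "DEEPGRAM_API_KEY is not set; export it before creating a client"
        )
    return key
```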
### Step 2: Pre-Recorded Transcription
```python
import mimetypes


def get_mimetype(file_path: str) -> str:
    # Best-effort guess from the file extension; fall back to WAV
    return mimetypes.guess_type(file_path)[0] or "audio/wav"


def transcribe_file(file_path: str, language: str = "en") -> dict:
    client = get_deepgram_client()
    with open(file_path, "rb") as audio:
        response = client.listen.rest.v("1").transcribe_file(
            {"buffer": audio.read(), "mimetype": get_mimetype(file_path)},
            PrerecordedOptions(
                model="nova-2",
                language=language,
                smart_format=True,
                punctuate=True,
                diarize=True,
                utterances=True,
                paragraphs=True,
            ),
        )
    transcript = response.results.channels[0].alternatives[0]
    return {
        "text": transcript.transcript,
        "confidence": transcript.confidence,
        "words": [
            {"word": w.word, "start": w.start, "end": w.end,
             "speaker": getattr(w, "speaker", None)}
            for w in (transcript.words or [])
        ],
    }
```
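The word-level timestamps in the `words` list are plain floats in seconds. For captioning, a pure formatting helper (illustrative, not part of the SDK) can convert them to SRT-style timestamps:

```python
def srt_timestamp(seconds: float) -> str:
    # Illustrative helper: convert a float offset in seconds
    # to SRT's HH:MM:SS,mmm format.
    ms = round(seconds * 1000)
    h, rem = divmod(ms, 3_600_000)
    m, rem = divmod(rem, 60_000)
    s, ms = divmod(rem, 1_000)
    return f"{h:02d}:{m:02d}:{s:02d},{ms:03d}"
```

Apply it to each word's `start` and `end` when emitting caption cues.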
### Step 3: Live Streaming Transcription
```python
import asyncio


async def stream_microphone():
    client = get_deepgram_client()
    connection = client.listen.asyncwebsocket.v("1")

    async def on_message(self, result, **kwargs):
        transcript = result.channel.alternatives[0].transcript
        if transcript:
            print(f"[{result.type}] {transcript}")

    connection.on("Results", on_message)

    options = LiveOptions(
        model="nova-2",
        language="en",
        smart_format=True,
        interim_results=True,
        endpointing=300,  # finalize an utterance after 300 ms of silence
    )
    await connection.start(options)
    # Send audio chunks from microphone...
    # await connection.send(audio_bytes)
    await connection.finish()
```
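`connection.send` expects raw audio bytes. When streaming from a file or buffer rather than a live microphone, sending small fixed-duration frames keeps latency low. A sketch of a pure chunking helper, assuming 16 kHz, 16-bit mono PCM (`chunk_pcm` is illustrative, not an SDK function):

```python
def chunk_pcm(pcm: bytes, frame_ms: int = 20,
              sample_rate: int = 16000, sample_width: int = 2) -> list[bytes]:
    # Illustrative helper: split raw PCM into fixed-duration frames.
    # Bytes per frame = samples/sec * bytes/sample * frame duration in seconds.
    frame_bytes = sample_rate * sample_width * frame_ms // 1000
    return [pcm[i:i + frame_bytes] for i in range(0, len(pcm), frame_bytes)]
```

Each returned frame can then be passed to `await connection.send(frame)` in a loop.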
### Step 4: Batch Processing with Concurrency Control
```python
import asyncio


async def batch_transcribe(files: list[str], max_concurrent: int = 5) -> list:
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_one(path):
        async with semaphore:
            # transcribe_file is blocking, so run it in the default thread pool
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(None, transcribe_file, path)
            return {"file": path, **result}

    tasks = [process_one(f) for f in files]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r if not isinstance(r, Exception) else {"error": str(r)} for r in results]
```
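The `return_exceptions=True` pattern is what keeps one failed file from aborting the whole batch. A self-contained sketch with stub coroutines (no Deepgram calls; the names and error message are illustrative) shows how failures surface as result entries rather than raised exceptions:

```python
import asyncio


async def ok(path: str) -> dict:
    # Stand-in for a successful transcription
    return {"file": path, "text": "hello"}


async def boom(path: str) -> dict:
    # Stand-in for a file that fails to transcribe
    raise ValueError(f"unsupported codec: {path}")


async def demo() -> list:
    # With return_exceptions=True, gather returns the exception object
    # in place of a result instead of propagating it.
    results = await asyncio.gather(ok("a.wav"), boom("b.xyz"),
                                   return_exceptions=True)
    return [r if not isinstance(r, Exception) else {"error": str(r)}
            for r in results]


results = asyncio.run(demo())
```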
## Error Handling

| Error | Cause | Solution |
|---|---|---|
| 401 Unauthorized | Invalid API key | Check `DEEPGRAM_API_KEY` |
| 400 Unsupported format | Bad audio codec | Convert to WAV/MP3/FLAC |
| Empty transcript | No speech in audio | Check audio quality and volume |
| WebSocket disconnect | Network instability | Implement reconnection logic |
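For the WebSocket-disconnect row, a common reconnection strategy is exponential backoff with a cap. The reconnect loop itself would wrap `connection.start` from Step 3; the delay schedule can be sketched as a pure function (`backoff_delays` is illustrative, not an SDK utility):

```python
def backoff_delays(attempts: int, base: float = 1.0, cap: float = 30.0) -> list[float]:
    # Illustrative helper: 1s, 2s, 4s, ... doubling each attempt,
    # capped at `cap` seconds so retries never wait unboundedly long.
    return [min(cap, base * (2 ** i)) for i in range(attempts)]
```

A reconnect loop would sleep for each delay in turn before retrying, and reset the schedule after a successful connection.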
## Examples

### Speaker-Labeled Transcript

```python
result = transcribe_file("meeting.wav")

current_speaker = None
for word in result["words"]:
    if word["speaker"] != current_speaker:
        current_speaker = word["speaker"]
        print(f"\nSpeaker {current_speaker}:", end=" ")
    print(word["word"], end=" ")
```
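The same grouping can be returned as a string instead of printed, which is easier to test and reuse downstream. A pure helper over the `words` list shape produced in Step 2 (`speaker_turns` is an illustrative name, not an SDK function):

```python
def speaker_turns(words: list[dict]) -> str:
    # Illustrative helper: group consecutive words by speaker label
    # and render one line per speaker turn.
    lines: list[str] = []
    current = None
    for w in words:
        if w["speaker"] != current:
            current = w["speaker"]
            lines.append(f"Speaker {current}:")
        lines[-1] += f" {w['word']}"
    return "\n".join(lines)
```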
## Resources
## Output

- Configuration files or code changes applied to the project
- Validation report confirming correct implementation
- Summary of changes made and their rationale