python-bigquery-sdk
Python BigQuery SDK
Provides workflows and best practices for the google-cloud-bigquery Python client library (v3.x) covering client setup, querying, schema definition, data loading, and result consumption.
Installation
pip install google-cloud-bigquery
# Optional extras for Arrow/pandas integration
pip install "google-cloud-bigquery[pandas,pyarrow]"
Client Initialisation
Instantiate Client once per process and reuse it. The client is thread-safe.
from google.cloud import bigquery
# Picks up credentials from GOOGLE_APPLICATION_CREDENTIALS or ADC
client = bigquery.Client(project="my-project")
# Explicit project + credentials
from google.oauth2 import service_account
credentials = service_account.Credentials.from_service_account_file("key.json")
client = bigquery.Client(project="my-project", credentials=credentials)
Key rules:
- Never create a Client inside a per-request or per-row function.
- Always set project explicitly in production; avoid relying on environment inference.
- Close the client with client.close() or use it as a context manager when appropriate (a sketch follows).
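A minimal lifecycle sketch that follows these rules, reusing one process-wide client and closing it explicitly on shutdown:
client = bigquery.Client(project="my-project")
try:
    rows = client.query_and_wait("SELECT 1 AS one")
    print(list(rows))
finally:
    client.close()  # release the underlying HTTP transport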
Running Queries
Simple query (blocking)
query = "SELECT name, age FROM `my-project.my_dataset.users` WHERE age > 18"
rows = client.query_and_wait(query) # Returns RowIterator directly (v3+)
for row in rows:
    print(row["name"], row.age)
query_and_wait is preferred over the legacy client.query(...).result() pattern for short interactive queries.
Parameterised queries (mandatory for untrusted input)
Always use query parameters — never string-format user input into SQL.
from google.cloud.bigquery import ScalarQueryParameter, QueryJobConfig
config = QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("min_age", "INT64", 18),
        ScalarQueryParameter("country", "STRING", "US"),
    ]
)
sql = """
SELECT name FROM `project.dataset.users`
WHERE age > @min_age AND country = @country
"""
rows = client.query_and_wait(sql, job_config=config)
Parameter types map to BigQuery SQL types: "STRING", "INT64", "FLOAT64", "BOOL", "TIMESTAMP", "DATE", "BYTES".
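As a sketch of the non-string types, ScalarQueryParameter accepts native Python values; the parameter names below are illustrative:
from datetime import date, datetime, timezone

config = QueryJobConfig(
    query_parameters=[
        # TIMESTAMP parameters take timezone-aware datetimes
        ScalarQueryParameter("since", "TIMESTAMP", datetime(2024, 1, 1, tzinfo=timezone.utc)),
        # DATE parameters take datetime.date values
        ScalarQueryParameter("signup_date", "DATE", date(2024, 6, 1)),
    ]
)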
Use ArrayQueryParameter for list inputs:
from google.cloud.bigquery import ArrayQueryParameter
config = QueryJobConfig(
    query_parameters=[
        ArrayQueryParameter("ids", "INT64", [1, 2, 3]),
    ]
)
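Array parameters are referenced in SQL with IN UNNEST; a sketch against the same illustrative users table:
sql = """
SELECT name FROM `project.dataset.users`
WHERE user_id IN UNNEST(@ids)
"""
rows = client.query_and_wait(sql, job_config=config)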
Asynchronous / long-running queries
job = client.query(sql, job_config=config) # Returns QueryJob immediately
# ... do other work ...
rows = job.result(timeout=300) # Block until complete or timeout
print(f"Bytes processed: {job.total_bytes_processed}")
Check job.state ("RUNNING", "DONE") and job.error_result before consuming results.
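A sketch of that check; note that job.result() also raises if the job itself failed:
job = client.query(sql)
job.result(timeout=300)  # blocks; raises on job failure
if job.state == "DONE" and job.error_result is None:
    for row in job.result():
        print(row)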
Dry-run (cost estimation)
dry_config = QueryJobConfig(dry_run=True, use_query_cache=False)
job = client.query(sql, job_config=dry_config)
print(f"Estimated bytes: {job.total_bytes_processed}")
Consuming Results
Iterate rows
for row in rows:
    value = row["column_name"]  # by name
    value = row[0]              # by position
    value = row.column_name     # attribute access
Convert to pandas DataFrame
df = rows.to_dataframe() # requires pandas + pyarrow
df = rows.to_dataframe(dtypes={"age": "Int64"})
Convert to Arrow Table
table = rows.to_arrow()
to_arrow() is faster than to_dataframe() for large result sets; convert after if needed.
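A sketch of fetching via Arrow and converting afterwards with pyarrow's to_pandas:
arrow_table = rows.to_arrow()  # single fetch into a pyarrow.Table
df = arrow_table.to_pandas()   # convert only if a DataFrame is needed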
Page size control
rows = client.query_and_wait(sql, max_results=1000) # cap result rows
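The returned RowIterator also fetches results in pages; a sketch of consuming it page by page (handle_page is a hypothetical callback):
for page in rows.pages:      # each page corresponds to one API response
    handle_page(list(page))  # hypothetical per-page handler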
Schema Definition
Define schemas explicitly — never rely on autodetect in production.
schema = [
    bigquery.SchemaField("user_id", "INT64", mode="REQUIRED"),
    bigquery.SchemaField("email", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="NULLABLE"),
    bigquery.SchemaField(
        "address",
        "RECORD",
        mode="NULLABLE",
        fields=[
            bigquery.SchemaField("city", "STRING"),
            bigquery.SchemaField("postcode", "STRING"),
        ],
    ),
]
| Mode | Meaning |
|---|---|
| REQUIRED | NOT NULL; value must be present |
| NULLABLE | Default; value may be NULL |
| REPEATED | Array of the given type |
Standard SQL types: STRING, BYTES, INTEGER/INT64, FLOAT/FLOAT64, NUMERIC, BIGNUMERIC, BOOLEAN/BOOL, TIMESTAMP, DATE, TIME, DATETIME, GEOGRAPHY, JSON, RECORD/STRUCT.
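As a sketch, REPEATED mode is how array columns are declared; the field names are illustrative:
bigquery.SchemaField("tags", "STRING", mode="REPEATED")     # ARRAY<STRING>
bigquery.SchemaField("scores", "FLOAT64", mode="REPEATED")  # ARRAY<FLOAT64>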
Creating Tables
table_ref = "my-project.my_dataset.my_table"  # fully-qualified table ID; client.dataset() is deprecated in v3
table = bigquery.Table(table_ref, schema=schema)
# Time partitioning
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="created_at",
    expiration_ms=7 * 24 * 60 * 60 * 1000,  # 7 days
)
# Clustering
table.clustering_fields = ["country", "user_id"]
table = client.create_table(table, exists_ok=True)
Loading Data
From a local file
job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
with open("data.ndjson", "rb") as f:
job = client.load_table_from_file(f, table_ref, job_config=job_config)
job.result() # Wait for completion
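Once the job has finished, the LoadJob reports what it wrote; a small sketch:
print(f"Loaded {job.output_rows} rows into {job.destination}")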
From Google Cloud Storage
uri = "gs://my-bucket/data/*.parquet"
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
job.result()
From a pandas DataFrame
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
Write dispositions
| Value | Behaviour |
|---|---|
| WRITE_TRUNCATE | Replace all existing rows |
| WRITE_APPEND | Add rows to existing data |
| WRITE_EMPTY | Fail if table already contains data |
Inserting Rows (Streaming)
Use the Storage Write API via google-cloud-bigquery-storage for high-throughput streaming. For simple cases:
errors = client.insert_rows_json(table_ref, [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "b@example.com"},
])
if errors:
    raise RuntimeError(f"Streaming insert errors: {errors}")
Caution: insert_rows_json does not guarantee exactly-once delivery and has per-row cost. Prefer batch load jobs for bulk ingestion.
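If streaming inserts are unavoidable, row_ids gives BigQuery a best-effort de-duplication hint (it still does not guarantee exactly-once delivery); a sketch:
rows_to_insert = [
    {"user_id": 1, "email": "a@example.com"},
    {"user_id": 2, "email": "b@example.com"},
]
errors = client.insert_rows_json(
    table_ref,
    rows_to_insert,
    row_ids=[str(r["user_id"]) for r in rows_to_insert],  # used as the insertId hint
)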
Error Handling
from google.cloud.exceptions import GoogleCloudError
from google.api_core.exceptions import BadRequest, NotFound
try:
    rows = client.query_and_wait(sql)
except BadRequest as exc:
    # SQL syntax / schema errors
    print(f"Query error: {exc.message}")
except NotFound as exc:
    print(f"Table or dataset not found: {exc}")
except GoogleCloudError as exc:
    print(f"API error: {exc}")
Always inspect job.errors after async jobs:
job = client.query(sql)
job.result()
if job.errors:
    for err in job.errors:
        print(err["message"], err.get("reason"))
Quick Reference: Key Classes
| Class | Module | Purpose |
|---|---|---|
| Client | google.cloud.bigquery.client | Entry point for all API calls |
| QueryJobConfig | google.cloud.bigquery.job | Query execution options |
| LoadJobConfig | google.cloud.bigquery.job | Load job options |
| SchemaField | google.cloud.bigquery.schema | Column definition |
| Table | google.cloud.bigquery.table | Table resource |
| Dataset | google.cloud.bigquery.dataset | Dataset resource |
| TimePartitioning | google.cloud.bigquery.table | Partitioning config |
| ScalarQueryParameter | google.cloud.bigquery.query | Single-value param |
| ArrayQueryParameter | google.cloud.bigquery.query | Array param |
| StructQueryParameter | google.cloud.bigquery.query | Struct param |
| WriteDisposition | google.cloud.bigquery.enums | Overwrite vs append |
| SourceFormat | google.cloud.bigquery.enums | Input file format |
Additional Resources
Reference Files
For deeper coverage consult:
- references/advanced-patterns.md: Schema evolution, partitioned table queries, export jobs, retry configuration, DB-API usage, and performance patterns
Official Docs