neo4j-driver-python-skill
Neo4j Python Driver
Package: neo4j
Current stable: v6
Docs: https://neo4j.com/docs/python-manual/current/
API ref: https://neo4j.com/docs/api/python-driver/current/
When to Use
- Writing Python code that connects to Neo4j
- Setting up GraphDatabase.driver(), execute_query(), or AsyncGraphDatabase in a Python app
- Questions about sessions, transactions, result handling, async patterns, or data type mapping in Python
- Debugging connection, serialization, or UNWIND batching issues
When NOT to Use
- Writing or optimizing Cypher queries → use neo4j-cypher-skill
- Upgrading from an older driver version → use neo4j-migration-skill
- GraphRAG pipelines (neo4j-graphrag package) → use neo4j-graphrag-skill
1. Installation
pip install neo4j
pip install neo4j-rust-ext # optional: 3–10× faster serialization, same API
For async support, no additional packages are needed — asyncio is in the standard library. For Pandas integration, install pandas separately.
2. Driver Lifecycle
Driver is thread-safe and expensive to create — create exactly one instance per application, share it everywhere, and close it on shutdown. Use it as a context manager or call .close() explicitly.
from neo4j import GraphDatabase
URI = "neo4j+s://xxx.databases.neo4j.io" # Aura
AUTH = ("neo4j", "password")
# Preferred: context manager handles close automatically
with GraphDatabase.driver(URI, auth=AUTH) as driver:
driver.verify_connectivity() # fail fast if unreachable
# ... do work ...
# Long-lived singleton (e.g. in a service class):
driver = GraphDatabase.driver(URI, auth=AUTH)
driver.verify_connectivity()
# ... later, on shutdown:
driver.close()
URI Schemes
| Scheme | When to use |
|---|---|
| neo4j:// | Unencrypted, cluster-routing |
| neo4j+s:// | TLS, cluster-routing — use for Aura |
| bolt:// | Unencrypted, single instance |
| bolt+s:// | TLS, single instance |
Auth Options
from neo4j import GraphDatabase, basic_auth, bearer_auth, kerberos_auth
GraphDatabase.driver(URI, auth=("user", "password")) # basic — tuple shorthand
GraphDatabase.driver(URI, auth=basic_auth("user", "password"))
GraphDatabase.driver(URI, auth=bearer_auth("jwt-token"))
GraphDatabase.driver(URI, auth=kerberos_auth("base64ticket"))
3. Choosing the Right API
| API | When to use | Auto-retry? | Streaming? |
|---|---|---|---|
| driver.execute_query() | Most queries — simple, safe default | ✅ | ❌ (eager) |
| session.execute_read/write() | Large results, need lazy streaming | ✅ | ✅ |
| session.run() | LOAD CSV, quick scripts, CALL {} IN TRANSACTIONS | ❌ | ✅ |
| AsyncGraphDatabase | asyncio applications | ✅ | ✅ |
4. execute_query — Recommended Default
The highest-level API. Manages sessions, transactions, retries, and bookmarks automatically.
EagerResult — Three Ways to Access the Return Value
execute_query returns an EagerResult — a NamedTuple of (records, summary, keys). It supports three access patterns — understanding all three prevents common confusion:
from neo4j import GraphDatabase, RoutingControl
# Pattern 1: Tuple unpacking (most common)
records, summary, keys = driver.execute_query(
"MATCH (p:Person) RETURN p.name AS name",
database_="neo4j",
)
# Pattern 2: Attribute access on the returned object
result = driver.execute_query(
"MATCH (p:Person) RETURN p.name AS name",
database_="neo4j",
)
records = result.records # list[Record]
summary = result.summary # ResultSummary
keys = result.keys # list[str], e.g. ['name']
# Pattern 3: Index access — EagerResult is a NamedTuple, so positional
# indexing returns its fields, not individual records
result = driver.execute_query("MATCH (p:Person) RETURN p.name AS name", database_="neo4j")
records = result[0]        # ✅ the whole records list — same as result.records
first = result.records[0]  # ✅ the first Record
What NOT to do:
result = driver.execute_query("MATCH (p:Person) RETURN p.name AS name", database_="neo4j")
# ❌ Iterating the EagerResult itself — a NamedTuple yields its three fields
# (records, summary, keys), not the individual records
for item in result:  # exactly three iterations, none of them a Record
    ...
for record in result.records:  # ✅ iterate the records list explicitly
    print(record["name"])
# ❌ Treating the result as a single record
result["name"]  # TypeError — tuple indices must be integers; index into .records first
# ❌ Assuming len() gives the record count
len(result)          # always 3 — the NamedTuple's field count
len(result.records)  # ✅ the actual record count
# Full example with read routing
records, summary, keys = driver.execute_query(
"MATCH (p:Person {name: $name})-[:KNOWS]->(friend) RETURN friend.name AS name",
name="Alice",
routing_=RoutingControl.READ, # route reads to replicas
database_="neo4j", # always specify — avoids a round-trip
)
for record in records:
print(record["name"])
print(f"Returned {len(records)} records in {summary.result_available_after} ms")
print(f"Keys projected: {keys}") # ['name']
# Write query — access summary via attribute or unpacking
summary = driver.execute_query(
"CREATE (p:Person {name: $name, age: $age})",
name="Bob", age=30,
database_="neo4j",
).summary
print(f"Created {summary.counters.nodes_created} nodes")
⚠ Trailing Underscore Convention — Critical Gotcha
Config kwargs to execute_query must end with a single underscore to distinguish them from query parameters. This includes database_, routing_, auth_, result_transformer_, bookmark_manager_, and impersonated_user_.
No query parameter name may end with a single underscore — the driver will raise ValueError if it detects this collision. Pass such parameters via the parameters_ dict instead:
# ❌ Fails — 'name_' clashes with the driver's config namespace
driver.execute_query("MATCH (p:Person {name: $name_}) RETURN p", name_="Alice")
# ✅ Use parameters_ dict for any parameter whose name ends with _
driver.execute_query(
"MATCH (p:Person {name: $name_}) RETURN p",
parameters_={"name_": "Alice"},
database_="neo4j",
)
# ✅ Or rename the Cypher parameter to avoid the underscore
driver.execute_query(
"MATCH (p:Person {name: $name}) RETURN p",
name="Alice",
database_="neo4j",
)
⚠ Never f-string or format Cypher. Always use $param placeholders — prevents injection and enables query plan caching on the server.
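To make the risk concrete, here is a sketch — with a made-up malicious input — of how string interpolation turns data into query text, while a parameter cannot:

```python
# A hypothetical user-supplied value containing a quote
user_input = "Alice'}) MATCH (q) DETACH DELETE q //"

# ❌ f-string splices the raw value into the query text
bad_query = f"MATCH (p:Person {{name: '{user_input}'}}) RETURN p"
# The quote breaks out of the string literal, turning data into Cypher:
# MATCH (p:Person {name: 'Alice'}) MATCH (q) DETACH DELETE q //'}) RETURN p

# ✅ With $name the value travels as a parameter, is never parsed as Cypher,
# and the server can cache the plan for the constant query text
good_query = "MATCH (p:Person {name: $name}) RETURN p"
# driver.execute_query(good_query, name=user_input, database_="neo4j")
```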
Result Transformers
execute_query accepts a result_transformer_ callable to reshape the result before it's returned:
import neo4j
# Built-in: return a Pandas DataFrame (requires pandas installed)
df = driver.execute_query(
"MATCH (p:Person) RETURN p.name AS name, p.age AS age",
database_="neo4j",
result_transformer_=neo4j.Result.to_df,
)
# Built-in: return a single record — behaviour depends on result count (see below)
record = driver.execute_query(
"MATCH (p:Person {name: $name}) RETURN p",
name="Alice",
database_="neo4j",
result_transformer_=neo4j.Result.single,
)
# Custom transformer — receives the raw Result, must consume it here
def first_names(result: neo4j.Result) -> list[str]:
return [record["name"] for record in result]
names = driver.execute_query(
"MATCH (p:Person) RETURN p.name AS name",
database_="neo4j",
result_transformer_=first_names,
)
result.single() — Raises on Zero, Not Just Multiple
result.single() is not like SQLAlchemy's .scalar_one_or_none(). It raises ResultNotSingleError when the result contains zero records as well as when it contains two or more. It is easy to assume it returns None for zero results — by default, it does not.
# ❌ Common misconception — assuming single() returns None for zero results
def find_person(tx):
result = tx.run("MATCH (p:Person {name: $name}) RETURN p", name="Nobody")
record = result.single() # raises ResultNotSingleError, not returns None
if record is None: # never reached
return None
return record["p"]
# ✅ strict=False — returns None for zero results, still raises for 2+
def find_person_safe(tx):
result = tx.run("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
record = result.single(strict=False) # None if 0 results, Record if 1, raises if 2+
if record is None:
return None
return record["p"]
# ✅ In execute_query with result_transformer_ — same two-mode behaviour applies
# Default (strict=True): raises for 0 or 2+ results
record = driver.execute_query(
"MATCH (p:Person {name: $name}) RETURN p",
name="Alice",
database_="neo4j",
result_transformer_=neo4j.Result.single, # raises if Alice not found
)
# For "find or None" semantics: write a custom transformer
def single_or_none(result):
return result.single(strict=False) # None if not found
record = driver.execute_query(
"MATCH (p:Person {name: $name}) RETURN p",
name="Alice",
database_="neo4j",
result_transformer_=single_or_none,
)
Summary of single() modes:
| Result count | single() (strict=True, default) | single(strict=False) |
|---|---|---|
| 0 records | raises ResultNotSingleError | returns None |
| 1 record | returns the Record | returns the Record |
| 2+ records | raises ResultNotSingleError | raises ResultNotSingleError |
5. Managed Transactions (execute_read / execute_write)
Use when you need lazy streaming over large results, or when you want to run multiple queries inside one transaction.
with driver.session(database="neo4j") as session:
# Read — routes to replicas; callback auto-retried on transient failure
def get_people(tx):
result = tx.run(
"MATCH (p:Person) WHERE p.name STARTS WITH $prefix RETURN p.name AS name",
prefix="Al",
)
# ✅ Consume the Result INSIDE the callback — it is invalid after the tx closes
return [record["name"] for record in result]
names = session.execute_read(get_people)
# Write — routes to leader
def create_person(tx):
tx.run("CREATE (p:Person {name: $name})", name="Carol")
session.execute_write(create_person)
Critical: Result Lifetime in Transaction Functions
Result is a lazy cursor backed by the open transaction. The transaction closes the moment the callback returns. Reading a Result after that raises ResultConsumedError.
# ❌ WRONG — leaks the Result out of the transaction
def bad_tx(tx):
return tx.run("MATCH (p:Person) RETURN p.name AS name")
# Result returned here; tx closes immediately after
result = session.execute_read(bad_tx)
list(result) # raises ResultConsumedError — the cursor is already closed
# ✅ CORRECT — fully consume the result before the function returns
def good_tx(tx):
result = tx.run("MATCH (p:Person) RETURN p.name AS name")
return [record["name"] for record in result] # consumed while tx is open
Multiple tx.run() Calls
If you call tx.run() a second time before the first Result is consumed, the driver automatically buffers the first result in memory before running the next query. This is safe, but means you can accidentally pull a large result into RAM. Consume each result before the next call when working with large datasets:
def multi_query_tx(tx):
# First result — consume it immediately
people = [r["name"] for r in tx.run("MATCH (p:Person) RETURN p.name AS name")]
# Second query — safe, first result is already consumed
for name in people:
tx.run("MERGE (p:Person {name: $name})-[:VISITED]->(:City {name: 'London'})",
name=name)
return len(people)
Retry Safety
The callback may execute more than once on transient failures. Keep callbacks idempotent:
# ❌ Side effect runs on every retry
def dangerous_tx(tx):
requests.post("https://api.example.com/notify") # fires on every retry
tx.run("CREATE (p:Person {name: $name})", name="Alice")
# ✅ Pure database work; HTTP call made only on confirmed success
def safe_tx(tx):
tx.run("MERGE (p:Person {name: $name})", name="Alice") # MERGE is idempotent
session.execute_write(safe_tx)
# Make the HTTP call here, outside the callback, once write is confirmed
requests.post("https://api.example.com/notify")
TransactionConfig — Timeouts & Metadata
Use @unit_of_work to attach a timeout and metadata to a managed transaction function:
from neo4j import unit_of_work
@unit_of_work(timeout=5.0, metadata={"app": "myService", "user": user_id})
def get_people(tx):
return [r["name"] for r in tx.run("MATCH (p:Person) RETURN p.name AS name")]
session.execute_read(get_people)
The @unit_of_work decorator attaches the config to the function. Metadata appears in SHOW TRANSACTIONS and server query logs.
⚠ @unit_of_work Cannot Be Applied to Lambdas
Python does not allow decorating a lambda expression. This is a common trap — if you use a lambda for a simple transaction (which is convenient), you silently lose the ability to set a timeout or metadata:
# ❌ Syntax error — cannot decorate a lambda
session.execute_write(
@unit_of_work(timeout=5.0)
lambda tx: tx.run("MERGE (p:Person {name: $name})", name="Alice")
)
# ❌ Also wrong — @unit_of_work has no effect when called after the fact
fn = lambda tx: tx.run("MERGE (p:Person {name: $name})", name="Alice")
unit_of_work(timeout=5.0)(fn) # wraps fn, but the session.execute_write call
session.execute_write(fn) # still uses the original fn, not the wrapped version
# ✅ Correct — define a named function and decorate it
@unit_of_work(timeout=5.0, metadata={"app": "myService"})
def create_person(tx):
tx.run("MERGE (p:Person {name: $name})", name="Alice")
session.execute_write(create_person)
# ✅ Also correct — assign the decorated version explicitly
create_person = unit_of_work(timeout=5.0)(lambda tx: tx.run(
"MERGE (p:Person {name: $name})", name="Alice"
))
session.execute_write(create_person)
The practical rule: use named functions whenever you need a timeout or metadata; lambdas are fine for fire-and-forget callbacks where server-default timeouts are acceptable.
6. Implicit Transactions (session.run)
The lowest-level, least safe API. Not automatically retried. Use only for:
- LOAD CSV imports (must use auto-commit transactions)
- CALL { } IN TRANSACTIONS Cypher
- Quick prototyping
with driver.session(database="neo4j") as session:
result = session.run("CREATE (p:Person {name: $name})", name="Alice")
summary = result.consume() # ⚠ call consume() to ensure the tx commits
print(summary.counters.nodes_created)
Commit timing is non-obvious: an implicit transaction commits at the latest when the session is closed, or immediately before the next query in the same session. Do not rely on this ordering — always call .consume() when you need a guaranteed commit before proceeding.
# ❌ Fragile — commit timing is undefined between the two runs
with driver.session(database="neo4j") as session:
session.run("CREATE (p:Person {name: 'Alice'})")
session.run("MATCH (p:Person {name: 'Alice'}) SET p.age = 30") # may not see Alice
# ✅ Explicit consume ensures first tx is committed before the second runs
with driver.session(database="neo4j") as session:
session.run("CREATE (p:Person {name: 'Alice'})").consume()
session.run("MATCH (p:Person {name: 'Alice'}) SET p.age = 30")
Since the driver cannot determine whether session.run() requires read or write access, it defaults to write mode. If your implicit transaction is read-only, declare it:
import neo4j

with driver.session(database="neo4j", default_access_mode=neo4j.READ_ACCESS) as session:
    result = session.run("MATCH (p:Person) RETURN p.name AS name")
7. Explicit Transactions
Use when a transaction must span multiple functions or coordinate with external systems.
with driver.session(database="neo4j") as session:
tx = session.begin_transaction()
try:
do_part_a(tx)
do_part_b(tx)
tx.commit()
except Exception as e:
tx.rollback() # rollback can itself raise on network failure — see below
raise
def do_part_a(tx):
tx.run("CREATE (p:Person {name: $name})", name="Alice")
Rollback Can Raise
tx.rollback() is a network call. If the connection is broken, it raises. Don't let it swallow the original exception:
try:
    tx.commit()
except Exception as original:
    try:
        tx.rollback()
    except Exception as rollback_err:
        raise rollback_err from original  # chain both exceptions
    raise
Commit Uncertainty
If tx.commit() raises a network-level exception, the commit may or may not have succeeded on the server. Design writes to be idempotent with MERGE and unique constraints so retrying is always safe.
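A minimal sketch of that design: the write is safe to re-run only because it is idempotent (MERGE plus a unique constraint). The retry helper and its ConnectionError stand-in for the driver's network exceptions are illustrative, not driver API:

```python
import time

def retry_idempotent_write(do_write, attempts=3, backoff=0.5):
    """Retry a write after a possibly-ambiguous commit failure.

    Safe only because do_write is idempotent: if the first commit
    actually succeeded on the server, re-running it is a no-op.
    """
    last_err = None
    for attempt in range(attempts):
        try:
            return do_write()
        except ConnectionError as err:  # stand-in for ServiceUnavailable etc.
            last_err = err
            time.sleep(backoff * (2 ** attempt))  # exponential backoff
    raise last_err

# Usage sketch:
# retry_idempotent_write(lambda: driver.execute_query(
#     "MERGE (p:Person {email: $email})", email="a@example.com", database_="neo4j"))
```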
8. Async API
For asyncio applications. The async API mirrors the sync API exactly — replace GraphDatabase with AsyncGraphDatabase and await every call.
Async Driver Is Also a Singleton
The async driver is just as expensive to create as the sync driver — connection pool, DNS resolution, and TLS handshake all happen at construction time. Do not recreate it per request.
# ❌ Wrong — recreates the driver (and tears down the connection pool) on every call
async def handle_request(name: str):
async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
records, _, _ = await driver.execute_query("MATCH (p:Person {name: $name}) RETURN p",
name=name, database_="neo4j")
return records
# ✅ Correct — driver created once at app startup, shared for the lifetime of the process
driver = AsyncGraphDatabase.driver(URI, auth=AUTH)
async def handle_request(name: str):
records, _, _ = await driver.execute_query("MATCH (p:Person {name: $name}) RETURN p",
name=name, database_="neo4j")
return records
# Close at shutdown
await driver.close()
Web Framework Lifespan Pattern (FastAPI / Starlette)
from contextlib import asynccontextmanager
from fastapi import FastAPI
from neo4j import AsyncGraphDatabase
_driver = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global _driver
_driver = AsyncGraphDatabase.driver(URI, auth=AUTH)
await _driver.verify_connectivity()
yield # app runs here
await _driver.close() # called on shutdown
app = FastAPI(lifespan=lifespan)
def get_driver():
return _driver # injected via FastAPI Depends()
@app.get("/people")
async def get_people():
records, _, _ = await get_driver().execute_query(
"MATCH (p:Person) RETURN p.name AS name",
database_="neo4j",
routing_=RoutingControl.READ,
)
return [r["name"] for r in records]
Basic Async Usage
import asyncio
from neo4j import AsyncGraphDatabase
URI = "neo4j+s://xxx.databases.neo4j.io"
AUTH = ("neo4j", "password")
async def main():
async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
await driver.verify_connectivity()
records, summary, keys = await driver.execute_query(
"MATCH (p:Person) RETURN p.name AS name",
database_="neo4j",
routing_=RoutingControl.READ,
)
names = [r["name"] for r in records]
print(names)
asyncio.run(main())
Async Managed Transactions
async def get_people(tx):
result = await tx.run("MATCH (p:Person) RETURN p.name AS name")
# ✅ Consume inside the async callback — await the collection
return await result.values() # returns list of lists [[name], [name], ...]
async def create_person(tx, name: str):
await tx.run("MERGE (p:Person {name: $name})", name=name)
async def run_queries(driver):
async with driver.session(database="neo4j") as session:
people = await session.execute_read(get_people)
await session.execute_write(create_person, "Carol")
Async Result Methods
| Method | Returns | Notes |
|---|---|---|
| await result.values() | list[list] | Each inner list is one row of values |
| await result.data() | list[dict] | Each dict is one record keyed by column name |
| await result.single() | one Record | Raises if not exactly one record |
| await result.fetch(n) | list[Record] | Up to n records |
| await result.consume() | ResultSummary | Discards remaining, returns summary |
| async for record in result | iterates Record | Lazy streaming |
⚠ Do Not Mix Sync and Async Drivers
Never use the sync GraphDatabase.driver in an asyncio context — it blocks the event loop. Always use AsyncGraphDatabase.driver in async code, even for a single query.
# ❌ Blocks the event loop — other coroutines cannot run during the query
async def bad():
with GraphDatabase.driver(URI, auth=AUTH) as driver:
records, _, _ = driver.execute_query("MATCH (p:Person) RETURN p")
# ✅ Async driver keeps the event loop free
async def good():
async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
records, _, _ = await driver.execute_query("MATCH (p:Person) RETURN p")
Concurrency with asyncio
async def run_concurrent(driver):
# Run multiple queries concurrently with asyncio.gather
results = await asyncio.gather(
driver.execute_query("MATCH (a:Artist) RETURN a.name AS name", database_="neo4j"),
driver.execute_query("MATCH (v:Venue) RETURN v.name AS name", database_="neo4j"),
)
artists = [r["name"] for r in results[0].records]
venues = [r["name"] for r in results[1].records]
9. Error Handling
from neo4j.exceptions import (
Neo4jError,
DriverError,
ServiceUnavailable,
SessionExpired,
TransientError,
AuthError,
ConstraintError, # unique/existence constraint violation — most common app-level error
)
try:
driver.execute_query("...", database_="neo4j")
except AuthError as e:
print("Bad credentials:", e)
except ServiceUnavailable as e:
print("No servers reachable:", e)
except TransientError as e:
# execute_query and execute_read/write retry automatically;
# this is only raised once retries are exhausted
print(f"Transient error after retries: {e.code}")
except ConstraintError as e:
# Unique or existence constraint violation — subclass of Neo4jError
# Must be caught BEFORE the generic Neo4jError handler
print(f"Constraint violation [{e.code}]: {e.message}")
except Neo4jError as e:
# Server-side Cypher or other database error
print(f"Neo4j error [{e.code}]: {e.message}")
# GQL status code for stable programmatic handling:
if e.gql_status == "42001": # SyntaxError
print("Fix the query syntax")
ConstraintError — Unique Constraint Violations
ConstraintError is the most common application-level database error and should almost always be handled explicitly. It is a subclass of Neo4jError, so a bare except Neo4jError will catch it — but silently, without letting you branch on "this specific node already exists" vs "something else went wrong".
from neo4j.exceptions import ConstraintError
def create_user(driver, username: str) -> bool:
"""Returns True if created, False if username already exists."""
try:
driver.execute_query(
"CREATE (u:User {username: $username})",
username=username,
database_="neo4j",
)
return True
except ConstraintError:
# Neo4j raised Neo.ClientError.Schema.ConstraintValidationFailed
# because a unique constraint on User.username was violated
return False
# Constraint violation codes follow a predictable pattern:
# e.code == "Neo.ClientError.Schema.ConstraintValidationFailed"
# e.message contains the constraint name and offending value
Important catch ordering: because ConstraintError is a subclass of Neo4jError, always catch it before the generic Neo4jError handler, or it will be swallowed by the parent.
GQL status codes (stable across versions) are preferable to error message strings for programmatic handling. Use e.gql_status or e.find_by_gql_status("42001") rather than parsing e.message.
10. Record Access & Null Safety
Accessing Values
records, _, _ = driver.execute_query(
"MATCH (p:Person) RETURN p.name AS name, p.age AS age",
database_="neo4j",
)
record = records[0]
# By key — raises KeyError if key absent
name = record["name"]
# By index — 0-based, positional
name = record[0]
# .get() — returns None for both absent keys AND graph null (see below)
name = record.get("name")
name = record.get("name", "Unknown") # with default
# .data() — converts the record to a dict keyed by column name
d = record.data() # {"name": "Alice", "age": 30}
record.data() Is Not JSON-Serializable
.data() returns a dict, but the values are still driver objects for graph and temporal types — it does not recursively convert to Python primitives. Calling json.dumps(record.data()) will raise TypeError if any field contains a Node, Relationship, Path, or neo4j.time.* value.
# Query returning a node: MATCH (p:Person) RETURN p
record = records[0]
d = record.data()
# d == {"p": <Node element_id='4:...' labels=frozenset({'Person'}) properties={'name': 'Alice'}>}
# The value is a Node object, NOT a dict
import json
json.dumps(d) # ❌ raises TypeError: Object of type Node is not JSON serializable
# ❌ Also fails for temporal types:
# d == {"created_at": <DateTime 2024-01-01T00:00:00.000000000+00:00>}
json.dumps(d) # TypeError: Object of type DateTime is not JSON serializable
To get a fully JSON-safe dict, extract properties explicitly:
# ✅ For simple scalar fields — .data() is fine
records, _, _ = driver.execute_query(
"MATCH (p:Person) RETURN p.name AS name, p.age AS age", # scalar projections
database_="neo4j",
)
d = records[0].data() # {"name": "Alice", "age": 30} — safe to json.dumps()
# ✅ For node/relationship fields — extract properties manually
records, _, _ = driver.execute_query(
"MATCH (p:Person) RETURN p", # returns whole node
database_="neo4j",
)
node = records[0]["p"] # neo4j.graph.Node
props = dict(node) # {"name": "Alice", "age": 30} — plain dict of properties
# ✅ For temporal types — convert explicitly before serializing
from neo4j.time import DateTime
dt = records[0]["created_at"] # neo4j.time.DateTime
iso = str(dt) # "2024-01-01T00:00:00.000000000+00:00" — JSON-safe string
py_dt = dt.to_native() # datetime.datetime — also JSON-safe via isoformat()
# ✅ General pattern: project scalars in Cypher rather than returning whole nodes
records, _, _ = driver.execute_query("""
MATCH (p:Person)
RETURN p.name AS name, p.age AS age, toString(p.created_at) AS created_at
""",
database_="neo4j",
)
# Now .data() is fully JSON-safe
safe_dicts = [r.data() for r in records]
json.dumps(safe_dicts) # ✅ works
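When you can't control the query shape, a recursive converter helps. This is a hedged sketch, not an official driver utility — it relies on Node and Relationship being Mapping-like (so dict(x) yields their properties) and falls back to str() for temporal and spatial types:

```python
def to_json_safe(value):
    """Recursively reduce driver values to JSON-serializable primitives."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, (list, tuple)):
        return [to_json_safe(v) for v in value]
    if isinstance(value, dict):
        return {k: to_json_safe(v) for k, v in value.items()}
    try:
        # Node and Relationship behave like mappings of their properties
        return {k: to_json_safe(v) for k, v in dict(value).items()}
    except (TypeError, ValueError):
        # Temporal / spatial types: fall back to their string form
        return str(value)

# Usage sketch:
# safe = [to_json_safe(r.data()) for r in records]
# json.dumps(safe)  # works regardless of what the query returned
```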
Null Safety — Absent Key vs Graph Null
These are two distinct situations that both surface as None when using .get() — which hides the difference:
| Situation | record["key"] | record.get("key") |
|---|---|---|
| Key projected, value non-null | the value | the value |
| Key projected, value is graph null | None | None |
| Key absent (typo / not in RETURN) | raises KeyError | None |
# ❌ Typo — .get() hides it; [] surfaces it immediately
record.get("nme") # None — the typo goes undetected
record["nme"] # KeyError — the bug is caught right away
# When a column is from OPTIONAL MATCH, graph null gives None:
# Query: OPTIONAL MATCH (p)-[:LIVES_IN]->(c:City) RETURN p.name AS name, c.name AS city
city = record.get("city") # None when no City matched — same as an absent key via .get()
The safest pattern for optional columns:
# Check key presence explicitly for truly optional columns:
if "city" in record.keys() and record["city"] is not None:
city = record["city"]
else:
city = "Unknown"
# Or rely on .get() with a fallback — note that `or` also replaces falsy values like "" or 0:
city = record.get("city") or "Unknown"
Graph Types
# Node
node = record["p"] # neo4j.graph.Node
node.element_id # stable identifier within this transaction
node.labels # frozenset({'Person'})
node["name"] # property access by key
dict(node) # all properties as plain dict
# Relationship
rel = record["r"] # neo4j.graph.Relationship
rel.type # 'KNOWS'
rel.start_node.element_id
rel.end_node.element_id
rel["since"] # property
# ⚠ element_id is only guaranteed stable within one transaction.
# Do not use it to MATCH entities across separate transactions.
Temporal Types
The driver returns Neo4j temporal values as neo4j.time types, not native Python datetime. Conversion is lossy:
from neo4j.time import DateTime
dt = record["created_at"] # neo4j.time.DateTime
type(dt) # <class 'neo4j.time.DateTime'>
# Convert to Python datetime — loses sub-microsecond precision and some timezone info
py_dt = dt.to_native() # datetime.datetime
# Pass Python datetime as a parameter — driver converts automatically
from datetime import datetime, timezone
driver.execute_query(
"CREATE (e:Event {at: $ts})",
ts=datetime.now(timezone.utc),
database_="neo4j",
)
11. Data Types & Parameter Mapping
Allowed Parameter Types
Only these types (and None) are valid as query parameter values:
| Python type | Cypher type |
|---|---|
| str | String |
| int | Integer |
| float | Float |
| bool | Boolean |
| list / tuple | List |
| dict | Map |
| None | null |
| datetime.date | Date |
| datetime.datetime | DateTime |
| datetime.time | Time |
| datetime.timedelta | Duration |
| neo4j.time.* types | Corresponding Cypher temporal |
Custom classes, dataclasses, Pydantic models, and enums are not automatically serialised. Convert to dict or primitive values before passing as parameters.
from dataclasses import dataclass, asdict
@dataclass
class Person:
name: str
age: int
p = Person("Alice", 30)
# ❌ Fails — driver can't serialise a dataclass
driver.execute_query("CREATE (p:Person $props)", props=p, database_="neo4j")
# ✅ Convert to dict first
driver.execute_query("CREATE (p:Person $props)", props=asdict(p), database_="neo4j")
# or pass fields individually:
driver.execute_query("CREATE (p:Person {name: $name, age: $age})",
name=p.name, age=p.age, database_="neo4j")
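The same applies to enums. A small sketch (the Status enum here is made up) showing the conversion:

```python
from enum import Enum

class Status(Enum):
    ACTIVE = "active"
    SUSPENDED = "suspended"

status = Status.ACTIVE

# ❌ Fails — an Enum member is not an allowed parameter type
# driver.execute_query("CREATE (u:User {status: $s})", s=status, database_="neo4j")

# ✅ Pass the underlying primitive value instead
# driver.execute_query("CREATE (u:User {status: $s})", s=status.value, database_="neo4j")
param = status.value  # "active" — a plain str the driver can serialise
```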
12. Performance
Always Specify the Database
Omitting database_ causes the driver to resolve the home database with an extra network round-trip on every call.
# execute_query:
driver.execute_query("...", database_="neo4j")
# Session:
driver.session(database="neo4j")
Route Reads to Replicas
from neo4j import RoutingControl
# execute_query:
driver.execute_query("MATCH ...", routing_=RoutingControl.READ, database_="neo4j")
# Managed transaction — execute_read routes automatically:
session.execute_read(my_read_fn)
Batch Writes with UNWIND
Pass a list[dict] — each dict becomes one row in the Cypher loop. Keep the values to the allowed parameter types above; custom objects inside the list fail serialisation at runtime.
# ❌ Wrong — passing a list of dataclass instances or custom objects
people = [Person("Alice", 30), Person("Bob", 25)]
driver.execute_query("UNWIND $people AS p MERGE (:Person {name: p.name})",
people=people, database_="neo4j") # raises at runtime
# ✅ Correct — list of plain dicts
people = [
{"name": "Alice", "age": 30, "city": "London"},
{"name": "Bob", "age": 25, "city": "Paris"},
]
driver.execute_query("""
UNWIND $people AS person
MERGE (p:Person {name: person.name})
SET p.age = person.age
MERGE (c:City {name: person.city})
MERGE (p)-[:LIVES_IN]->(c)
""",
people=people,
database_="neo4j",
)
Group Multiple Writes in One Transaction
# Bad: one transaction per item — high overhead
for item in items:
driver.execute_query("CREATE (n:Node {id: $id})", id=item["id"], database_="neo4j")
# Good: all in one managed transaction
def bulk_create(tx):
for item in items:
tx.run("CREATE (n:Node {id: $id})", id=item["id"])
with driver.session(database="neo4j") as session:
session.execute_write(bulk_create)
Lazy vs Eager Loading
# execute_query is always eager — fine for small/medium result sets
records, _, _ = driver.execute_query("MATCH (p:Person) RETURN p", database_="neo4j")
# For large results, iterate lazily inside a managed transaction
def process_large_result(tx):
result = tx.run("MATCH (p:Person) RETURN p.name AS name")
for record in result: # streams one record at a time
process(record["name"]) # don't build a list
with driver.session(database="neo4j") as session:
session.execute_read(process_large_result)
Concurrency — The GIL Matters
The Python GIL means that threads do not give true parallelism for CPU-bound work, but they do overlap on I/O (network waits). For heavy parallel database work, asyncio with AsyncGraphDatabase is the better approach:
# Sync threading — helps with I/O overlap, but GIL limits true parallelism
from concurrent.futures import ThreadPoolExecutor
def query(name):
records, _, _ = driver.execute_query(
"MATCH (p:Person {name: $name}) RETURN p", name=name, database_="neo4j"
)
return records
with ThreadPoolExecutor(max_workers=10) as pool:
results = list(pool.map(query, names))
# Async — preferred for high-concurrency read workloads
async def run_all(names):
async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
tasks = [
driver.execute_query("MATCH (p:Person {name: $name}) RETURN p",
name=name, database_="neo4j")
for name in names
]
return await asyncio.gather(*tasks)
Connection Pool Tuning
driver = GraphDatabase.driver(
URI, auth=AUTH,
max_connection_pool_size=50, # default: 100
connection_acquisition_timeout=30, # seconds to wait for a free connection
max_connection_lifetime=3600, # seconds; recycle old connections
connection_timeout=15, # seconds to establish a new connection
keep_alive=True, # TCP keepalive
)
Session exhaustion: each open session holds a connection. If sessions are not closed promptly, the pool is exhausted and new sessions block for up to connection_acquisition_timeout seconds then raise ClientError. Always use sessions as context managers.
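The acquire-then-timeout behaviour can be modelled with a bounded semaphore — purely illustrative, not the driver's internals:

```python
import threading

pool = threading.BoundedSemaphore(value=2)  # like max_connection_pool_size=2
ACQUISITION_TIMEOUT = 0.1                   # like connection_acquisition_timeout

def open_session():
    # Each "session" holds one connection until released (session closed)
    if not pool.acquire(timeout=ACQUISITION_TIMEOUT):
        raise TimeoutError("pool exhausted — was a session left open?")

open_session()
open_session()
try:
    open_session()          # third acquire blocks, then times out
    exhausted = False
except TimeoutError:
    exhausted = True        # what the driver surfaces as ClientError
pool.release()              # closing a session returns its connection
pool.release()
```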
13. Causal Consistency & Bookmarks
Within a single session, queries are automatically causally chained — nothing to do.
Across sessions, use execute_query (auto-managed) or pass bookmarks explicitly:
```python
from neo4j import Bookmarks

# Sessions A and B run concurrently; session C must see both writes
with driver.session(database="neo4j") as session_a:
    session_a.execute_write(lambda tx: tx.run("MERGE (p:Person {name: 'Alice'})"))
    bookmarks_a = session_a.last_bookmarks()

with driver.session(database="neo4j") as session_b:
    session_b.execute_write(lambda tx: tx.run("MERGE (p:Person {name: 'Bob'})"))
    bookmarks_b = session_b.last_bookmarks()

# from_raw_values takes a single iterable of raw bookmark strings
combined = Bookmarks.from_raw_values(
    [*bookmarks_a.raw_values, *bookmarks_b.raw_values]
)

# Session C waits until both Alice and Bob exist
with driver.session(database="neo4j", bookmarks=combined) as session_c:
    session_c.execute_write(
        lambda tx: tx.run("MATCH (a:Person {name:'Alice'}), (b:Person {name:'Bob'}) "
                          "MERGE (a)-[:KNOWS]->(b)")
    )
```
`execute_query` shares a `BookmarkManager` automatically across calls — usually all you need.
14. Repository Pattern — Recommended Structure
```python
from dataclasses import dataclass

from neo4j import Driver, RoutingControl

@dataclass
class Person:
    name: str
    age: int

class PersonRepository:
    def __init__(self, driver: Driver, database: str = "neo4j"):
        self._driver = driver
        self._db = database

    def find_by_name_prefix(self, prefix: str) -> list[Person]:
        records, _, _ = self._driver.execute_query(
            "MATCH (p:Person) WHERE p.name STARTS WITH $prefix "
            "RETURN p.name AS name, p.age AS age",
            prefix=prefix,
            routing_=RoutingControl.READ,
            database_=self._db,
        )
        return [Person(name=r["name"], age=r["age"]) for r in records]

    def create(self, person: Person) -> None:
        self._driver.execute_query(
            "CREATE (p:Person {name: $name, age: $age})",
            name=person.name, age=person.age,
            database_=self._db,
        )

    def bulk_create(self, people: list[Person]) -> None:
        rows = [{"name": p.name, "age": p.age} for p in people]
        self._driver.execute_query(
            "UNWIND $rows AS row MERGE (p:Person {name: row.name}) SET p.age = row.age",
            rows=rows,
            database_=self._db,
        )
```
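For very large imports, `bulk_create` as written sends every row in one UNWIND transaction, which can hit memory and transaction-size limits. A common mitigation (a sketch only; `chunked` is a hypothetical helper, not part of the driver) is to split the rows client-side and run one `execute_query` per batch:

```python
def chunked(rows, size=1_000):
    """Yield successive fixed-size slices of a list."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

# A batched variant inside PersonRepository might look like:
#     def bulk_create_batched(self, people, batch_size=1_000):
#         rows = [{"name": p.name, "age": p.age} for p in people]
#         for batch in chunked(rows, batch_size):
#             self._driver.execute_query(
#                 "UNWIND $rows AS row MERGE (p:Person {name: row.name}) "
#                 "SET p.age = row.age",
#                 rows=batch, database_=self._db,
#             )
```

Each batch commits independently, so a mid-import failure leaves earlier batches applied; the MERGE keeps re-runs idempotent.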
15. Quick Reference: Common Mistakes
| Mistake | Fix |
|---|---|
| f-string / `format` Cypher params | Use `$param` placeholders always |
| Param name ending with `_` | Pass via `parameters_={"name_": val}` dict |
| Omitting `database_` | Always set — saves a round-trip every call |
| Returning `Result` from a tx function | Consume to list / dict inside the function |
| Buffering large results before second `tx.run()` | Consume eagerly or restructure to avoid parallel cursors |
| Side effects inside `execute_read`/`write` callbacks | Move outside — callback may be retried |
| Passing dataclass/Pydantic objects as params | Convert to dict or primitive fields first |
| Passing custom objects to `UNWIND` | `list[dict]` is the only supported shape |
| Using `record.get()` to detect absent vs graph null | They both return `None`; use `"key" in record.keys()` for absent-key detection |
| Not calling `.consume()` after `session.run()` | Commit timing undefined; call `.consume()` for guaranteed commit |
| Using sync driver inside asyncio | Use `AsyncGraphDatabase.driver` — sync driver blocks the event loop |
| Recreating async driver per request | Async driver is a singleton — create once at app startup |
| Not closing sessions | Use `with driver.session(...) as session` — leaked sessions exhaust the pool |
| Creating a new `Driver` per request | Create once at startup; share everywhere |
| One transaction per write in a loop | Batch with `UNWIND` or group in one `execute_write` callback |
| `MERGE` for guaranteed-new data | Use `CREATE` — `MERGE` does an internal match first |
| Using `execute_write` for reads | Use `execute_read` — routes to replicas |
| Calling `json.dumps(record.data())` with graph/temporal fields | Project scalar fields in Cypher or convert driver objects explicitly |
| `result["name"]` on an `EagerResult` | Index into `result.records` first, or unpack: `records, _, _ = ...` |
| Assuming `result.single()` returns `None` for zero results | It raises — use `result.single(strict=False)` for None-on-empty behaviour |
| `@unit_of_work` on a lambda | Assign the decorated version: `fn = unit_of_work(timeout=5)(lambda tx: ...)` |
| Catching `Neo4jError` before `ConstraintError` | Catch `ConstraintError` first — it's a subclass of `Neo4jError` |
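The last row generalizes beyond Neo4j: Python evaluates `except` clauses top to bottom and stops at the first matching class, so a base-class handler listed first shadows its subclasses. A stdlib-only sketch of the same pitfall (the `*Stub` classes stand in for `neo4j.exceptions.Neo4jError` and `ConstraintError`):

```python
class Neo4jErrorStub(Exception):
    """Stand-in for neo4j.exceptions.Neo4jError."""

class ConstraintErrorStub(Neo4jErrorStub):
    """Stand-in for neo4j.exceptions.ConstraintError (a Neo4jError subclass)."""

def handle(exc: Exception) -> str:
    try:
        raise exc
    except ConstraintErrorStub:  # subclass first; otherwise this is never reached
        return "constraint violation"
    except Neo4jErrorStub:
        return "generic neo4j error"

# handle(ConstraintErrorStub()) -> "constraint violation"
# handle(Neo4jErrorStub())      -> "generic neo4j error"
```

Swapping the two `except` clauses makes every constraint violation fall into the generic handler, since a `ConstraintErrorStub` is also a `Neo4jErrorStub`.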