
Neo4j Python Driver

Package: neo4j
Current stable: v6
Docs: https://neo4j.com/docs/python-manual/current/
API ref: https://neo4j.com/docs/api/python-driver/current/


When to Use

  • Writing Python code that connects to Neo4j
  • Setting up GraphDatabase.driver(), execute_query(), or AsyncGraphDatabase in a Python app
  • Questions about sessions, transactions, result handling, async patterns, or data type mapping in Python
  • Debugging connection, serialization, or UNWIND batching issues

When NOT to Use

  • Writing or optimizing Cypher queries → use neo4j-cypher-skill
  • Upgrading from an older driver version → use neo4j-migration-skill
  • GraphRAG pipelines (neo4j-graphrag package) → use neo4j-graphrag-skill

1. Installation

pip install neo4j
pip install neo4j-rust-ext   # optional: 3–10× faster serialization, same API

For async support, no additional packages are needed — asyncio is in the standard library. For Pandas integration, install pandas separately.


2. Driver Lifecycle

Driver is thread-safe and expensive to create — create exactly one instance per application, share it everywhere, and close it on shutdown. Use it as a context manager or call .close() explicitly.

from neo4j import GraphDatabase

URI  = "neo4j+s://xxx.databases.neo4j.io"  # Aura
AUTH = ("neo4j", "password")

# Preferred: context manager handles close automatically
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    driver.verify_connectivity()   # fail fast if unreachable
    # ... do work ...

# Long-lived singleton (e.g. in a service class):
driver = GraphDatabase.driver(URI, auth=AUTH)
driver.verify_connectivity()
# ... later, on shutdown:
driver.close()

URI Schemes

Scheme       When to use
neo4j://     Cluster routing, unencrypted
neo4j+s://   Cluster routing, TLS — use for Aura
bolt://      Single instance, unencrypted
bolt+s://    Single instance, TLS

Auth Options

from neo4j import GraphDatabase, basic_auth, bearer_auth, kerberos_auth

GraphDatabase.driver(URI, auth=("user", "password"))   # basic — tuple shorthand
GraphDatabase.driver(URI, auth=basic_auth("user", "password"))
GraphDatabase.driver(URI, auth=bearer_auth("jwt-token"))
GraphDatabase.driver(URI, auth=kerberos_auth("base64ticket"))

3. Choosing the Right API

API                            When to use                                        Auto-retry?   Streaming?
driver.execute_query()         Most queries — simple, safe default                ✅            ❌ (eager)
session.execute_read/write()   Large results, need lazy streaming                 ✅            ✅
session.run()                  LOAD CSV, quick scripts, CALL {} IN TRANSACTIONS   ❌            ✅
AsyncGraphDatabase             asyncio applications                               same as the API used on it

4. execute_query — Recommended Default

The highest-level API. Manages sessions, transactions, retries, and bookmarks automatically.

EagerResult — How to Access the Return Value

execute_query returns an EagerResult — a named tuple of (records, summary, keys). There are two correct access patterns:

from neo4j import GraphDatabase, RoutingControl

# Pattern 1: Tuple unpacking (most common)
records, summary, keys = driver.execute_query(
    "MATCH (p:Person) RETURN p.name AS name",
    database_="neo4j",
)

# Pattern 2: Attribute access on the returned object
result = driver.execute_query(
    "MATCH (p:Person) RETURN p.name AS name",
    database_="neo4j",
)
records = result.records    # list[Record]
summary = result.summary    # ResultSummary
keys    = result.keys       # list[str], e.g. ['name']

What NOT to do — EagerResult behaves like a 3-tuple, not like a list of records:

result = driver.execute_query("MATCH (p:Person) RETURN p.name AS name", database_="neo4j")

# ❌ Iterating the result directly — yields its three fields, not the records
for item in result:
    ...                 # item is the records list, then the summary, then the keys

# ❌ Indexing for the first record
result[0]               # the records LIST (the first tuple field), not the first Record
result.records[0]       # ✅ first Record

# ❌ Treating the result as a single record
result["name"]          # TypeError — tuple indices must be integers; index into .records first

# ❌ Assuming len() gives the record count
len(result)             # always 3 — the number of tuple fields
len(result.records)     # ✅ record count
# Full example with read routing
records, summary, keys = driver.execute_query(
    "MATCH (p:Person {name: $name})-[:KNOWS]->(friend) RETURN friend.name AS name",
    name="Alice",
    routing_=RoutingControl.READ,   # route reads to replicas
    database_="neo4j",              # always specify — avoids a round-trip
)

for record in records:
    print(record["name"])

print(f"Returned {len(records)} records in {summary.result_available_after} ms")
print(f"Keys projected: {keys}")    # ['name']

# Write query — access summary via attribute or unpacking
summary = driver.execute_query(
    "CREATE (p:Person {name: $name, age: $age})",
    name="Bob", age=30,
    database_="neo4j",
).summary
print(f"Created {summary.counters.nodes_created} nodes")

⚠ Trailing Underscore Convention — Critical Gotcha

Config kwargs to execute_query must end with a single underscore to distinguish them from query parameters. This includes database_, routing_, auth_, result_transformer_, bookmark_manager_, and impersonated_user_.

No query parameter name may end with a single underscore — the driver will raise ValueError if it detects this collision. Pass such parameters via the parameters_ dict instead:

# ❌ Fails — 'name_' clashes with the driver's config namespace
driver.execute_query("MATCH (p:Person {name: $name_}) RETURN p", name_="Alice")

# ✅ Use parameters_ dict for any parameter whose name ends with _
driver.execute_query(
    "MATCH (p:Person {name: $name_}) RETURN p",
    parameters_={"name_": "Alice"},
    database_="neo4j",
)

# ✅ Or rename the Cypher parameter to avoid the underscore
driver.execute_query(
    "MATCH (p:Person {name: $name}) RETURN p",
    name="Alice",
    database_="neo4j",
)

⚠ Never f-string or format Cypher. Always use $param placeholders — prevents injection and enables query plan caching on the server.
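
To see why, compare what actually reaches the server (the malicious input below is a hypothetical example):

```python
# ❌ f-string: user input becomes part of the query text itself
user_input = "Alice'}) DETACH DELETE (n) //"   # hypothetical malicious input
bad_query = f"MATCH (p:Person {{name: '{user_input}'}}) RETURN p"
# The injected clause is now executable Cypher inside the query string

# ✅ Parameterised: the query text is constant; input travels as data
good_query = "MATCH (p:Person {name: $name}) RETURN p"
# driver.execute_query(good_query, name=user_input)  # input can never alter the query
```

Parameterised queries also let the server cache a single plan for every value of $name.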

Result Transformers

execute_query accepts a result_transformer_ callable to reshape the result before it's returned:

import neo4j

# Built-in: return a Pandas DataFrame (requires pandas installed)
df = driver.execute_query(
    "MATCH (p:Person) RETURN p.name AS name, p.age AS age",
    database_="neo4j",
    result_transformer_=neo4j.Result.to_df,
)

# Built-in: return a single record — behaviour depends on result count (see below)
record = driver.execute_query(
    "MATCH (p:Person {name: $name}) RETURN p",
    name="Alice",
    database_="neo4j",
    result_transformer_=neo4j.Result.single,
)

# Custom transformer — receives the raw Result, must consume it here
def first_names(result: neo4j.Result) -> list[str]:
    return [record["name"] for record in result]

names = driver.execute_query(
    "MATCH (p:Person) RETURN p.name AS name",
    database_="neo4j",
    result_transformer_=first_names,
)

result.single() — None on Zero by Default; strict=True to Enforce Exactly One

result.single() always exhausts the result, and its behaviour is controlled by the strict parameter (default False):

  • strict=False (default): returns None for zero records; for two or more, emits a warning and returns the first record — it does not raise.
  • strict=True: raises ResultNotSingleError unless there is exactly one record.

A common misconception is that single() raises when nothing matched, or raises on multiple records by default — neither happens unless you pass strict=True. (Unlike SQLAlchemy's .scalar_one_or_none(), multiple records do not raise by default.)

# ✅ Default (strict=False) — "find or None" semantics
def find_person(tx):
    result = tx.run("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
    record = result.single()    # None if 0 records; warns and returns first if 2+
    if record is None:
        return None
    return record["p"]

# ✅ strict=True — enforce exactly one record
def get_person(tx):
    result = tx.run("MATCH (p:Person {name: $name}) RETURN p", name="Alice")
    record = result.single(strict=True)   # raises ResultNotSingleError for 0 or 2+
    return record["p"]

# ✅ In execute_query with result_transformer_ — the default (strict=False) applies
record = driver.execute_query(
    "MATCH (p:Person {name: $name}) RETURN p",
    name="Alice",
    database_="neo4j",
    result_transformer_=neo4j.Result.single,   # None if Alice not found
)

# To enforce exactly-one semantics, wrap with strict=True:
def single_strict(result):
    return result.single(strict=True)   # raises unless exactly one record

record = driver.execute_query(
    "MATCH (p:Person {name: $name}) RETURN p",
    name="Alice",
    database_="neo4j",
    result_transformer_=single_strict,
)

Summary of single() modes:

Result count   single() (strict=False, default)    single(strict=True)
0 records      returns None                        raises ResultNotSingleError
1 record       returns the Record                  returns the Record
2+ records     warns, returns the first Record     raises ResultNotSingleError

5. Managed Transactions (execute_read / execute_write)

Use when you need lazy streaming over large results, or when you want to run multiple queries inside one transaction.

with driver.session(database="neo4j") as session:

    # Read — routes to replicas; callback auto-retried on transient failure
    def get_people(tx):
        result = tx.run(
            "MATCH (p:Person) WHERE p.name STARTS WITH $prefix RETURN p.name AS name",
            prefix="Al",
        )
        # ✅ Consume the Result INSIDE the callback — it is invalid after the tx closes
        return [record["name"] for record in result]

    names = session.execute_read(get_people)

    # Write — routes to leader
    def create_person(tx):
        tx.run("CREATE (p:Person {name: $name})", name="Carol")

    session.execute_write(create_person)
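
Extra positional and keyword arguments passed to execute_read/execute_write are forwarded to the callback after tx, which avoids wrapping callbacks in closures — a minimal sketch (the function name is illustrative):

```python
def create_person_with_age(tx, name: str, age: int):
    # tx is always the first argument; the rest are forwarded by execute_write
    tx.run("MERGE (p:Person {name: $name}) SET p.age = $age", name=name, age=age)

# Usage, inside an open session as above:
# session.execute_write(create_person_with_age, "Carol", age=30)
```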

Critical: Result Lifetime in Transaction Functions

Result is a lazy cursor backed by the open transaction. The transaction closes the moment the callback returns. Reading a Result after that raises ResultConsumedError.

# ❌ WRONG — leaks the Result out of the transaction
def bad_tx(tx):
    return tx.run("MATCH (p:Person) RETURN p.name AS name")
    # Result returned here; tx closes immediately after

result = session.execute_read(bad_tx)
list(result)   # raises ResultConsumedError — the cursor is already closed

# ✅ CORRECT — fully consume the result before the function returns
def good_tx(tx):
    result = tx.run("MATCH (p:Person) RETURN p.name AS name")
    return [record["name"] for record in result]   # consumed while tx is open

Multiple tx.run() Calls

If you call tx.run() a second time before the first Result is consumed, the driver automatically buffers the first result in memory before running the next query. This is safe, but means you can accidentally pull a large result into RAM. Consume each result before the next call when working with large datasets:

def multi_query_tx(tx):
    # First result — consume it immediately
    people = [r["name"] for r in tx.run("MATCH (p:Person) RETURN p.name AS name")]

    # Second query — safe, first result is already consumed
    for name in people:
        tx.run("MERGE (p:Person {name: $name})-[:VISITED]->(:City {name: 'London'})",
               name=name)

    return len(people)

Retry Safety

The callback may execute more than once on transient failures. Keep callbacks idempotent:

# ❌ Side effect runs on every retry
def dangerous_tx(tx):
    requests.post("https://api.example.com/notify")  # fires on every retry
    tx.run("CREATE (p:Person {name: $name})", name="Alice")

# ✅ Pure database work; HTTP call made only on confirmed success
def safe_tx(tx):
    tx.run("MERGE (p:Person {name: $name})", name="Alice")  # MERGE is idempotent

session.execute_write(safe_tx)
# Make the HTTP call here, outside the callback, once write is confirmed
requests.post("https://api.example.com/notify")

TransactionConfig — Timeouts & Metadata

Use @unit_of_work to attach a timeout and metadata to a managed transaction function:

from neo4j import unit_of_work

@unit_of_work(timeout=5.0, metadata={"app": "myService", "user": user_id})
def get_people(tx):
    return [r["name"] for r in tx.run("MATCH (p:Person) RETURN p.name AS name")]

session.execute_read(get_people)

The @unit_of_work decorator attaches the config to the function. Metadata appears in SHOW TRANSACTIONS and server query logs.

@unit_of_work Cannot Be Applied to Lambdas

Python does not allow decorating a lambda expression. This is a common trap — if you use a lambda for a simple transaction (which is convenient), you silently lose the ability to set a timeout or metadata:

# ❌ Syntax error — cannot decorate a lambda
session.execute_write(
    @unit_of_work(timeout=5.0)
    lambda tx: tx.run("MERGE (p:Person {name: $name})", name="Alice")
)

# ❌ Also wrong — @unit_of_work has no effect when called after the fact
fn = lambda tx: tx.run("MERGE (p:Person {name: $name})", name="Alice")
unit_of_work(timeout=5.0)(fn)   # wraps fn, but the session.execute_write call
session.execute_write(fn)       # still uses the original fn, not the wrapped version

# ✅ Correct — define a named function and decorate it
@unit_of_work(timeout=5.0, metadata={"app": "myService"})
def create_person(tx):
    tx.run("MERGE (p:Person {name: $name})", name="Alice")

session.execute_write(create_person)

# ✅ Also correct — assign the decorated version explicitly
create_person = unit_of_work(timeout=5.0)(lambda tx: tx.run(
    "MERGE (p:Person {name: $name})", name="Alice"
))
session.execute_write(create_person)

The practical rule: use named functions whenever you need a timeout or metadata; lambdas are fine for fire-and-forget callbacks where server-default timeouts are acceptable.


6. Implicit Transactions (session.run)

The lowest-level, least safe API. Not automatically retried. Use only for:

  • CALL { } IN TRANSACTIONS — this clause can only run in an implicit transaction (commonly used for large LOAD CSV imports)
  • Quick prototyping

with driver.session(database="neo4j") as session:
    result = session.run("CREATE (p:Person {name: $name})", name="Alice")
    summary = result.consume()   # ⚠ call consume() to ensure the tx commits
    print(summary.counters.nodes_created)

Commit timing is non-obvious: an implicit transaction commits at the latest when the session is closed, or immediately before the next query in the same session. Do not rely on this ordering — always call .consume() when you need a guaranteed commit before proceeding.

# ❌ Fragile — commit timing is undefined between the two runs
with driver.session(database="neo4j") as session:
    session.run("CREATE (p:Person {name: 'Alice'})")
    session.run("MATCH (p:Person {name: 'Alice'}) SET p.age = 30")  # may not see Alice

# ✅ Explicit consume ensures first tx is committed before the second runs
with driver.session(database="neo4j") as session:
    session.run("CREATE (p:Person {name: 'Alice'})").consume()
    session.run("MATCH (p:Person {name: 'Alice'}) SET p.age = 30")

Since the driver cannot determine whether session.run() requires read or write access, it defaults to write mode. If your implicit transaction is read-only, declare it:

from neo4j import READ_ACCESS

with driver.session(database="neo4j", default_access_mode=READ_ACCESS) as session:
    result = session.run("MATCH (p:Person) RETURN p.name AS name")

7. Explicit Transactions

Use when a transaction must span multiple functions or coordinate with external systems.

with driver.session(database="neo4j") as session:
    tx = session.begin_transaction()
    try:
        do_part_a(tx)
        do_part_b(tx)
        tx.commit()
    except Exception as e:
        tx.rollback()   # rollback can itself raise on network failure — see below
        raise

def do_part_a(tx):
    tx.run("CREATE (p:Person {name: $name})", name="Alice")

Rollback Can Raise

tx.rollback() is a network call. If the connection is broken, it raises. Don't let it swallow the original exception:

try:
    tx.commit()
except Exception as original:
    try:
        tx.rollback()
    except Exception as rollback_err:
        raise rollback_err from original   # chain both exceptions
    raise

Commit Uncertainty

If tx.commit() raises a network-level exception, the commit may or may not have succeeded on the server. Design writes to be idempotent with MERGE and unique constraints so retrying is always safe.
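
A sketch of that pattern — a unique constraint plus MERGE makes the write safe to replay after an ambiguous commit (the constraint and property names are illustrative):

```python
# One-time setup: at most one Person per name
CREATE_CONSTRAINT = """
CREATE CONSTRAINT person_name_unique IF NOT EXISTS
FOR (p:Person) REQUIRE p.name IS UNIQUE
"""

# Retry-safe write: MERGE matches-or-creates, so replaying it after an
# uncertain commit cannot create a duplicate node
UPSERT_PERSON = "MERGE (p:Person {name: $name}) SET p.age = $age"

# driver.execute_query(CREATE_CONSTRAINT, database_="neo4j")
# driver.execute_query(UPSERT_PERSON, name="Alice", age=30, database_="neo4j")
```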


8. Async API

For asyncio applications. The async API mirrors the sync API exactly — replace GraphDatabase with AsyncGraphDatabase and await every call.

Async Driver Is Also a Singleton

The async driver maintains the connection pool, just like the sync driver — recreating it per request tears down and rebuilds the pool (and its connections) every time. Create it once at startup and reuse it.

# ❌ Wrong — recreates the driver (and tears down the connection pool) on every call
async def handle_request(name: str):
    async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
        records, _, _ = await driver.execute_query("MATCH (p:Person {name: $name}) RETURN p",
                                                    name=name, database_="neo4j")
    return records

# ✅ Correct — driver created once at app startup, shared for the lifetime of the process
driver = AsyncGraphDatabase.driver(URI, auth=AUTH)

async def handle_request(name: str):
    records, _, _ = await driver.execute_query("MATCH (p:Person {name: $name}) RETURN p",
                                                name=name, database_="neo4j")
    return records

# Close at shutdown
await driver.close()

Web Framework Lifespan Pattern (FastAPI / Starlette)

from contextlib import asynccontextmanager
from fastapi import FastAPI
from neo4j import AsyncGraphDatabase, RoutingControl

_driver = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global _driver
    _driver = AsyncGraphDatabase.driver(URI, auth=AUTH)
    await _driver.verify_connectivity()
    yield                       # app runs here
    await _driver.close()       # called on shutdown

app = FastAPI(lifespan=lifespan)

def get_driver():
    return _driver              # injected via FastAPI Depends()

@app.get("/people")
async def get_people():
    records, _, _ = await get_driver().execute_query(
        "MATCH (p:Person) RETURN p.name AS name",
        database_="neo4j",
        routing_=RoutingControl.READ,
    )
    return [r["name"] for r in records]

Basic Async Usage

import asyncio
from neo4j import AsyncGraphDatabase, RoutingControl

URI  = "neo4j+s://xxx.databases.neo4j.io"
AUTH = ("neo4j", "password")

async def main():
    async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
        await driver.verify_connectivity()

        records, summary, keys = await driver.execute_query(
            "MATCH (p:Person) RETURN p.name AS name",
            database_="neo4j",
            routing_=RoutingControl.READ,
        )
        names = [r["name"] for r in records]
        print(names)

asyncio.run(main())

Async Managed Transactions

async def get_people(tx):
    result = await tx.run("MATCH (p:Person) RETURN p.name AS name")
    # ✅ Consume inside the async callback — await the collection
    return await result.values()   # returns list of lists [[name], [name], ...]

async def create_person(tx, name: str):
    await tx.run("MERGE (p:Person {name: $name})", name=name)

async def run_queries(driver):
    async with driver.session(database="neo4j") as session:
        people = await session.execute_read(get_people)
        await session.execute_write(create_person, "Carol")

Async Result Methods

Method                        Returns          Notes
await result.values()         list[list]       Each inner list is one row of values
await result.data()           list[dict]       Each dict is one record keyed by column name
await result.single()         Record | None    Same strict modes as the sync single() — see above
await result.fetch(n)         list[Record]     Up to n records
await result.consume()        ResultSummary    Discards remaining records, returns summary
async for record in result    Record           Lazy streaming
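
Lazy streaming with async for looks like this inside a managed transaction (a sketch; the query and names are illustrative):

```python
async def stream_names(tx):
    result = await tx.run("MATCH (p:Person) RETURN p.name AS name")
    names = []
    async for record in result:      # streams one record at a time
        names.append(record["name"])
    return names                     # consumed inside the callback, as required

# Usage:
# async with driver.session(database="neo4j") as session:
#     names = await session.execute_read(stream_names)
```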

⚠ Do Not Mix Sync and Async Drivers

Never use the sync GraphDatabase.driver in an asyncio context — it blocks the event loop. Always use AsyncGraphDatabase.driver in async code, even for a single query.

# ❌ Blocks the event loop — other coroutines cannot run during the query
async def bad():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        records, _, _ = driver.execute_query("MATCH (p:Person) RETURN p")

# ✅ Async driver keeps the event loop free
async def good():
    async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
        records, _, _ = await driver.execute_query("MATCH (p:Person) RETURN p")

Concurrency with asyncio

async def run_concurrent(driver):
    # Run multiple queries concurrently with asyncio.gather
    results = await asyncio.gather(
        driver.execute_query("MATCH (a:Artist) RETURN a.name AS name", database_="neo4j"),
        driver.execute_query("MATCH (v:Venue) RETURN v.name AS name",  database_="neo4j"),
    )
    artists = [r["name"] for r in results[0].records]
    venues  = [r["name"] for r in results[1].records]

9. Error Handling

from neo4j.exceptions import (
    Neo4jError,
    DriverError,
    ServiceUnavailable,
    SessionExpired,
    TransientError,
    AuthError,
    ConstraintError,       # unique/existence constraint violation — most common app-level error
)

try:
    driver.execute_query("...", database_="neo4j")
except AuthError as e:
    print("Bad credentials:", e)
except ServiceUnavailable as e:
    print("No servers reachable:", e)
except TransientError as e:
    # execute_query and execute_read/write retry automatically;
    # this is only raised once retries are exhausted
    print(f"Transient error after retries: {e.code}")
except ConstraintError as e:
    # Unique or existence constraint violation — subclass of Neo4jError
    # Must be caught BEFORE the generic Neo4jError handler
    print(f"Constraint violation [{e.code}]: {e.message}")
except Neo4jError as e:
    # Server-side Cypher or other database error
    print(f"Neo4j error [{e.code}]: {e.message}")
    # GQL status code for stable programmatic handling:
    if e.gql_status == "42001":   # SyntaxError
        print("Fix the query syntax")

ConstraintError — Unique Constraint Violations

ConstraintError is the most common application-level database error and should almost always be handled explicitly. It is a subclass of Neo4jError, so a bare except Neo4jError will catch it — but silently, without letting you branch on "this specific node already exists" vs "something else went wrong".

from neo4j.exceptions import ConstraintError

def create_user(driver, username: str) -> bool:
    """Returns True if created, False if username already exists."""
    try:
        driver.execute_query(
            "CREATE (u:User {username: $username})",
            username=username,
            database_="neo4j",
        )
        return True
    except ConstraintError:
        # Neo4j raised Neo.ClientError.Schema.ConstraintValidationFailed
        # because a unique constraint on User.username was violated
        return False

# Constraint violation codes follow a predictable pattern:
# e.code == "Neo.ClientError.Schema.ConstraintValidationFailed"
# e.message contains the constraint name and offending value

Important catch ordering: because ConstraintError is a subclass of Neo4jError, always catch it before the generic Neo4jError handler, or it will be swallowed by the parent.

GQL status codes (stable across versions) are preferable to error message strings for programmatic handling. Branch on e.gql_status rather than parsing e.message.


10. Record Access & Null Safety

Accessing Values

records, _, _ = driver.execute_query(
    "MATCH (p:Person) RETURN p.name AS name, p.age AS age",
    database_="neo4j",
)

record = records[0]

# By key — raises KeyError if key absent
name = record["name"]

# By index — 0-based, positional
name = record[0]

# .get() — returns None for both absent keys AND graph null (see below)
name = record.get("name")
name = record.get("name", "Unknown")   # with default

# .data() — converts the record to a dict keyed by column name
d = record.data()   # {"name": "Alice", "age": 30}

record.data() Flattens Graph Types — but Is Not Always JSON-Serializable

.data() returns an opinionated, mostly JSON-safe view: Node values are reduced to plain dicts of their properties (labels and element_id are dropped), Relationship values to (start_properties, type, end_properties) tuples, and Path values to lists. Temporal and spatial values, however, remain driver objects — json.dumps(record.data()) raises TypeError if any property is a neo4j.time.* or spatial value.

# Query returning a node: MATCH (p:Person) RETURN p
record = records[0]

d = record.data()
# d == {"p": {"name": "Alice", "created_at": <DateTime 2024-01-01T00:00:00+00:00>}}
# The Node became a plain dict — but its DateTime property did not

import json
json.dumps(d)   # ❌ raises TypeError: Object of type DateTime is not JSON serializable

To get a fully JSON-safe dict, extract properties explicitly:

# ✅ For simple scalar fields — .data() is fine
records, _, _ = driver.execute_query(
    "MATCH (p:Person) RETURN p.name AS name, p.age AS age",  # scalar projections
    database_="neo4j",
)
d = records[0].data()     # {"name": "Alice", "age": 30} — safe to json.dumps()

# ✅ For node/relationship fields — extract properties manually
records, _, _ = driver.execute_query(
    "MATCH (p:Person) RETURN p",   # returns whole node
    database_="neo4j",
)
node = records[0]["p"]            # neo4j.graph.Node
props = dict(node)                # {"name": "Alice", "age": 30} — plain dict of properties

# ✅ For temporal types — convert explicitly before serializing
from neo4j.time import DateTime
dt = records[0]["created_at"]     # neo4j.time.DateTime
iso = str(dt)                     # "2024-01-01T00:00:00.000000000+00:00" — JSON-safe string
py_dt = dt.to_native()            # datetime.datetime — also JSON-safe via isoformat()

# ✅ General pattern: project scalars in Cypher rather than returning whole nodes
records, _, _ = driver.execute_query("""
    MATCH (p:Person)
    RETURN p.name AS name, p.age AS age, toString(p.created_at) AS created_at
    """,
    database_="neo4j",
)
# Now .data() is fully JSON-safe
safe_dicts = [r.data() for r in records]
json.dumps(safe_dicts)   # ✅ works

Null Safety — Absent Key vs Graph Null

These are two distinct situations that both surface as None when using .get() — which hides the difference:

Situation                            record["key"]     record.get("key")
Key projected, value non-null        the value         the value
Key projected, value is graph null   None              None
Key absent (typo / not in RETURN)    raises KeyError   None

# ❌ Typo — .get() returns a silent None, so the mistake surfaces far from its cause
record.get("nme")       # None — no error, typo goes undetected
record["nme"]           # KeyError — caught earlier

# When a column is from OPTIONAL MATCH, graph null gives None:
# Query: OPTIONAL MATCH (p)-[:LIVES_IN]->(c:City) RETURN p.name AS name, c.name AS city
city = record.get("city")    # None when no City matched — same as an absent key via .get()

The safest pattern for optional columns:

# Check key presence explicitly for truly optional columns:
if "city" in record.keys() and record["city"] is not None:
    city = record["city"]
else:
    city = "Unknown"

# Or use .get() with a fallback — note `or` also replaces legitimate falsy values ("" etc.):
city = record.get("city") or "Unknown"

Graph Types

# Node
node = record["p"]              # neo4j.graph.Node
node.element_id                 # stable identifier within this transaction
node.labels                     # frozenset({'Person'})
node["name"]                    # property access by key
dict(node)                      # all properties as plain dict

# Relationship
rel = record["r"]               # neo4j.graph.Relationship
rel.type                        # 'KNOWS'
rel.start_node.element_id
rel.end_node.element_id
rel["since"]                    # property

# ⚠ element_id is only guaranteed stable within one transaction.
# Do not use it to MATCH entities across separate transactions.
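
To re-find an entity in a later transaction, match on a property your application controls (a unique business key) rather than a stored element_id — a sketch with an illustrative key:

```python
FIND_BY_KEY = "MATCH (p:Person {name: $name}) RETURN p"

def find_person(tx, name: str):
    # Matches by an application-owned unique key — stable across transactions,
    # unlike element_id
    return [record["p"] for record in tx.run(FIND_BY_KEY, name=name)]
```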

Temporal Types

The driver returns Neo4j temporal values as neo4j.time types, not native Python datetime. Conversion is lossy:

from neo4j.time import DateTime

dt = record["created_at"]       # neo4j.time.DateTime
type(dt)                        # <class 'neo4j.time.DateTime'>

# Convert to Python datetime — truncates nanosecond precision to microseconds
py_dt = dt.to_native()          # datetime.datetime (tzinfo is preserved)

# Pass Python datetime as a parameter — driver converts automatically
from datetime import datetime, timezone
driver.execute_query(
    "CREATE (e:Event {at: $ts})",
    ts=datetime.now(timezone.utc),
    database_="neo4j",
)

11. Data Types & Parameter Mapping

Allowed Parameter Types

Only these types (and None) are valid as query parameter values:

Python type            Cypher type
str                    String
int                    Integer
float                  Float
bool                   Boolean
bytes                  Bytes
list / tuple           List
dict                   Map
None                   null
datetime.date          Date
datetime.datetime      DateTime
datetime.time          Time
datetime.timedelta     Duration
neo4j.time.* types     Corresponding Cypher temporal

Custom classes, dataclasses, Pydantic models, and enums are not automatically serialised. Convert to dict or primitive values before passing as parameters.

from dataclasses import dataclass, asdict

@dataclass
class Person:
    name: str
    age: int

p = Person("Alice", 30)

# ❌ Fails — driver can't serialise a dataclass
driver.execute_query("CREATE (p:Person $props)", props=p, database_="neo4j")

# ✅ Convert to dict first
driver.execute_query("CREATE (p:Person $props)", props=asdict(p), database_="neo4j")
# or pass fields individually:
driver.execute_query("CREATE (p:Person {name: $name, age: $age})",
                     name=p.name, age=p.age, database_="neo4j")

12. Performance

Always Specify the Database

Omitting database_ causes the driver to resolve the home database with an extra network round-trip on every call.

# execute_query:
driver.execute_query("...", database_="neo4j")

# Session:
driver.session(database="neo4j")

Route Reads to Replicas

from neo4j import RoutingControl

# execute_query:
driver.execute_query("MATCH ...", routing_=RoutingControl.READ, database_="neo4j")

# Managed transaction — execute_read routes automatically:
session.execute_read(my_read_fn)

Batch Writes with UNWIND

Pass a list[dict] — each dict becomes one row in the Cypher loop. Every element must itself be a serialisable parameter value (see section 11); custom objects inside the list fail at runtime.

# ❌ Wrong — passing a list of dataclass instances or custom objects
people = [Person("Alice", 30), Person("Bob", 25)]
driver.execute_query("UNWIND $people AS p MERGE (:Person {name: p.name})",
                     people=people, database_="neo4j")   # raises at runtime

# ✅ Correct — list of plain dicts
people = [
    {"name": "Alice", "age": 30, "city": "London"},
    {"name": "Bob",   "age": 25, "city": "Paris"},
]
driver.execute_query("""
    UNWIND $people AS person
    MERGE (p:Person {name: person.name})
    SET p.age = person.age
    MERGE (c:City {name: person.city})
    MERGE (p)-[:LIVES_IN]->(c)
    """,
    people=people,
    database_="neo4j",
)
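
For imports too large for a single transaction, split the list into fixed-size batches and run one UNWIND per batch — a minimal chunking sketch (the batch size is illustrative):

```python
from typing import Iterator

def chunks(items: list, size: int = 1000) -> Iterator[list]:
    """Yield successive fixed-size slices of a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# for batch in chunks(people, 1000):
#     driver.execute_query(
#         "UNWIND $people AS person MERGE (p:Person {name: person.name})",
#         people=batch, database_="neo4j",
#     )
```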

Group Multiple Writes in One Transaction

# Bad: one transaction per item — high overhead
for item in items:
    driver.execute_query("CREATE (n:Node {id: $id})", id=item["id"], database_="neo4j")

# Good: all in one managed transaction
def bulk_create(tx):
    for item in items:
        tx.run("CREATE (n:Node {id: $id})", id=item["id"])

with driver.session(database="neo4j") as session:
    session.execute_write(bulk_create)

Lazy vs Eager Loading

# execute_query is always eager — fine for small/medium result sets
records, _, _ = driver.execute_query("MATCH (p:Person) RETURN p", database_="neo4j")

# For large results, iterate lazily inside a managed transaction
def process_large_result(tx):
    result = tx.run("MATCH (p:Person) RETURN p.name AS name")
    for record in result:          # streams one record at a time
        process(record["name"])    # don't build a list

with driver.session(database="neo4j") as session:
    session.execute_read(process_large_result)

Concurrency — The GIL Matters

The Python GIL means that threads do not give true parallelism for CPU-bound work, but they do overlap on I/O (network waits). For heavy parallel database work, asyncio with AsyncGraphDatabase is the better approach:

# Sync threading — helps with I/O overlap, but GIL limits true parallelism
from concurrent.futures import ThreadPoolExecutor

def query(name):
    records, _, _ = driver.execute_query(
        "MATCH (p:Person {name: $name}) RETURN p", name=name, database_="neo4j"
    )
    return records

with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(query, names))

# Async — preferred for high-concurrency read workloads
import asyncio
from neo4j import AsyncGraphDatabase

async def run_all(names):
    async with AsyncGraphDatabase.driver(URI, auth=AUTH) as driver:
        tasks = [
            driver.execute_query("MATCH (p:Person {name: $name}) RETURN p",
                                 name=name, database_="neo4j")
            for name in names
        ]
        return await asyncio.gather(*tasks)
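
With many names, an unbounded gather can contend for the entire connection pool at once. A semaphore caps in-flight queries; this is a sketch, and `run_bounded` is an illustrative name:

```python
import asyncio

# Sketch: cap concurrent queries so thousands of tasks don't all compete
# for the connection pool (default pool size: 100) simultaneously.
async def run_bounded(driver, names, limit=10):
    sem = asyncio.Semaphore(limit)

    async def one(name):
        async with sem:          # at most `limit` queries in flight
            return await driver.execute_query(
                "MATCH (p:Person {name: $name}) RETURN p",
                name=name, database_="neo4j",
            )

    return await asyncio.gather(*(one(n) for n in names))
```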

Connection Pool Tuning

driver = GraphDatabase.driver(
    URI, auth=AUTH,
    max_connection_pool_size=50,              # default: 100
    connection_acquisition_timeout=30,        # seconds to wait for a free connection
    max_connection_lifetime=3600,             # seconds; recycle old connections
    connection_timeout=15,                    # seconds to establish a new connection
    keep_alive=True,                          # TCP keepalive
)

Session exhaustion: a session holds a connection while it has an open transaction or an unconsumed result. If sessions are not closed promptly, the pool is exhausted and new acquisitions block for up to connection_acquisition_timeout seconds, then raise ClientError. Always use sessions as context managers.


13. Causal Consistency & Bookmarks

Within a single session, queries are automatically causally chained — nothing to do.
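
To make that guarantee concrete, a sketch (`create_then_count` is an illustrative name): because both calls go through the same session, the read is causally chained after the write and observes it, even on a cluster.

```python
# Sketch: within one session the second call is causally chained after
# the first, so the read always sees the freshly merged node.
def create_then_count(session, name):
    session.execute_write(
        lambda tx: tx.run("MERGE (:Person {name: $name})", name=name).consume()
    )
    return session.execute_read(
        lambda tx: tx.run(
            "MATCH (p:Person {name: $name}) RETURN count(p) AS c", name=name
        ).single()["c"]
    )

# with driver.session(database="neo4j") as session:
#     assert create_then_count(session, "Carol") >= 1
```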

Across sessions, use execute_query (auto-managed) or pass bookmarks explicitly:

from neo4j import Bookmarks

# Sessions A and B run concurrently; session C must see both writes
with driver.session(database="neo4j") as session_a:
    session_a.execute_write(lambda tx: tx.run("MERGE (p:Person {name: 'Alice'})"))
    bookmarks_a = session_a.last_bookmarks()

with driver.session(database="neo4j") as session_b:
    session_b.execute_write(lambda tx: tx.run("MERGE (p:Person {name: 'Bob'})"))
    bookmarks_b = session_b.last_bookmarks()

combined = Bookmarks.from_raw_values(
    *bookmarks_a.raw_values, *bookmarks_b.raw_values
)

# Session C waits until both Alice and Bob exist
with driver.session(database="neo4j", bookmarks=combined) as session_c:
    session_c.execute_write(
        lambda tx: tx.run("MATCH (a:Person {name:'Alice'}), (b:Person {name:'Bob'}) "
                          "MERGE (a)-[:KNOWS]->(b)")
    )

execute_query shares a BookmarkManager automatically across calls — usually all you need.
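
If a hand-rolled session must join that same causal chain, the driver exposes the manager execute_query uses; passing it into the session links the two. A sketch (`chained_session` is an illustrative name):

```python
# Sketch: make an explicit session causally consistent with prior
# execute_query calls by reusing the driver's built-in bookmark manager.
def chained_session(driver):
    return driver.session(
        database="neo4j",
        bookmark_manager=driver.execute_query_bookmark_manager,
    )
```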


14. Repository Pattern — Recommended Structure

from neo4j import Driver, RoutingControl
from dataclasses import dataclass

@dataclass
class Person:
    name: str
    age: int

class PersonRepository:
    def __init__(self, driver: Driver, database: str = "neo4j"):
        self._driver = driver
        self._db = database

    def find_by_name_prefix(self, prefix: str) -> list[Person]:
        records, _, _ = self._driver.execute_query(
            "MATCH (p:Person) WHERE p.name STARTS WITH $prefix RETURN p.name AS name, p.age AS age",
            prefix=prefix,
            routing_=RoutingControl.READ,
            database_=self._db,
        )
        return [Person(name=r["name"], age=r["age"]) for r in records]

    def create(self, person: Person) -> None:
        self._driver.execute_query(
            "CREATE (p:Person {name: $name, age: $age})",
            name=person.name, age=person.age,
            database_=self._db,
        )

    def bulk_create(self, people: list[Person]) -> None:
        rows = [{"name": p.name, "age": p.age} for p in people]
        self._driver.execute_query(
            "UNWIND $rows AS row MERGE (p:Person {name: row.name}) SET p.age = row.age",
            rows=rows,
            database_=self._db,
        )

15. Quick Reference: Common Mistakes

| Mistake | Fix |
|---|---|
| f-string / format Cypher params | Use `$param` placeholders always |
| Param name ending with `_` | Pass via `parameters_={"name_": val}` dict |
| Omitting `database_` | Always set — saves a round-trip every call |
| Returning `Result` from a tx function | Consume to list / dict inside the function |
| Buffering large results before second `tx.run()` | Consume eagerly or restructure to avoid parallel cursors |
| Side effects inside `execute_read`/`write` callbacks | Move outside — callback may be retried |
| Passing dataclass/Pydantic objects as params | Convert to dict or primitive fields first |
| Passing custom objects to UNWIND | `list[dict]` is the only supported shape |
| Using `record.get()` to detect absent vs graph null | They both return `None`; use `"key" in record.keys()` for absent-key detection |
| Not calling `.consume()` after `session.run()` | Commit timing undefined; call `.consume()` for guaranteed commit |
| Using sync driver inside asyncio | Use `AsyncGraphDatabase.driver` — sync driver blocks the event loop |
| Recreating async driver per request | Async driver is a singleton — create once at app startup |
| Not closing sessions | Use `with driver.session(...) as session` — leaked sessions exhaust the pool |
| Creating a new `Driver` per request | Create once at startup; share everywhere |
| One transaction per write in a loop | Batch with UNWIND or group in one `execute_write` callback |
| MERGE for guaranteed-new data | Use CREATE — MERGE does an internal match first |
| Using `execute_write` for reads | Use `execute_read` — routes to replicas |
| Calling `json.dumps(record.data())` with graph/temporal fields | Project scalar fields in Cypher or convert driver objects explicitly |
| `result["name"]` on an `EagerResult` | Index into `result.records` first, or unpack: `records, _, _ = ...` |
| Assuming `result.single()` returns `None` for zero results | It raises — use `result.single(strict=False)` for None-on-empty behaviour |
| `@unit_of_work` on a lambda | Assign the decorated version: `fn = unit_of_work(timeout=5)(lambda tx: ...)` |
| Catching `Neo4jError` before `ConstraintError` | Catch `ConstraintError` first — it's a subclass of `Neo4jError` |