# neo4j-spark-skill

Neo4j Connector for Apache Spark.

## When to Use
- Reading Neo4j nodes/relationships into Spark DataFrames
- Writing Spark DataFrames to Neo4j as nodes or relationships
- Databricks notebooks connecting to Neo4j
- Delta Lake → Neo4j ingestion pipelines
- Partitioned parallel reads from large Neo4j graphs
## When NOT to Use

- Python Bolt driver / `execute_query` → `neo4j-driver-python-skill`
- Cypher query writing → `neo4j-cypher-skill`
- GDS graph algorithms → `neo4j-gds-skill`
- Spring Boot + Neo4j → `neo4j-spring-data-skill`

## Version Matrix
| Connector | Spark | Scala | Databricks Runtime | Neo4j |
|---|---|---|---|---|
| 5.4.x | 3.3, 3.4, 3.5 | 2.12, 2.13 | 12.2, 13.3, 14.3 LTS | 4.4, 5.x, 2025.x |
Maven artifact (Scala 2.12, Spark 3):
`org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3`

Scala 2.13 variant:
`org.neo4j:neo4j-connector-apache-spark_2.13:5.4.2_for_spark_3`
## Setup

### Standalone Spark (PySpark)

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("neo4j-app")
    .config("spark.jars.packages",
            "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
    .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "neo4j")
    .config("neo4j.authentication.basic.password", "password")
    .getOrCreate())
```
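To verify the session wiring, a minimal smoke test (a sketch; the trivial `RETURN 1` query exercises only connectivity and auth):

```python
# Smoke test: confirms the session-level neo4j.* options are picked up.
(spark.read.format("org.neo4j.spark.DataSource")
    .option("query", "RETURN 1 AS ok")
    .load()
    .show())
```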
### Standalone Spark (Scala)

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName("neo4j-app")
  .config("spark.jars.packages",
    "org.neo4j:neo4j-connector-apache-spark_2.12:5.4.2_for_spark_3")
  .config("neo4j.url", "neo4j+s://xxxx.databases.neo4j.io")
  .config("neo4j.authentication.type", "basic")
  .config("neo4j.authentication.basic.username", "neo4j")
  .config("neo4j.authentication.basic.password", "password")
  .getOrCreate()
```
### Databricks — Cluster Installation

- Cluster → Libraries → Install New → Maven
- Search: `org.neo4j:neo4j-connector-apache-spark_2.12` — match the Scala version to the runtime
- Cluster → Advanced Options → Spark tab — add config:

  ```
  neo4j.url neo4j+s://xxxx.databases.neo4j.io
  neo4j.authentication.type basic
  neo4j.authentication.basic.username {{secrets/neo4j/username}}
  neo4j.authentication.basic.password {{secrets/neo4j/password}}
  ```

- Use Single user access mode (Unity Catalog shared mode is not supported)
### Databricks — Secrets (preferred over plaintext)

```python
# Store credentials once (Databricks CLI):
#   databricks secrets create-scope --scope neo4j
#   databricks secrets put --scope neo4j --key url
#   databricks secrets put --scope neo4j --key username
#   databricks secrets put --scope neo4j --key password

neo4j_url = dbutils.secrets.get(scope="neo4j", key="url")
neo4j_user = dbutils.secrets.get(scope="neo4j", key="username")
neo4j_pass = dbutils.secrets.get(scope="neo4j", key="password")

spark.conf.set("neo4j.url", neo4j_url)
spark.conf.set("neo4j.authentication.type", "basic")
spark.conf.set("neo4j.authentication.basic.username", neo4j_user)
spark.conf.set("neo4j.authentication.basic.password", neo4j_pass)
```
## Key Configuration Options

| Option | Description | Default |
|---|---|---|
| `neo4j.url` | Bolt/Neo4j URI | — (required) |
| `neo4j.authentication.type` | `none`, `basic`, `kerberos`, `bearer` | `basic` |
| `neo4j.authentication.basic.username` | Username | driver default |
| `neo4j.authentication.basic.password` | Password | driver default |
| `neo4j.authentication.bearer.token` | Bearer token | — |
| `neo4j.database` | Target database | driver default |
| `neo4j.access.mode` | `read` or `write` | `read` |
| `neo4j.encryption.enabled` | TLS (ignored with `+s`/`+ssc` URI) | `false` |
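Session-level options use the `neo4j.` prefix; the same keys can also be passed per read or write without the prefix, which helps when one job touches more than one database. A sketch (the URI and the `analytics` database are placeholders):

```python
# Per-read override: connection options without the `neo4j.` prefix
# apply only to this DataFrame, not to the whole session.
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("url", "neo4j+s://other.databases.neo4j.io")  # placeholder URI
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "password")
    .option("database", "analytics")                      # hypothetical database
    .option("labels", ":Person")
    .load())
```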
## Reading from Neo4j

Three mutually exclusive read modes — use exactly one per `.read()` call.

### Label scan (nodes)
```python
# PySpark
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Person")
    .load())

df.printSchema()
df.show()
```

```scala
// Scala
val df = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("labels", ":Person")
  .load()
```
Multi-label filter (AND): `.option("labels", ":Person:Employee")`

The result includes `<id>` (internal Neo4j id) and `<labels>` columns.
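Label-scan reads support filter and column pushdown, so Spark-side predicates and projections are folded into the generated Cypher where the connector version allows. A sketch:

```python
# Pushdown sketch: the filter and the two selected properties can be
# pushed into the generated Cypher, so unmatched rows stay in Neo4j.
adults = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Person")
    .load()
    .filter("age >= 18")
    .select("name", "age"))
adults.show()
```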
### Cypher query read
```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query",
            "MATCH (p:Person)-[:ACTED_IN]->(m:Movie) "
            "RETURN p.name AS actor, m.title AS movie, m.year AS year")
    .load())
```

Use explicit RETURN aliases — they become the DataFrame column names. Do not put `SKIP`/`LIMIT` in the query; the connector handles pagination internally.
### Relationship scan

```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("relationship", "BOUGHT")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.target.labels", ":Product")
    .load())
```

Result columns: `<rel.id>`, `<rel.type>`, `<source.*>`, `<target.*>`, plus relationship properties.
### Read partition tuning

```python
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("labels", ":Transaction")
    .option("partitions", "10")             # parallel partitions (default: 1)
    .option("batch.size", "5000")           # rows per partition batch (default: 5000)
    .option("schema.flatten.limit", "100")  # rows sampled for schema inference
    .load())
```
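Partitioned reads also work in `query` mode, but the connector then needs a total row count to split on. A sketch using the `query.count` option (the count query shown is illustrative and must match the main query's result set):

```python
# Partitioned query read: `query.count` must return a single `count`
# column so the connector can split the main query across partitions.
df = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query",
            "MATCH (t:Transaction) RETURN t.id AS id, t.amount AS amount")
    .option("partitions", "10")
    .option("query.count",
            "MATCH (t:Transaction) RETURN count(t) AS count")
    .load())
```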
Full read options reference: `references/read-patterns.md`
## Writing to Neo4j

### SaveMode

| SaveMode | Cypher | Requires |
|---|---|---|
| `Append` | `CREATE` | nothing extra |
| `Overwrite` | `MERGE` | `node.keys` (nodes) or `*.node.keys` (rels) |
| `ErrorIfExists` | `CREATE` + error if data exists | — |

Always create uniqueness constraints on `node.keys` properties before writing in `Overwrite` mode.
### Write nodes — Append (CREATE)

```python
people = spark.createDataFrame([
    {"name": "Alice", "age": 30},
    {"name": "Bob", "age": 25},
])

(people.write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("labels", ":Person")
    .save())
```
### Write nodes — Overwrite (MERGE)

```python
(people.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Person")
    .option("node.keys", "name")  # comma-separated; df_col:node_prop if names differ
    .save())
```

`node.keys` with rename: `.option("node.keys", "df_col:node_property,id:personId")`
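The uniqueness constraint that `Overwrite` relies on can be created in the same job via the connector's `script` option, which runs Cypher before the write starts. A sketch (the constraint name is illustrative; syntax shown is Neo4j 5+):

```python
# `script` runs once before the write job; here it ensures the
# uniqueness constraint backing the MERGE exists.
(people.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Person")
    .option("node.keys", "name")
    .option("script",
            "CREATE CONSTRAINT person_name_unique IF NOT EXISTS "
            "FOR (n:Person) REQUIRE n.name IS UNIQUE")
    .save())
```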
### Write nodes — Scala

```scala
import org.apache.spark.sql.SaveMode

peopleDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("labels", ":Person")
  .option("node.keys", "name")
  .save()
```
### Write relationships

Use `coalesce(1)` before relationship writes to avoid deadlocks.

```python
rel_df = spark.createDataFrame([
    {"cust_id": "C1", "prod_id": "P1", "qty": 3},
    {"cust_id": "C2", "prod_id": "P2", "qty": 1},
])

(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Match")  # require existing nodes
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())
```
`relationship.source.save.mode` / `relationship.target.save.mode`:

- `Match` — find existing nodes (fail if missing)
- `Append` — always CREATE new nodes
- `Overwrite` — MERGE nodes (see the sketch below)
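When the endpoint nodes may not exist yet, swapping `Match` for `Overwrite` merges them by key as part of the relationship write. A sketch reusing `rel_df` from above (keep `coalesce(1)` regardless):

```python
# Overwrite on both endpoints: missing Customer/Product nodes are
# MERGEd by key instead of causing the write to fail.
(rel_df.coalesce(1)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Append")
    .option("relationship", "BOUGHT")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Customer")
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.node.keys", "cust_id:id")
    .option("relationship.target.labels", ":Product")
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.node.keys", "prod_id:id")
    .option("relationship.properties", "qty:quantity")
    .save())
```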
Full write options reference: `references/write-patterns.md`
## Databricks — Delta Lake → Neo4j Pipeline

```python
# Read from a Delta table (Unity Catalog or DBFS)
delta_df = spark.read.format("delta").table("catalog.schema.customers")

# Optional: filter/transform in Spark before writing
filtered = delta_df.filter("active = true").select("customer_id", "name", "region")

# Write to Neo4j
(filtered.write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Customer")
    .option("node.keys", "customer_id")
    .option("batch.size", "20000")
    .save())
```
Pipeline pattern for relationships — load both node sets first, then write edges:
```python
# Step 1: ensure nodes exist
customers_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
    .option("labels", ":Customer").option("node.keys", "customer_id").save()

products_df.write.format("org.neo4j.spark.DataSource").mode("Overwrite") \
    .option("labels", ":Product").option("node.keys", "product_id").save()

# Step 2: write relationships (single partition)
orders_df.coalesce(1).write.format("org.neo4j.spark.DataSource").mode("Append") \
    .option("relationship", "ORDERED") \
    .option("relationship.save.strategy", "keys") \
    .option("relationship.source.labels", ":Customer") \
    .option("relationship.source.save.mode", "Match") \
    .option("relationship.source.node.keys", "customer_id:customer_id") \
    .option("relationship.target.labels", ":Product") \
    .option("relationship.target.save.mode", "Match") \
    .option("relationship.target.node.keys", "product_id:product_id") \
    .save()
```
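A cheap post-load sanity check is to read a count back through `query` mode. A sketch (adjust labels and relationship type to the pipeline above):

```python
# Verify the edge load: count the ORDERED relationships just written.
check = (spark.read.format("org.neo4j.spark.DataSource")
    .option("query",
            "MATCH (:Customer)-[r:ORDERED]->(:Product) RETURN count(r) AS rels")
    .load())
check.show()
```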
## Write Performance Tuning

| Scenario | Recommendation |
|---|---|
| Node writes (no lock contention) | `repartition(N)` where N ≤ Neo4j CPU cores |
| Relationship writes (lock risk) | `coalesce(1)` — single partition |
| Large datasets | `batch.size` 10000–20000 (adjust to heap) |
| MERGE-heavy loads | Add a uniqueness constraint on `node.keys` properties first |
```python
# Aggressive batch — monitor Neo4j heap; OOM risk above ~50k
(big_df.repartition(8)
    .write.format("org.neo4j.spark.DataSource")
    .mode("Overwrite")
    .option("labels", ":Event")
    .option("node.keys", "event_id")
    .option("batch.size", "20000")
    .save())
```
## Common Errors

| Error | Cause | Fix |
|---|---|---|
| `ClassNotFoundException: org.neo4j.spark.DataSource` | JAR not on classpath | Add `spark.jars.packages` or attach the library |
| Deadlock on relationship write | Multiple partitions locking the same nodes | `coalesce(1)` before write |
| Duplicate nodes on Overwrite | No uniqueness constraint on keys | `CREATE CONSTRAINT FOR (n:Label) REQUIRE n.prop IS UNIQUE` |
| OOM on Neo4j side | `batch.size` too large | Reduce to 5000–10000; check heap |
| All columns read as strings | No APOC, schema not sampled | Raise `schema.flatten.limit`; or use `query` mode with explicit types |
| "Access mode is read" error on write | Session opened in read mode | Remove `neo4j.access.mode` or set it to `write` |
| Databricks Shared cluster fails | Unity Catalog shared mode unsupported | Switch to Single User access mode |
## Checklist

- Connector JAR version matches the Spark version suffix (`_for_spark_3`)
- Scala version in the artifact matches the cluster runtime (2.12 vs 2.13)
- Credentials in Databricks secrets or env vars — not hardcoded
- `node.keys` set when using `Overwrite` mode
- Uniqueness constraint created on `node.keys` properties before MERGE writes
- `coalesce(1)` applied before relationship writes
- `batch.size` sized to the Neo4j heap (start at 5000, tune up)
- Delta Lake → Neo4j: nodes written before relationships
- `query` mode: no `SKIP`/`LIMIT` in Cypher (the connector paginates internally)
- Databricks: Single User access mode (not Shared)