# I/O Connectors in Apache Beam
## Overview

I/O connectors enable Beam pipelines to read from and write to external data sources. Beam provides 51+ Java I/O connectors and several Python connectors.
## Java I/O Connectors Location

```
sdks/java/io/
```
### Available Connectors
| Category | Connectors |
|---|---|
| Cloud Storage | google-cloud-platform (BigQuery, Bigtable, Spanner, Pub/Sub, GCS), amazon-web-services2, azure, azure-cosmos |
| Databases | jdbc, mongodb, cassandra, hbase, redis, neo4j, clickhouse, influxdb, singlestore, elasticsearch |
| Messaging | kafka, pulsar, rabbitmq, amqp, jms, mqtt, solace |
| File Formats | parquet, csv, json, xml, thrift, iceberg |
| Other | snowflake, splunk, cdap, debezium, hadoop-format, kudu, solr, tika |
## Testing I/O Connectors

### Unit Tests

```shell
./gradlew :sdks:java:io:kafka:test
./gradlew :sdks:java:io:jdbc:test
```
### Integration Tests

#### On Direct Runner

```shell
./gradlew :sdks:java:io:google-cloud-platform:integrationTest
```

#### With Custom GCP Settings

```shell
./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
    -PgcpProject=<project> \
    -PgcpTempRoot=gs://<bucket>/path
```

#### With Explicit Pipeline Options

```shell
./gradlew :sdks:java:io:jdbc:integrationTest \
    -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]'
```
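The value passed to `-DbeamTestPipelineOptions` is a JSON array of pipeline option flags. When scripting test invocations, building that value with `json.dumps` avoids shell-quoting mistakes; a small sketch (the `--tempRoot` value here is a placeholder, not from this document):

```python
import json

# Build the JSON array that -DbeamTestPipelineOptions expects.
# The bucket in --tempRoot is a placeholder for illustration.
options = [
    "--runner=TestDirectRunner",
    "--tempRoot=gs://my-bucket/tmp",
]
value = json.dumps(options)
print(value)  # ["--runner=TestDirectRunner", "--tempRoot=gs://my-bucket/tmp"]
```

Round-tripping through `json.loads` is a quick sanity check that the property value will parse on the Java side.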
## Integration Test Framework

Located in the `it/` directory:

- `it/common/` - Common test utilities
- `it/google-cloud-platform/` - GCP-specific test infrastructure
- `it/jdbc/` - JDBC test infrastructure
- `it/kafka/` - Kafka test infrastructure
- `it/testcontainers/` - Testcontainers support
## Writing Integration Tests

### Basic Structure

```java
@RunWith(JUnit4.class)
public class MyIOIT {
  @Rule public TestPipeline readPipeline = TestPipeline.create();
  @Rule public TestPipeline writePipeline = TestPipeline.create();

  @Test
  public void testWriteAndRead() {
    // Write data
    writePipeline
        .apply(Create.of(testData))
        .apply(MyIO.write().to(destination));
    writePipeline.run().waitUntilFinish();

    // Read and verify
    PCollection<String> results = readPipeline.apply(MyIO.read().from(destination));
    PAssert.that(results).containsInAnyOrder(expectedData);
    readPipeline.run().waitUntilFinish();
  }
}
```
### Using TestPipeline

```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

TestPipeline:

- Blocks on `run()` by default (on TestDataflowRunner)
- Has a 15-minute default timeout
- Reads options from the `beamTestPipelineOptions` system property
## GCP I/O Connectors

### BigQuery

```java
// Read
pipeline.apply(BigQueryIO.readTableRows().from("project:dataset.table"));

// Write
data.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table")
    .withSchema(schema)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```
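A table spec like `"project:dataset.table"` is a colon-separated project plus a dot-separated dataset and table; the project may be omitted to use the pipeline's default project. As an illustration of that format only (this standalone Python sketch is not Beam code; the Java SDK does this parsing internally), splitting a spec into its parts:

```python
import re

# Illustrative parser for specs of the form [project:]dataset.table.
# Not Beam code; shown only to make the table-spec format concrete.
def parse_table_spec(spec):
    match = re.fullmatch(r"(?:([\w.-]+):)?(\w+)\.(\w+)", spec)
    if match is None:
        raise ValueError("invalid table spec: " + spec)
    project, dataset, table = match.groups()  # project is None if omitted
    return project, dataset, table

print(parse_table_spec("project:dataset.table"))  # ('project', 'dataset', 'table')
```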
### Pub/Sub

```java
// Read
pipeline.apply(PubsubIO.readStrings().fromTopic("projects/project/topics/topic"));

// Write
data.apply(PubsubIO.writeStrings().to("projects/project/topics/topic"));
```
### Cloud Storage (TextIO)

```java
// Read
pipeline.apply(TextIO.read().from("gs://bucket/path/*.txt"));

// Write
data.apply(TextIO.write().to("gs://bucket/output").withSuffix(".txt"));
```
## Kafka Connector

```java
// Read
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));

// Write
data.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));
```
## JDBC Connector

```java
// Read
pipeline.apply(JdbcIO.<Row>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.postgresql.Driver", "jdbc:postgresql://host/db"))
    .withQuery("SELECT * FROM table"));

// Write
data.apply(JdbcIO.<Row>write()
    .withDataSourceConfiguration(config)
    .withStatement("INSERT INTO table VALUES (?, ?)"));
```
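Conceptually, JdbcIO's read runs the configured query and maps each result row to an element, and its write executes the statement once per element, binding the `?` placeholders. The same round trip can be sketched with plain SQL using Python's stdlib `sqlite3` (an analogue only, not Beam code):

```python
import sqlite3

# Plain-SQL analogue of JdbcIO semantics (not Beam code):
# write = one parameterized INSERT per element,
# read  = run a SELECT and map each row to an element.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kv (k TEXT, v INTEGER)")

elements = [("a", 1), ("b", 2)]
# What the write side does with its prepared statement:
conn.executemany("INSERT INTO kv VALUES (?, ?)", elements)

# What the read side does with its query + row mapper:
rows = [tuple(row) for row in conn.execute("SELECT k, v FROM kv ORDER BY k")]
print(rows)  # [('a', 1), ('b', 2)]
```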
## Python I/O Location

```
sdks/python/apache_beam/io/
```

### Common Python I/Os

- `textio` - Text files
- `fileio` - General file operations
- `avroio` - Avro files
- `parquetio` - Parquet files
- `gcp/` - GCP connectors (BigQuery, Pub/Sub, Datastore, etc.)
## Cross-language I/O

Beam supports using I/O connectors from one SDK in another via the expansion service.

```shell
# Start Java expansion service
./gradlew :sdks:java:io:expansion-service:runExpansionService
```
## Creating New Connectors

Key components:

- **Source** - Reads data (bounded or unbounded)
- **Sink** - Writes data
- **Read/Write transforms** - User-facing API
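The user-facing Read/Write transforms conventionally expose an immutable fluent-builder configuration style, as in the `MyIO.read().from(destination)` examples above. A minimal Python sketch of that API shape (illustrative only; real connectors subclass PTransform, and the Java SDK typically uses AutoValue builders):

```python
from dataclasses import dataclass, replace
from typing import Optional

# Sketch of the fluent, immutable configuration style that I/O
# connector transforms expose. Illustration only, not a real
# Beam transform; batch_size is a hypothetical tuning knob.
@dataclass(frozen=True)
class Read:
    source: Optional[str] = None
    batch_size: int = 100

    def from_(self, source):
        # Each with-style method returns a new configured copy.
        return replace(self, source=source)

    def with_batch_size(self, n):
        return replace(self, batch_size=n)

read = Read().from_("gs://bucket/input").with_batch_size(500)
print(read.source, read.batch_size)  # gs://bucket/input 500
```

Immutability matters here: each call returns a fresh configuration, so a partially configured transform can be safely shared and further specialized.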