# I/O Connectors in Apache Beam

## Overview

I/O connectors enable reading from and writing to external data sources. Beam provides 51+ Java I/O connectors and several Python connectors.
Java I/O Connectors Location
sdks/java/io/
### Available Connectors
| Category | Connectors |
|---|---|
| Cloud Storage | google-cloud-platform (BigQuery, Bigtable, Spanner, Pub/Sub, GCS), amazon-web-services2, azure, azure-cosmos |
| Databases | jdbc, mongodb, cassandra, hbase, redis, neo4j, clickhouse, influxdb, singlestore, elasticsearch |
| Messaging | kafka, pulsar, rabbitmq, amqp, jms, mqtt, solace |
| File Formats | parquet, csv, json, xml, thrift, iceberg |
| Other | snowflake, splunk, cdap, debezium, hadoop-format, kudu, solr, tika |
## Testing I/O Connectors

### Unit Tests

```shell
./gradlew :sdks:java:io:kafka:test
./gradlew :sdks:java:io:jdbc:test
```
### Integration Tests

#### On Direct Runner

```shell
./gradlew :sdks:java:io:google-cloud-platform:integrationTest
```

#### With Custom GCP Settings

```shell
./gradlew :sdks:java:io:google-cloud-platform:integrationTest \
    -PgcpProject=<project> \
    -PgcpTempRoot=gs://<bucket>/path
```

#### With Explicit Pipeline Options

```shell
./gradlew :sdks:java:io:jdbc:integrationTest \
    -DbeamTestPipelineOptions='["--runner=TestDirectRunner"]'
```
## Integration Test Framework

Located in the `it/` directory:

- `it/common/` - Common test utilities
- `it/google-cloud-platform/` - GCP-specific test infrastructure
- `it/jdbc/` - JDBC test infrastructure
- `it/kafka/` - Kafka test infrastructure
- `it/testcontainers/` - Testcontainers support
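The Testcontainers support makes it possible to run integration tests against a real, disposable database instead of mocks. A minimal sketch of that pattern, assuming the Testcontainers PostgreSQL module and the PostgreSQL JDBC driver are on the test classpath (the class name and `test_table` query are hypothetical):

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.PostgreSQLContainer;

@RunWith(JUnit4.class)
public class PostgresContainerIOIT {
  // Spins up a disposable Postgres instance for the duration of the test class.
  @ClassRule
  public static PostgreSQLContainer<?> postgres = new PostgreSQLContainer<>("postgres:16");

  @Rule public TestPipeline pipeline = TestPipeline.create();

  @Test
  public void testReadFromContainer() {
    PCollection<String> rows =
        pipeline.apply(
            JdbcIO.<String>read()
                .withDataSourceConfiguration(
                    JdbcIO.DataSourceConfiguration.create(
                            "org.postgresql.Driver", postgres.getJdbcUrl())
                        .withUsername(postgres.getUsername())
                        .withPassword(postgres.getPassword()))
                // "test_table" is a hypothetical table seeded before the test
                .withQuery("SELECT name FROM test_table")
                .withRowMapper(rs -> rs.getString(1))
                .withCoder(StringUtf8Coder.of()));

    PAssert.that(rows).empty(); // replace with real expectations once the table is seeded
    pipeline.run().waitUntilFinish();
  }
}
```

Because the container's JDBC URL, username, and password are generated at runtime, the test stays self-contained and needs no externally provisioned database.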
## Writing Integration Tests

### Basic Structure

```java
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class MyIOIT {
  @Rule public TestPipeline readPipeline = TestPipeline.create();
  @Rule public TestPipeline writePipeline = TestPipeline.create();

  @Test
  public void testWriteAndRead() {
    // Write data, then block until the write pipeline finishes
    writePipeline
        .apply(Create.of(testData))
        .apply(MyIO.write().to(destination));
    writePipeline.run().waitUntilFinish();

    // Read back and verify
    PCollection<String> results = readPipeline.apply(MyIO.read().from(destination));
    PAssert.that(results).containsInAnyOrder(expectedData);
    readPipeline.run().waitUntilFinish();
  }
}
```
### Using TestPipeline

```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

`TestPipeline`:

- Blocks on `run()` by default (on `TestDataflowRunner`)
- Has a 15-minute default timeout
- Reads options from the `beamTestPipelineOptions` system property
## GCP I/O Connectors

### BigQuery

```java
// Read
pipeline.apply(BigQueryIO.readTableRows().from("project:dataset.table"));

// Write
data.apply(BigQueryIO.writeTableRows()
    .to("project:dataset.table")
    .withSchema(schema)
    .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```
### Pub/Sub

```java
// Read
pipeline.apply(PubsubIO.readStrings().fromTopic("projects/project/topics/topic"));

// Write
data.apply(PubsubIO.writeStrings().to("projects/project/topics/topic"));
```
### Cloud Storage (TextIO)

```java
// Read
pipeline.apply(TextIO.read().from("gs://bucket/path/*.txt"));

// Write
data.apply(TextIO.write().to("gs://bucket/output").withSuffix(".txt"));
```
## Kafka Connector

```java
// Read
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class));

// Write
data.apply(KafkaIO.<String, String>write()
    .withBootstrapServers("localhost:9092")
    .withTopic("topic")
    .withKeySerializer(StringSerializer.class)
    .withValueSerializer(StringSerializer.class));
```
## JDBC Connector

```java
// Read (a RowMapper, and usually a Coder, is also required to materialize rows)
pipeline.apply(JdbcIO.<Row>read()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
        .create("org.postgresql.Driver", "jdbc:postgresql://host/db"))
    .withQuery("SELECT * FROM table"));

// Write (a PreparedStatementSetter is also required to bind the ? parameters)
data.apply(JdbcIO.<Row>write()
    .withDataSourceConfiguration(config)
    .withStatement("INSERT INTO table VALUES (?, ?)"));
```
## Python I/O Location

Python connectors live under `sdks/python/apache_beam/io/`.

### Common Python I/Os

- `textio` - Text files
- `fileio` - General file operations
- `avroio` - Avro files
- `parquetio` - Parquet files
- `gcp/` - GCP connectors (BigQuery, Pub/Sub, Datastore, etc.)
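The Python file-based I/Os follow the same read/transform/write shape as the Java connectors. A minimal sketch using `textio` (the file names are placeholders):

```python
import apache_beam as beam

# Read lines from a text file, upper-case them, and write them back out.
# ReadFromText/WriteToText are the textio transforms; WriteToText shards
# its output and appends the given suffix to each shard.
with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText("input.txt")
     | beam.Map(str.upper)
     | beam.io.WriteToText("output", file_name_suffix=".txt"))
```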
## Cross-language I/O

Beam supports using I/O connectors from one SDK in another via the expansion service.

```shell
# Start Java expansion service
./gradlew :sdks:java:io:expansion-service:runExpansionService
```
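For example, a Python pipeline can use the Java `KafkaIO` connector through a cross-language transform. A minimal sketch, assuming a local Kafka broker (the broker address and topic are placeholders); recent Beam releases can start the expansion service automatically when one is not supplied:

```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# ReadFromKafka is a cross-language transform: the actual reading is done
# by the Java KafkaIO connector running behind an expansion service.
with beam.Pipeline() as pipeline:
    records = (
        pipeline
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "localhost:9092"},
            topics=["topic"],
        )
        | beam.Map(print)
    )
```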
## Creating New Connectors
Key components:
- Source - Reads data (bounded or unbounded)
- Sink - Writes data
- Read/Write transforms - User-facing API
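The user-facing API is conventionally exposed as composite `PTransform`s returned by static `read()`/`write()` factory methods. A minimal sketch of that shape (all names here are hypothetical, and the inner `DoFn` stands in for real source logic):

```java
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

public class MyIO {
  // User-facing entry point, mirroring the Read/Write builder style
  // of the existing connectors.
  public static Read read() {
    return new Read();
  }

  public static class Read extends PTransform<PBegin, PCollection<String>> {
    @Override
    public PCollection<String> expand(PBegin input) {
      // A real connector would use a Splittable DoFn (or the legacy
      // Source API) to read from the external system in parallel;
      // this stub just seeds a single element and expands it.
      return input
          .getPipeline()
          .apply(Create.of("seed"))
          .apply(ParDo.of(new ReadFn()));
    }
  }

  static class ReadFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // Connect to the external system and emit records here.
      c.output("record");
    }
  }
}
```

Configuration (connection strings, queries, topics) is typically carried as immutable fields on the `Read`/`Write` transform, set through `withX(...)` builder methods, as in the `KafkaIO` and `JdbcIO` examples above.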