Create a sink connector from Apache Kafka® to Google BigQuery
Set up the BigQuery sink connector to move data from Aiven for Apache Kafka® into BigQuery tables for analysis and storage.
See the full list of parameters in the GitHub documentation.
Prerequisites
- An Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Kafka Connect cluster
- A Google Cloud project with:
  - A service account and JSON service key
  - The BigQuery API enabled
  - A BigQuery dataset to store data
  - Dataset access granted to the service account
Collect the following details:
- GOOGLE_CLOUD_PROJECT_NAME: Target Google Cloud project name
- GOOGLE_CLOUD_SERVICE_KEY: Google Cloud service account key in JSON format

  Warning: When adding the service key to the connector configuration, provide it as an escaped string:
  - Escape all " characters as \"
  - Escape any \n in the private_key field as \\n

  Example: {\"type\": \"service_account\",\"project_id\": \"XXXXXX\", ...}

  One way to produce the escaped string is shown after this list.
- BIGQUERY_DATASET_NAME: BigQuery dataset name
- TOPIC_LIST: Comma-separated list of Kafka topics to sink
- Only if using Avro: Schema Registry connection details:
  - APACHE_KAFKA_HOST
  - SCHEMA_REGISTRY_PORT
  - SCHEMA_REGISTRY_USER
  - SCHEMA_REGISTRY_PASSWORD

Schema Registry connection details are available on the Overview page of your Kafka service, under Connection information, in the Schema Registry tab. Aiven uses Karapace as the Schema Registry.
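A minimal sketch of producing the escaped string from a downloaded key file, assuming the jq tool is available and the key was saved as service_account_key.json (both the tool choice and the file name are assumptions, not requirements):

# Compact the key to one line, then re-encode it as a JSON string:
# this escapes " as \" and the \n sequences in private_key as \\n.
jq -c . service_account_key.json | tr -d '\n' | jq -Rs .

Paste the result into the keyfile value, dropping the outer quotes that jq adds around the output.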
Configure Google Cloud
- Create a service account and generate a JSON key: In the Google Cloud Console, create a service account and generate a JSON key. See Google's guide for details. You'll use this key in the connector configuration.
- Enable the BigQuery API: In the API & Services dashboard, enable the BigQuery API if it isn't already enabled. See Google's reference for details.
- Create a BigQuery dataset: In the BigQuery Console, create a dataset or use an existing one. See Google's guide. Select a region close to your Kafka service to reduce latency.
- Grant dataset access to the service account: In the BigQuery Console, grant your service account the BigQuery Data Editor role on the dataset. See Google's access control guide.
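The same setup can also be scripted. The following is a minimal sketch using the gcloud and bq CLIs, assuming they are authenticated against GOOGLE_CLOUD_PROJECT_NAME; the service account name kafka-bq-sink and the EU location are example values only:

# Create a service account and download a JSON key
gcloud iam service-accounts create kafka-bq-sink --project=GOOGLE_CLOUD_PROJECT_NAME
gcloud iam service-accounts keys create service_account_key.json \
  --iam-account=kafka-bq-sink@GOOGLE_CLOUD_PROJECT_NAME.iam.gserviceaccount.com

# Enable the BigQuery API
gcloud services enable bigquery.googleapis.com --project=GOOGLE_CLOUD_PROJECT_NAME

# Create a dataset in a region close to the Kafka service
bq --location=EU mk --dataset GOOGLE_CLOUD_PROJECT_NAME:BIGQUERY_DATASET_NAME

Granting the BigQuery Data Editor role at the dataset level is simplest in the BigQuery Console, as described above.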
Write methods
- Google Cloud Storage (default): Uses GCS as an intermediate step. Supports all features, including delete and upsert. Parameters used only with delete or upsert:
  - intermediateTableSuffix
  - kafkaKeyFieldName (required)
  - mergeIntervalMs
- Storage Write API: Streams data directly into BigQuery. Enable it by setting useStorageWriteApi to true. This method provides lower latency for streaming workloads. Parameters used only with the Storage Write API:
  - bigQueryPartitionDecorator
  - commitInterval
  - enableBatchMode

Warning: Do not use the Storage Write API with deleteEnabled or upsertEnabled.

If useStorageWriteApi is not set, the connector uses the standard Google Cloud Storage API by default. The sketch after this section shows how the two methods differ in a connector configuration.
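As an illustration, a connector that keeps the default GCS method and enables upsert might add the following fragment to its configuration (the kafka_key field name and the 60000 ms merge interval are example values, not defaults):

"upsertEnabled": "true",
"kafkaKeyFieldName": "kafka_key",
"mergeIntervalMs": "60000"

A connector using the Storage Write API would instead add:

"useStorageWriteApi": "true",
"commitInterval": "1000"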
Create a BigQuery sink connector configuration
Define the connector configuration in a JSON file, for example bigquery_sink.json.
{
  "name": "CONNECTOR_NAME",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "TOPIC_LIST",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryClient.basic.auth.credentials.source": "URL",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "allBQFieldsNullable": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}
To use the Storage Write API instead of the default GCS method, add
"useStorageWriteApi": "true" to the configuration.
The configuration file includes:
- name: Connector name
- connector.class: Must be com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
- topics: Comma-separated list of Kafka topics to write to BigQuery
- project: Target Google Cloud project name
- defaultDataset: BigQuery dataset name, prefixed with .*=

Note: By default, table names in BigQuery match the Kafka topic names. Use the Kafka Connect RegexRouter transformation to rename tables if needed, as shown in the example below.
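For example, a hypothetical RegexRouter transform that strips an iot_ prefix from topic names before they are mapped to tables could be added to the connector configuration (the transform alias, regex, and replacement are illustrative assumptions):

"transforms": "renameTables",
"transforms.renameTables.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.renameTables.regex": "iot_(.*)",
"transforms.renameTables.replacement": "sensor_$1"

With this fragment, records from a topic such as iot_measurements are written to a table named sensor_measurements.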
If your messages are in Avro format, also set these parameters:
- schemaRegistryLocation: Karapace schema registry endpoint
- key.converter and value.converter: Set to io.confluent.connect.avro.AvroConverter for Avro
- key.converter.schema.registry.url and value.converter.schema.registry.url: Schema registry URL (https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT)
- key.converter.schema.registry.basic.auth.user.info and value.converter.schema.registry.basic.auth.user.info: Schema registry credentials in the format SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD
For table management, you can set:
- autoCreateTables: Create target BigQuery tables automatically if they do not exist
- allBQFieldsNullable: Set new BigQuery fields to NULLABLE instead of REQUIRED
For schema evolution, you can also set:
- allowNewBigQueryFields: Add new fields from Kafka schemas to BigQuery tables
- allowBigQueryRequiredFieldRelaxation: Relax REQUIRED fields back to NULLABLE
Automatic schema evolution reduces control over table definitions and may cause errors if message schemas change in ways BigQuery cannot support.
For authentication, set:
- keySource: Format of the Google Cloud service account key, set to JSON
- keyfile: Google Cloud service account key as an escaped string
Create a BigQuery sink connector
- Aiven Console
- Aiven CLI
- Go to the Aiven Console.
- Select your Aiven for Apache Kafka® or Kafka Connect service.
- Click Connectors.
- Click Create connector (enable Kafka Connect if required).
- Select Google BigQuery Sink from the list.
- On the Common tab, click Edit in the Connector configuration box.
- Paste the contents of bigquery_sink.json and replace placeholders with actual values.
- Click Apply, then Create connector.
- Verify the connector status on the Connectors page.
- Check that data appears in the BigQuery dataset. By default, table names match topic names. Use the Kafka Connect RegexRouter transformation to rename tables if required.
Run the following command to create the connector:
avn service connector create SERVICE_NAME @bigquery_sink.json
Parameters:
- SERVICE_NAME: Name of your Aiven for Apache Kafka service
- @bigquery_sink.json: Path to the connector configuration file
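To check the connector from the CLI after creation, the following sketch uses the Aiven CLI status command, where CONNECTOR_NAME is the name set in bigquery_sink.json:

avn service connector status SERVICE_NAME CONNECTOR_NAME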
Examples
Sink a JSON topic
Suppose you have a topic iot_measurements containing JSON messages with an inline schema:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int64", "field": "iot_id" },
      { "type": "string", "field": "metric" },
      { "type": "int32", "field": "measurement" }
    ]
  },
  "payload": { "iot_id": 1, "metric": "Temperature", "measurement": 14 }
}
Connector configuration:
{
  "name": "iot_sink",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "iot_measurements",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}
Parameters:
- topics: Source topic
- value.converter: JSON converter for messages carrying an inline schema
Inline JSON schemas increase message size and add processing overhead. For efficiency, prefer Avro format with Karapace Schema Registry.
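To test this example, you can produce a message like the one above with kcat. This is a sketch only: it assumes kcat is installed, the Aiven TLS files (ca.pem, service.cert, service.key) were downloaded from the service's Connection information, APACHE_KAFKA_PORT is the service's Kafka port, and iot_message.json is a hypothetical file containing the message on a single line:

# Produce one message per line of iot_message.json to the iot_measurements topic
kcat -b APACHE_KAFKA_HOST:APACHE_KAFKA_PORT \
  -X security.protocol=SSL \
  -X ssl.ca.location=ca.pem \
  -X ssl.certificate.location=service.cert \
  -X ssl.key.location=service.key \
  -t iot_measurements -P -l iot_message.json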
Sink an Avro topic
Suppose you have a topic students with messages in Avro format and schemas stored in Karapace.
Connector configuration:
{
  "name": "students_sink",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "students",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY"
}
Parameters:
- topics: Source topic
- key.converter and value.converter: Enable Avro parsing with the Karapace schema registry
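As a purely illustrative assumption of what the students topic could contain, the value schema registered in Karapace might look like the following (field names and types are examples, not requirements):

{
  "type": "record",
  "name": "student",
  "fields": [
    {"name": "student_id", "type": "long"},
    {"name": "name", "type": "string"},
    {"name": "enrolled", "type": "boolean"}
  ]
}

With autoCreateTables enabled, the connector creates a students table whose columns mirror these fields.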
Sink an Avro topic using Storage Write API
Use this configuration to stream Avro messages directly into BigQuery with lower latency.
Connector configuration:
{
  "name": "students_sink_write_api",
  "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
  "topics": "students",
  "project": "GOOGLE_CLOUD_PROJECT_NAME",
  "defaultDataset": ".*=BIGQUERY_DATASET_NAME",
  "schemaRetriever": "com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever",
  "schemaRegistryLocation": "https://SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD@APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "key.converter.basic.auth.credentials.source": "USER_INFO",
  "key.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "https://APACHE_KAFKA_HOST:SCHEMA_REGISTRY_PORT",
  "value.converter.basic.auth.credentials.source": "USER_INFO",
  "value.converter.schema.registry.basic.auth.user.info": "SCHEMA_REGISTRY_USER:SCHEMA_REGISTRY_PASSWORD",
  "autoCreateTables": "true",
  "keySource": "JSON",
  "keyfile": "GOOGLE_CLOUD_SERVICE_KEY",
  "useStorageWriteApi": "true",
  "commitInterval": "1000"
}
Parameters:
- useStorageWriteApi: Enables direct streaming into BigQuery
- commitInterval: Flush interval for Storage Write API batches, in milliseconds
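Once the connector is running, one way to confirm that rows are arriving is to query the target table with the bq CLI; this sketch assumes the table keeps the default name matching the students topic:

bq query --use_legacy_sql=false \
  'SELECT COUNT(*) AS row_count FROM `BIGQUERY_DATASET_NAME.students`'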