Configure the Iceberg sink connector with a PostgreSQL JDBC catalog
The JDBC catalog stores Iceberg table metadata in a PostgreSQL® database. Use it to integrate Apache Kafka®, Amazon S3, and a PostgreSQL-based metadata store.
Prerequisites
- Aiven for Apache Kafka® service with Apache Kafka Connect enabled, or a dedicated Aiven for Apache Kafka Connect® service
- Apache Kafka topic that contains the source data
- Apache Kafka control topic used to coordinate Iceberg table commits (for example, control-iceberg)
- Aiven for PostgreSQL® service or another PostgreSQL instance that is publicly accessible
- Amazon S3 bucket for storing Iceberg table data
- IAM user or role with permissions to read and write to the S3 bucket
- AWS access key ID and secret access key for the IAM credentials
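
As an illustration of the S3 permissions prerequisite, the following Python sketch prints a minimal IAM policy document for a single bucket. The function name s3_policy and the bucket name are hypothetical. The action list is an assumption: this page only requires read and write access, and s3:DeleteObject and s3:ListBucket are included on the assumption that the connector also needs to list and clean up files. Adjust it to your own security requirements.

```python
import json


def s3_policy(bucket_name):
    """Return a minimal IAM policy document for one bucket (illustrative only)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Bucket-level permission: list objects in the bucket.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": f"arn:aws:s3:::{bucket_name}",
            },
            {
                # Object-level permissions: read, write, and delete files.
                # s3:DeleteObject is an assumption, not stated on this page.
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            },
        ],
    }


if __name__ == "__main__":
    # "my-s3-bucket" is a placeholder bucket name.
    print(json.dumps(s3_policy("my-s3-bucket"), indent=2))
```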
 
Create an Iceberg sink connector configuration
To configure the Iceberg sink connector with the JDBC catalog, create a JSON file (for example, iceberg_sink_connector.json) that uses PostgreSQL for metadata and Amazon S3 for storage.
Note: Loading worker properties is not supported. Set the Apache Kafka client settings with iceberg.kafka.* properties instead.
{
  "name": "CONNECTOR_NAME",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "KAFKA_TOPICS",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "consumer.override.auto.offset.reset": "earliest",
  "iceberg.kafka.auto.offset.reset": "earliest",
  "iceberg.tables": "DATABASE.TABLE",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.control.commit.interval-ms": "1000",
  "iceberg.control.commit.timeout-ms": "20000",
  "iceberg.catalog.type": "jdbc",
  "iceberg.catalog.uri": "jdbc:postgresql://HOST:PORT/DATABASE?user=USERNAME&password=PASSWORD&ssl=require",
  "iceberg.catalog.warehouse": "s3://BUCKET_NAME",
  "iceberg.catalog.client.region": "AWS_REGION",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.jdbc.useSSL": "true",
  "iceberg.catalog.jdbc.verifyServerCertificate": "true",
  "iceberg.catalog.s3.access-key-id": "AWS_ACCESS_KEY_ID",
  "iceberg.catalog.s3.secret-access-key": "AWS_SECRET_ACCESS_KEY",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.kafka.bootstrap.servers": "KAFKA_HOST:PORT",
  "iceberg.kafka.security.protocol": "SSL",
  "iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "iceberg.kafka.ssl.keystore.password": "KEYSTORE_PASSWORD",
  "iceberg.kafka.ssl.keystore.type": "PKCS12",
  "iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "iceberg.kafka.ssl.truststore.password": "TRUSTSTORE_PASSWORD",
  "iceberg.kafka.ssl.key.password": "KEY_PASSWORD"
}
Parameters:
- name: Name of the connector
- connector.class: Connector implementation class
- topics: Comma-separated list of Apache Kafka topics with source data
- iceberg.catalog.type: Catalog type. Use jdbc for PostgreSQL
- iceberg.catalog.uri: JDBC connection string for PostgreSQL
- iceberg.catalog.warehouse: S3 bucket URI for table storage
- iceberg.catalog.client.region: AWS region where the S3 bucket is located. Required if no region is set in the environment or system properties
- iceberg.tables: Target Iceberg table in DATABASE.TABLE format
- iceberg.tables.auto-create-enabled: Automatically create tables if they do not exist
- iceberg.catalog.io-impl: File I/O implementation for S3
- iceberg.catalog.s3.access-key-id and iceberg.catalog.s3.secret-access-key: AWS credentials for S3
- iceberg.kafka.*: Apache Kafka security settings
For the full list of parameters, see the Iceberg Kafka Connect configuration.
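
Because iceberg.catalog.uri carries the user name and password in a URL query string, characters such as @ or & in a password can break the connection string. The following Python sketch shows one way to build the URI with percent-encoded values. The function name build_catalog_uri is hypothetical, and the sketch assumes that the PostgreSQL JDBC driver decodes percent-encoded query values.

```python
from urllib.parse import quote, urlencode


def build_catalog_uri(host, port, database, user, password):
    """Build a JDBC connection string with percent-encoded credentials."""
    # quote_via=quote encodes a space as %20 instead of "+".
    query = urlencode(
        {"user": user, "password": password, "ssl": "require"},
        quote_via=quote,
    )
    return f"jdbc:postgresql://{host}:{port}/{database}?{query}"


if __name__ == "__main__":
    # All values below are placeholders.
    print(build_catalog_uri("postgres.example.com", 5432, "iceberg_db",
                            "iceberg_user", "p@ss&word"))
```

Use the returned string as the value of iceberg.catalog.uri in the connector configuration.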
Create the connector
Create the connector using the Aiven Console or the Aiven CLI.

- Access the Aiven Console.
- Select your Aiven for Apache Kafka or Aiven for Apache Kafka Connect service.
- Click Connectors.
- Click Create connector if Apache Kafka Connect is enabled on the service. If not, click Enable connector on this service.
  Alternatively, to enable connectors:
  - Click Service settings in the sidebar.
  - In the Service management section, click Actions > Enable Kafka connect.
- In the sink connectors list, select Iceberg Sink Connector, and click Get started.
- On the Iceberg Sink Connector page, go to the Common tab.
- Locate the Connector configuration text box and click Edit.
- Paste the configuration from your iceberg_sink_connector.json file into the text box.
- Click Create connector.
- Verify the connector status on the Connectors page.
 
To create the Iceberg sink connector using the Aiven CLI, run:
avn service connector create SERVICE_NAME @iceberg_sink_connector.json
Parameters:
- SERVICE_NAME: Name of your Aiven for Apache Kafka® service.
- @iceberg_sink_connector.json: Path to the JSON configuration file.
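
Before you submit the file, it can help to confirm that it parses as JSON and that no template placeholders remain. The following Python sketch is one way to do that. The names check_config, REQUIRED_KEYS, and PLACEHOLDERS are hypothetical, and the set of keys treated as required is an assumption based on the configuration template on this page, not an official list.

```python
import json
import re
import sys

# Keys treated as required here (an assumption based on the template above).
REQUIRED_KEYS = (
    "name", "connector.class", "topics", "iceberg.tables",
    "iceberg.catalog.type", "iceberg.catalog.uri",
    "iceberg.catalog.warehouse", "iceberg.catalog.io-impl",
)

# Placeholder tokens used in the configuration template on this page.
PLACEHOLDERS = {
    "CONNECTOR_NAME", "KAFKA_TOPICS", "DATABASE", "TABLE", "HOST", "PORT",
    "USERNAME", "PASSWORD", "BUCKET_NAME", "AWS_REGION", "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY", "KAFKA_HOST", "KEYSTORE_PASSWORD",
    "TRUSTSTORE_PASSWORD", "KEY_PASSWORD",
}

TOKEN = re.compile(r"[A-Za-z0-9_]+")


def check_config(config):
    """Return a list of human-readable problems found in the configuration."""
    problems = []
    for key in REQUIRED_KEYS:
        if key not in config:
            problems.append(f"missing required key: {key}")
    for key, value in config.items():
        # Compare whole tokens so "password" or "PKCS12" are not flagged.
        leftover = PLACEHOLDERS.intersection(TOKEN.findall(str(value)))
        if leftover:
            names = ", ".join(sorted(leftover))
            problems.append(f"{key} still contains placeholder(s): {names}")
    return problems


if __name__ == "__main__" and len(sys.argv) > 1:
    with open(sys.argv[1], encoding="utf-8") as handle:
        found = check_config(json.load(handle))  # raises on invalid JSON
    for problem in found:
        print(problem)
    sys.exit(1 if found else 0)
```

If you save the sketch as check_config.py (a hypothetical file name), run python check_config.py iceberg_sink_connector.json. It exits with status 0 when it finds no problems.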
Example
This example creates an Iceberg sink connector with PostgreSQL as the catalog:
{
  "name": "iceberg_sink_jdbc",
  "connector.class": "org.apache.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "2",
  "topics": "test-topic",
  "iceberg.catalog.type": "jdbc",
  "iceberg.catalog.uri": "jdbc:postgresql://postgres.example.com:5432/iceberg_db?user=iceberg_user&password=secret&ssl=require",
  "iceberg.catalog.warehouse": "s3://my-s3-bucket",
  "iceberg.catalog.client.region": "us-west-2",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.s3.access-key-id": "your-access-key-id",
  "iceberg.catalog.s3.secret-access-key": "your-secret-access-key",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.catalog.jdbc.useSSL": "true",
  "iceberg.catalog.jdbc.verifyServerCertificate": "true",
  "iceberg.tables": "mydatabase.mytable",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.control.commit.interval-ms": "1000",
  "iceberg.control.commit.timeout-ms": "20000",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "iceberg.kafka.bootstrap.servers": "kafka.example.com:9092",
  "iceberg.kafka.security.protocol": "SSL",
  "iceberg.kafka.ssl.keystore.location": "/run/aiven/keys/public.keystore.p12",
  "iceberg.kafka.ssl.keystore.password": "password",
  "iceberg.kafka.ssl.keystore.type": "PKCS12",
  "iceberg.kafka.ssl.truststore.location": "/run/aiven/keys/public.truststore.jks",
  "iceberg.kafka.ssl.truststore.password": "password",
  "iceberg.kafka.ssl.key.password": "password"
}