The CREATE CONNECTION command creates a reusable connection configuration that can be referenced when creating sources, sinks, or tables. Currently supported connection types are Kafka, Schema Registry, and Iceberg.
Added in v2.3: Support for Iceberg connections.

Syntax

CREATE CONNECTION [ IF NOT EXISTS ] connection_name
WITH (
    type = '<connector_type>',
    connection_parameter = SECRET `<secret_name>`,
    ...
);

Parameters

| Parameter or clause | Description |
| --- | --- |
| type | Required. The type of connection. Supported values: kafka, schema_registry, iceberg. |
| secret_name | Use the SECRET keyword to reference secrets, allowing sensitive information to be stored securely and referenced in the connection configuration. Changes to the secret are automatically applied, so there’s no need to alter the connection. |
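As a minimal sketch of the SECRET reference described above, the following creates a secret and then uses it in a connection. The secret name kafka_pwd, its value, and the connection name conn_with_secret are hypothetical, and the CREATE SECRET form shown (with backend = 'meta') is assumed to be available in your RisingWave version.

```sql
-- Hypothetical secret; the name and value are placeholders.
CREATE SECRET kafka_pwd WITH (backend = 'meta') AS '<password>';

-- The connection stores a reference to the secret, not its value.
CREATE CONNECTION IF NOT EXISTS conn_with_secret WITH (
    type = 'kafka',
    properties.bootstrap.server = '<broker addr>',
    properties.security.protocol = 'SASL_PLAINTEXT',
    properties.sasl.mechanism = 'PLAIN',
    properties.sasl.username = '<username>',
    properties.sasl.password = SECRET kafka_pwd
);
```

Because the connection holds only a reference, rotating the secret should not require recreating the connection.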
Kafka connection parameters

Required
  • properties.bootstrap.server: The Kafka bootstrap server addresses.
Optional
For SSL/SASL authentication:
  • properties.security.protocol
  • properties.ssl.endpoint.identification.algorithm
  • properties.ssl.ca.location
  • properties.ssl.ca.pem
  • properties.ssl.certificate.location
  • properties.ssl.certificate.pem
  • properties.ssl.key.location
  • properties.ssl.key.pem
  • properties.ssl.key.password
  • properties.sasl.mechanism
  • properties.sasl.username
  • properties.sasl.password
  • properties.sasl.kerberos.service.name
  • properties.sasl.kerberos.keytab
  • properties.sasl.kerberos.principal
  • properties.sasl.kerberos.kinit.cmd
  • properties.sasl.kerberos.min.time.before.relogin
  • properties.sasl.oauthbearer.config
For PrivateLink connection:
  • privatelink.targets
  • privatelink.endpoint
For AWS authentication:
  • aws.region
  • endpoint
  • aws.credentials.access_key_id
  • aws.credentials.secret_access_key
  • aws.credentials.session_token
  • aws.credentials.role.arn
  • aws.credentials.role.external_id
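A sketch of how the SSL options listed above might be combined for a broker that requires client certificates. The file paths, the connection name conn_kafka_ssl, and the secret kafka_key_pwd are hypothetical placeholders; which options are actually needed depends on how the broker is configured.

```sql
CREATE CONNECTION conn_kafka_ssl WITH (
    type = 'kafka',
    properties.bootstrap.server = '<broker addr>',
    properties.security.protocol = 'SSL',
    -- CA certificate used to verify the broker (placeholder path)
    properties.ssl.ca.location = '/path/to/ca-cert',
    -- Client certificate and private key (placeholder paths)
    properties.ssl.certificate.location = '/path/to/client.pem',
    properties.ssl.key.location = '/path/to/client.key',
    -- Key passphrase read from a previously created, hypothetical secret
    properties.ssl.key.password = SECRET kafka_key_pwd
);
```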
Schema Registry connection parameters

| Property name | Description |
| --- | --- |
| schema.registry | The base URL of the Schema Registry service. |
| schema.registry.username | The username for authenticating to the Schema Registry service. |
| schema.registry.password | The password for authenticating to the Schema Registry service. |
Iceberg connection parameters

| Property name | Description |
| --- | --- |
| catalog.type | Type of the Iceberg catalog. |
| s3.region | AWS S3 region. |
| s3.endpoint | AWS S3 endpoint. |
| s3.access.key | AWS S3 access key. |
| s3.secret.key | AWS S3 secret key. |
| gcs.credential | Google Cloud Storage credential. |
| azblob.account_name | The Azure Storage account name. |
| azblob.account_key | The Azure Storage account key associated with the account name. |
| azblob.endpoint_url | The full endpoint URL of the Azure Blob service. |
| warehouse.path | Path of the Iceberg warehouse, applicable only in storage catalog. |
| catalog.name | Name of the catalog, optional for storage catalog, required for others. |
| catalog.uri | URI of the Iceberg catalog, applicable only in rest catalog. |
| catalog.credential | Credential for accessing Iceberg catalog, applicable only in rest catalog. |
| catalog.token | Token for interacting with rest catalog server. |
| catalog.oauth2_server_uri | OAuth2 token endpoint URI, applicable only in rest catalog. |
| catalog.scope | Additional OAuth2 scope for accessing the Iceberg catalog, applicable only in rest catalog. |
| catalog.rest.signing_region | The signing region when signing requests to the rest catalog. |
| catalog.rest.signing_name | The signing name when signing requests to the rest catalog. |
| catalog.rest.sigv4_enabled | Specify whether to use SigV4 for signing requests to the rest catalog. |
| s3.path.style.access | Enables path-style access for AWS S3. |
| catalog.jdbc.user | JDBC user for catalog access. |
| catalog.jdbc.password | JDBC password for catalog access. |
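To illustrate the rest-catalog properties above, here is a minimal sketch of an Iceberg connection that uses a REST catalog. The connection name, catalog name, URI, region, and the secrets iceberg_cred, s3_ak, and s3_sk are all hypothetical, and the exact set of properties a given REST catalog requires is an assumption.

```sql
CREATE CONNECTION conn_iceberg_rest WITH (
    type = 'iceberg',
    catalog.type = 'rest',
    -- Required for non-storage catalogs
    catalog.name = 'demo_rest',
    -- Placeholder URI of the REST catalog server
    catalog.uri = 'http://<rest-catalog-host>:8181',
    -- Credential kept in a hypothetical secret
    catalog.credential = SECRET iceberg_cred,
    s3.region = 'us-east-1',
    s3.access.key = SECRET s3_ak,
    s3.secret.key = SECRET s3_sk
);
```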

Example

To connect to a schema registry:
CREATE CONNECTION sr_conn WITH (
  type = 'schema_registry',
  schema.registry = 'http://...',
  schema.registry.username = 'admin_user',
  schema.registry.password = 'schema_registry_password'
);
To create a Kafka connection that securely integrates secrets:
CREATE CONNECTION conn_kafka WITH (
    type = 'kafka',
    properties.bootstrap.server = '<broker addr>',
    properties.sasl.mechanism = 'PLAIN',
    properties.security.protocol = 'SASL_PLAINTEXT',
    properties.sasl.username = SECRET <username>,
    properties.sasl.password = SECRET <password>
);
CREATE TABLE t WITH (
    connector = 'kafka', 
    topic = 'demo-topic', 
    connection = conn_kafka
) FORMAT PLAIN ENCODE AVRO (connection = sr_conn);
To create an Iceberg connection:
CREATE CONNECTION conn WITH (
    type = 'iceberg',
    catalog.name = 'demo',
    catalog.type = 'storage',
    warehouse.path = 's3a://hummock001/iceberg-data',
    s3.endpoint = 'http://127.0.0.1:9301',
    s3.region = 'us-east-1',
    s3.access.key = 'hummockadmin',
    s3.secret.key = 'hummockadmin'
);
CREATE SINK sink1 FROM s1 WITH (
    connector = 'iceberg',
    type = 'upsert',
    database.name = 'demo_db',
    table.name = 'test_connection_table',
    connection = conn,
    create_table_if_not_exists = 'true',
    commit_checkpoint_interval = 1,
    primary_key = 'i1,i2'
);
To create a source, table, or sink from a connection, the connector must match the connection's type, and the connection name must match the one created above. In addition, attributes defined in the connection must not be repeated in the source, table, or sink definition:
CREATE SINK sink_kafka FROM data_table WITH (
  connector = 'kafka',
  connection = conn_kafka,
  topic = 'connection_ddl_1'
) FORMAT PLAIN ENCODE JSON (
  force_append_only='true'
);
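As a counter-example to the sink above, the following sketch repeats an attribute that conn_kafka already defines. Based on the no-overlap rule, a statement like this is expected to be rejected; the source name, column, and topic are hypothetical, and the exact error message is not shown because it may vary by version.

```sql
-- Expected to fail: properties.bootstrap.server is already defined
-- in conn_kafka, so it must not be repeated here.
CREATE SOURCE src_overlap (v1 INT) WITH (
    connector = 'kafka',
    connection = conn_kafka,
    properties.bootstrap.server = '<broker addr>',
    topic = 'demo-topic'
) FORMAT PLAIN ENCODE JSON;
```

Removing the repeated property from the WITH clause should make the definition consistent with the rule.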
If you are using a cloud-hosted source or sink, such as AWS MSK, connectivity issues can occur when the service is in a different VPC from the one where RisingWave is deployed. To establish a secure, direct connection between the two VPCs, so that RisingWave can consume messages from the broker or send messages to it, use the AWS PrivateLink service. Follow the steps below to create an AWS PrivateLink connection.
  1. Create a target group for each broker. Set the target type as IP addresses and the protocol as TCP. Ensure that the VPC of the target group is the same as your cloud-hosted source.
  2. Create a Network Load Balancer. Ensure that it is enabled in the same subnets as your brokers and that cross-zone load balancing is enabled.
  3. Create a TCP listener for each MSK broker that corresponds to the target groups created. Ensure the ports are unique.
  4. Complete the health check for each target group.
  5. Create a VPC endpoint service associated with the Network Load Balancer created. Be sure to add the AWS principal of the account that will access the endpoint service to allow the service consumer to connect. See Manage permissions for more details.
  6. Use the CREATE CONNECTION command in RisingWave to create an AWS PrivateLink connection referencing the endpoint service created. Here is an example of creating an AWS PrivateLink connection.
CREATE CONNECTION connection_name WITH (
    type = 'privatelink',
    provider = 'aws',
    service.name = 'com.amazonaws.xyz.us-east-1.abc-xyz-0000'
);
  7. Create a source or sink that uses the AWS PrivateLink connection.
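The final step might look like the sketch below. It assumes that the source references the PrivateLink connection through a connection.name property and maps brokers to listener ports through privatelink.targets; both property usages, along with the source name, columns, topic, broker addresses, and ports, are assumptions for illustration and should be checked against the documentation for your RisingWave version.

```sql
CREATE SOURCE tcp_metrics_src (
    device_id VARCHAR,
    metric_value DOUBLE PRECISION
) WITH (
    connector = 'kafka',
    topic = 'tcp_metrics',
    -- One address per broker (placeholders)
    properties.bootstrap.server = 'ip1:9092, ip2:9092',
    -- Name given to the PrivateLink connection in the previous step
    connection.name = 'connection_name',
    -- One entry per broker, in the same order as the bootstrap servers;
    -- ports are the unique listener ports created on the load balancer
    privatelink.targets = '[{"port": 8001}, {"port": 8002}]',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```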