Manage Data Sources connected to Kafka

This documentation explains how Kafka connections are integrated into our datafile system when using the CLI.

Kafka datafile settings

  • KAFKA_CONNECTION_NAME: The name given to the connection

  • KAFKA_BOOTSTRAP_SERVERS: The Kafka bootstrap server(s), as host:port

  • KAFKA_KEY: The key credential used to authenticate

  • KAFKA_SECRET: The secret credential used to authenticate

  • KAFKA_TOPIC: The name of the Kafka topic to consume

  • KAFKA_GROUP_ID: The Kafka consumer group ID

  • KAFKA_AUTO_OFFSET_RESET: Where to start consuming when no committed offset exists. latest by default. Available options: latest, earliest
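As a quick sanity check before pushing a datafile, the required settings can be verified with a small script. This is a hypothetical helper for illustration only, not part of the CLI, and it assumes each setting sits at the start of its own unindented line:

```python
# Hypothetical helper (not part of the CLI): check that a Kafka .datasource
# datafile defines the required connection settings.
REQUIRED = [
    "KAFKA_CONNECTION_NAME",
    "KAFKA_BOOTSTRAP_SERVERS",
    "KAFKA_KEY",
    "KAFKA_SECRET",
    "KAFKA_TOPIC",
    "KAFKA_GROUP_ID",
]

def missing_kafka_settings(datafile_text: str) -> list[str]:
    """Return the required KAFKA_* settings not present in the datafile.

    Naive parse: a setting counts as defined if an unindented line
    starts with its name.
    """
    defined = {
        line.split()[0]
        for line in datafile_text.splitlines()
        if line.strip() and not line.startswith((" ", "\t"))
    }
    return [key for key in REQUIRED if key not in defined]

example = """\
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
"""
print(missing_kafka_settings(example))  # → ['KAFKA_GROUP_ID']
```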

Project structure

When using Kafka connections, add the Kafka connection settings to the .datasource datafile:

File structure
ecommerce_data_project/
    datasources/
        my_kafka_datasource.datasource
        another_datasource.datasource
    endpoints/
    pipes/

If you don’t have a connection created yet, you must set the connection credentials manually. This is the same information required when creating a connection from the UI.

Kafka Data Source datafile with connection settings
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

If the connection already exists, you can reference it by name alone:

Kafka Data Source datafile simplified
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Using INCLUDE to store connection settings

To avoid repeating the same connection settings across all your Kafka datafiles, and to avoid leaking sensitive information, the template system lets you include external files. Use INCLUDE to reuse your credentials; see the Advanced Templates documentation for more details. We recommend the following structure:

File structure using includes
ecommerce_data_project/
    datasources/
        connections/
          my_connection_name.incl
        my_kafka_datasource.datasource
        another_datasource.datasource
    endpoints/
    pipes/
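The include file holds only the shared connection settings, matching the values from the full datafile shown earlier (the file name assumes the connections/my_connection_name.incl path referenced below):

```
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
```

The topic and group ID stay in each .datasource file, since they typically differ per Data Source.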
Kafka Data Source datafile using includes
SCHEMA >
  `value` String,
  `topic` LowCardinality(String),
  `partition` Int16,
  `offset` Int64,
  `timestamp` DateTime,
  `key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

INCLUDE "connections/my_connection_name.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

Note: When pulling a Kafka Data Source, KAFKA_KEY and KAFKA_SECRET won’t be included, for security reasons.
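Since .incl files hold credentials, one common precaution (a suggestion, not a documented requirement) is to keep them out of version control, for example with a .gitignore entry:

```
# .gitignore — keep Kafka credentials out of the repository
datasources/connections/*.incl
```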