Manage Data Sources connected to Kafka¶
This documentation explains how Kafka connections are integrated into the datafile system using the CLI.
Kafka datafile settings¶
KAFKA_CONNECTION_NAME
: The name given to the connection
KAFKA_BOOTSTRAP_SERVERS
: The Kafka bootstrap server
KAFKA_KEY
: Key credentials
KAFKA_SECRET
: Secret credentials
KAFKA_TOPIC
: The topic name
KAFKA_GROUP_ID
: The consumer group id
KAFKA_AUTO_OFFSET_RESET
: `latest` by default. Available options: `latest`, `earliest`
KAFKA_STORE_RAW_VALUE
: `'False'` by default. When set to `'True'`, the Kafka Data Source stores the raw message value in addition to the columns extracted from the message value.
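For example, to consume a topic from its beginning and also keep the raw message value, the two optional settings above could be combined in a `.datasource` datafile like this (a minimal sketch; the values shown are illustrative, not defaults):

```
KAFKA_AUTO_OFFSET_RESET earliest
KAFKA_STORE_RAW_VALUE 'True'
```

If these settings are omitted, the defaults (`latest` and `'False'`) apply.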
Project structure¶
When using Kafka connections, you have to add the Kafka connection settings to the .datasource
datafile:
ecommerce_data_project/
datasources/
my_kafka_datasource.datasource
another_datasource.datasource
endpoints/
pipes/
If you don’t have a connection created yet, you have to set the connection credentials manually. This is the same information needed when creating a connection from the UI.
SCHEMA >
`value` String,
`topic` LowCardinality(String),
`partition` Int16,
`offset` Int64,
`timestamp` DateTime,
`key` String
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
You can reuse an existing connection by setting only the connection name:
SCHEMA >
`value` String,
`topic` LowCardinality(String),
`partition` Int16,
`offset` Int64,
`timestamp` DateTime,
`key` String
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
Using INCLUDE to store connection settings¶
To avoid repeating the same connection settings across all your Kafka datafiles, or leaking sensitive information, the template system allows you to include external files. Use the INCLUDE
directive to reuse your credentials; see the Advanced Templates documentation for more information. We recommend the following structure:
ecommerce_data_project/
datasources/
connections/
my_connection_name.incl
my_kafka_datasource.datasource
another_datasource.datasource
endpoints/
pipes/
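The `.incl` file holds only the connection settings. Based on the settings shown in the manual-credentials example above, `connections/my_connection_name.incl` could look like this (the values are illustrative placeholders):

```
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
```

The .datasource datafile then references it with INCLUDE, keeping topic and group id settings local to each Data Source: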
SCHEMA >
`value` String,
`topic` LowCardinality(String),
`partition` Int16,
`offset` Int64,
`timestamp` DateTime,
`key` String
ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"
INCLUDE "connections/my_connection_name.incl"
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
Note: When pulling a Kafka Data Source, the KAFKA_KEY
and KAFKA_SECRET
settings won’t be included, for security reasons.