Kafka Connect overview and building a data pipeline using MySQL and Kafka Connect
Topics Covered
a) Introduction to Kafka Connect.
b) Why Kafka Connect.
c) How Kafka Connect works.
a) Introduction to Kafka Connect:
Kafka Connect is an open-source framework for streaming data between Apache Kafka and other systems at scale.
It connects Kafka with external systems such as databases, key-value stores, search indexes, and file systems, using so-called Connectors.
Connectors are ready-to-use components that help us import data from external systems into Kafka topics and export data from Kafka topics to external systems.
Kafka Connect provides two types of connectors (a sample source connector configuration is shown right after this list):
Source Connector: imports data from a source system into Kafka.
Sink Connector: exports data from Kafka to a sink system.
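For example, a JDBC source connector can stream rows from a MySQL table into a Kafka topic with nothing more than a configuration posted to the Connect REST API. The sketch below assumes a Connect worker on localhost:8083; the host, database, table, column, and credentials are placeholders to replace with your own:
curl -X POST http://localhost:8083/connectors/ -H 'Content-Type: application/json' -d '{
  "name": "mysql_source_example",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://<host>:3306/<database>",
    "connection.user": "<user>",
    "connection.password": "<password>",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "my_table",
    "topic.prefix": "mysql_"
  }
}'
With this configuration, new rows in my_table (tracked through the growing id column) would appear as messages on the mysql_my_table topic.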
b) Why Kafka Connect:
No coding required:
We can stream data from Kafka topics into a target store (Sink Connectors), or stream updates from a data store into Kafka topics (Source Connectors), without writing a single line of code that we would then have to maintain.
Delivery Guarantee:
Kafka Connect supports two levels of delivery guarantees between a source and a sink system: at least once and at most once. Exactly-once support is planned for a future release, once that capability is available natively within Kafka. The guarantee actually offered is driven by the connector implementation.
Large data processing:
High volumes of data can be processed easily by scaling out across multiple workers.
c) How Kafka Connect works:
Kafka Connect has three major models in its design.
Connector model:
A connector is defined by specifying a Connector class and configuration options that control what data is copied and how it is formatted.
Each Connector instance is responsible for defining and updating a set of Tasks that actually copy the data.
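For instance, once a connector has been created (such as the sink connector defined later in this post), the Connect REST API lets you inspect the Tasks it has spawned; the call below assumes the worker listens on localhost:8083:
curl -s http://localhost:8083/connectors/my_topic_o1/tasks
Each entry in the response identifies a task id and the configuration that task received from its Connector.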
Worker model:
A Kafka Connect cluster consists of a set of Worker processes, which are the containers that execute Connectors and Tasks.
Workers automatically coordinate with each other to distribute work and provide scalability and fault tolerance.
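As a rough sketch, a distributed Worker is started from a properties file; every Worker sharing the same group.id joins the same Connect cluster and picks up a share of the Connectors and Tasks. The values below are illustrative, not a production setup:
# connect-distributed.properties (excerpt)
bootstrap.servers=localhost:9092
group.id=connect-cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
# start a worker; run the same command on more machines to scale out
bin/connect-distributed.sh config/connect-distributed.properties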
Data model:
Kafka Connect supports pluggable Converters for storing the copied data in a variety of serialization formats.
Schemas are built-in, allowing important metadata about the format of messages to be propagated through complex data pipelines.
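Converters are chosen in the worker (or per-connector) configuration. As a sketch, the settings below select the JSON converter with schemas enabled, which is what the schema-and-payload message format shown later in this post relies on:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true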
Defining a Sink Connector:
curl -X POST http://localhost:8083/connectors/ -H 'Cache-Control: no-cache' -H 'Content-Type: application/json' -d '{
  "name": "my_topic_o1",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "my_topic",
    "connection.url": "jdbc:mysql://",
    "connection.user": "XXXXXXXXXX",
    "connection.password": "XXXXXXXXXXXXXX",
    "auto.create": "false",
    "insert.mode": "insert",
    "table.name.format": "my_topic",
    "errors.tolerance": "all"
  }
}'
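Once the POST succeeds, you can verify that the connector and its task are running; the call below assumes the Connect REST API is on localhost:8083:
curl -s http://localhost:8083/connectors/my_topic_o1/status
The response should report the connector and task state as RUNNING; a FAILED state usually includes a trace explaining why.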
If you push JSON data to Kafka, the connector relies on the JsonConverter to parse each message. With schemas enabled, every message must carry its schema alongside the payload, for example:
{ "schema":{
"type":"struct",
"optional":false,
"version":1,
"fields":[
{
"field":"data",
"type":"string",
"optional":true
}
]
},
"payload":{
"data":"message"
}}
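To test the pipeline, you could push such a message with the console producer. This sketch assumes a broker at localhost:9092 (older Kafka versions use --broker-list instead of --bootstrap-server), and the JSON has to be pasted as a single line:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic my_topic
{"schema":{"type":"struct","optional":false,"version":1,"fields":[{"field":"data","type":"string","optional":true}]},"payload":{"data":"message"}}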
Finally, create a table in the target database that matches the schema above, so the connector can write the Kafka messages into it.
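For example, with the schema above (a single optional string field named data) and table.name.format set to my_topic, the table could be created as shown below; the user, database, and column size are placeholders. Because auto.create is false in the sink configuration, the table must exist before the connector writes to it:
mysql -u <user> -p <database> -e "CREATE TABLE my_topic (data VARCHAR(255));"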