Introduction

Apache Kafka is one of the core technologies in enterprise architectures today; more than 60% of Fortune 500 companies use it. The technology has evolved considerably over the last decade, from a simple pub/sub messaging system to a complete event streaming platform. The very first requirement for working in an event-driven system is to ingest data/events. For this purpose, Kafka Connect was added to the Kafka ecosystem in 2015. It lets us integrate external systems without writing a single line of code (as long as a connector already exists).

Learning Objectives

  1. What is Kafka Connect?
  2. Why Kafka Connect?
  3. Kafka Connect Architecture
  4. Modes of Operation
  5. Deciding the Number of Tasks
  6. Installing Connectors
  7. Working with the HDFS Sink Connector

What is Kafka Connect?

Kafka Connect is an open source framework that works as a centralized data hub for integrating data between databases, file systems, key-value stores, search indexes, and many other systems. It is built as a layer on top of the core Apache Kafka® platform to support large-scale streaming data, and it moves data in two directions:

  • Import from an external system (called a Source), such as MySQL or HDFS, into the Kafka broker cluster
  • Export from the Kafka cluster to an external system (called a Sink), such as HDFS or S3
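
As a minimal illustration, the FileStream connectors that ship with Apache Kafka show what a source and a sink configuration look like; the file paths and topic name below are just placeholders:

# file-source.properties - read lines from a file into a Kafka topic
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/tmp/input.txt
topic=connect-test

# file-sink.properties - write records from a Kafka topic out to a file
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/tmp/output.txt
topics=connect-test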

Why Kafka Connect?

  • In an ETL pipeline, Kafka Connect takes care of the E (Extract) and L (Load) parts, irrespective of the processing engine
  • With Kafka Connect, the same ingestion code can be reused across different processing engines
  • It ensures decoupling and reusability
  • It reduces the effort of implementing a stream processing framework by taking on the how-to-ingest-data responsibility
  • Its parallelism, fault tolerance, delivery semantics, ordering, etc. are guaranteed by the connector’s underlying implementation

Kafka Connect Architecture

  • Kafka Connect runs as a separate cluster
  • Within a Kafka Connect cluster there are workers, connector plugins, connectors, and tasks
  • A cluster can have multiple workers, and workers run only within that cluster
  • Each worker runs one or many connector tasks
  • Tasks for a connector are started by the workers
  • Tasks in Kafka Connect act as producers or consumers depending on the type of connector (source or sink)
  • Tasks are automatically rebalanced to the remaining workers if a worker fails, as shown in the picture below

(Figure: task rebalancing across Kafka Connect workers. Source: Confluent)
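
In distributed mode, each worker also exposes a REST API (on port 8083 by default) that you can use to inspect the running connectors and their tasks; for example, assuming a worker on localhost:

curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/<connector-name>/status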

Modes of Operation

There are two modes of operation:

1. Standalone Mode:

  • In standalone mode, a single process executes all connectors and their associated tasks
  • It is the easiest way to test Kafka Connect
  • There is no automated fault tolerance out of the box if a connector goes offline

2. Distributed Mode:

  • Distributed mode runs Connect workers on one or more nodes
  • When running on multiple nodes, the workers coordinate among themselves to share the work in parallel; no external orchestration manager such as YARN is required
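
For reference, both modes can be started with the scripts that ship with Apache Kafka (Confluent’s packaging provides the same scripts without the .sh suffix); the property-file paths below are the stock examples and may differ in your installation:

./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties
./bin/connect-distributed.sh config/connect-distributed.properties

In distributed mode, connectors are then submitted through the workers’ REST API rather than passed on the command line.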

Deciding the Number of Tasks

A task is a thread in the Kafka Connect cluster that is responsible for producing data to, or consuming data from, a Kafka topic.

Many factors come into play when deciding the number of tasks, e.g.

  1. Cores available on the Kafka Connect cluster - the maximum number of tasks that can usefully run depends on the available resources
  2. Limitations of the source/sink - some sources/sinks, e.g. an RDBMS, do not handle many concurrent connections well, so it is recommended to keep the number of tasks low enough that the source/sink does not go down
  3. Partitions - the number of partitions indicates the degree of parallelism of a Kafka topic. Setting max tasks higher than the number of partitions will cause Connect to launch only as many tasks as there are partitions. If more partitions are added later (but still fewer than max tasks), Connect creates additional tasks to match the new partition count, as shown in the sketch below
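
The cap is expressed with the tasks.max property in the connector configuration. A minimal sketch, assuming a hypothetical sink connector reading a four-partition topic named orders (the name, class, and topic are placeholders):

name=my-sink-connector
connector.class=com.example.MySinkConnector
topics=orders
tasks.max=4

With four partitions, setting tasks.max above 4 would not add any parallelism for this sink.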

Installing Connectors

  • Download the connector JAR/ZIP (usually from Confluent Hub, though you may have built it yourself from source)
  • Place it in a folder on your Kafka Connect worker
  • Locate your Kafka Connect worker’s configuration (.properties) file and open it in an editor
  • Search for the plugin.path setting, and amend or create it so that it includes the folder(s) in which your connectors reside, as in the example below
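
For example, in the worker’s .properties file (the folder paths below are just assumed locations; plugin.path takes a comma-separated list):

plugin.path=/usr/local/share/kafka/plugins,/opt/connectors

The worker has to be restarted for changes to plugin.path to take effect.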

Working with HDFS Sink Connector

Install the Confluent Platform by following one of our Quickstart guides for Confluent Platform 6.x.

After installing Confluent Platform 6.x, execute the commands below to start the Confluent services:

cd confluent-6.1.1
./bin/confluent local services start

Now create a topic named test_hdfs:

./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_hdfs

After this, produce some test Avro data to the test_hdfs topic in Kafka:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test_hdfs --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"meetup","type":"string"}]}'

Paste the following values:

{"meetup":"datacouch01"}

{"meetup":"datacouch02"}

{"meetup":"datacouch03"}

{"meetup":"datacouch04"}
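
Optionally, you can read the records back in another terminal with the Avro console consumer to confirm they were produced (this assumes the local Schema Registry defaults; press Ctrl+C to stop):

./bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic test_hdfs --from-beginning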

Open a new terminal and install the hdfs-sink connector:

confluent-hub install confluentinc/kafka-connect-hdfs:latest

The installer will prompt you with a few options, do the following:

  1. Press 1 at the first prompt, then
  2. Press y for every other prompt
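
If you prefer a non-interactive install (for example when scripting the setup), confluent-hub also supports a --no-prompt flag that accepts the defaults:

confluent-hub install --no-prompt confluentinc/kafka-connect-hdfs:latest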

Now restart the Confluent Platform using the following commands:

./bin/confluent local services stop
./bin/confluent local services start

Run the hostname command to find your machine's hostname.

Open share/confluent-hub-components/confluentinc-kafka-connect-hdfs/etc/quickstart-hdfs.properties in any editor and change hdfs.url to <hostname>:8020/user/dataproc/
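
After the edit, the file should look roughly like this (the exact defaults shipped with the connector may vary by version; <hostname> is the value from the previous step):

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs
hdfs.url=<hostname>:8020/user/dataproc/
flush.size=3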



Let’s load the HDFS-Sink connector:

confluent local services connect connector load hdfs-sink --config share/confluent-hub-components/confluentinc-kafka-connect-hdfs/etc/quickstart-hdfs.properties

In a new terminal, you can confirm that the connector is in a RUNNING state using the following command:

confluent local services connect connector status hdfs-sink

Finally, let’s validate that the Avro data has landed in HDFS:

hdfs dfs -ls /user/dataproc/topics/test_hdfs/partition=0
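
Once you are done experimenting, you can unload the connector and stop the local services (the unload subcommand mirrors the load command used above):

confluent local services connect connector unload hdfs-sink
./bin/confluent local services stop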

Conclusion

Kafka Connect is an open source framework that works as a centralized data hub for integrating data between databases, file systems, key-value stores, search indexes, and many other systems. It can import data from any external system (a Source), such as MySQL or HDFS, into the Kafka broker cluster, and export data from the Kafka cluster to any external system (a Sink), such as HDFS or S3.