Въведение в съединителите Kafka

1. Общ преглед

Apache Kafka® е разпределена платформа за стрийминг. В предишен урок обсъдихме как да внедрим потребителите и производителите на Kafka, използвайки Spring.

В този урок ще научим как да използваме съединители Kafka.

Ще разгледаме:

 • Различни видове съединители Kafka
 • Характеристики и режими на Kafka Connect
 • Конфигурация на конектори, използвайки файлове със свойства, както и REST API

2. Основи на Kafka Connect и Kafka Connectors

Kafka Connect е рамка за свързване на Kafka с външни системи като бази данни, хранилища ключ-стойност, индекси за търсене и файлови системи, използвайки така наречените Connectors .

Съединителите Kafka са готови за използване компоненти, които могат да ни помогнат да импортираме данни от външни системи в теми на Kafka и да експортираме данни от теми на Kafka във външни системи . Можем да използваме съществуващите реализации на съединители за общи източници на данни и мивки или да реализираме свои собствени съединители.

A конектор източник събира данни от една система. Системите източници могат да бъдат цели бази данни, таблици на потоци или посредници на съобщения. Изходният конектор също може да събира показатели от сървърите на приложения в темите на Kafka, правейки данните достъпни за обработка на потоци с ниска латентност.

A конектор мивка доставя данни от Кафка теми в други системи, които биха могли да бъдат индекси като Elasticsearch, периодични системи като Hadoop, или каквато и да е база данни.

Някои конектори се поддържат от общността, докато други се поддържат от Confluent или неговите партньори. Наистина можем да намерим съединители за повечето популярни системи, като S3, JDBC и Cassandra, само за да назовем само няколко.

3. Характеристики

Функциите на Kafka Connect включват:

 • Рамка за свързване на външни системи с Kafka - опростява разработването, внедряването и управлението на конектори
 • Разпределени и самостоятелни режими - помага ни да разгърнем големи клъстери, като използваме разпределената природа на Kafka, както и настройки за разработка, тестване и малки производствени внедрения
 • REST интерфейс - можем да управляваме конектори, използвайки REST API
 • Автоматично управление на офсета - Kafka Connect ни помага да се справим с процеса на фиксиране на офсета, което ни спестява проблема с ръчното внедряване на тази склонна към грешки част от разработката на конектора
 • Разпределена и мащабируема по подразбиране - Kafka Connect използва съществуващия протокол за управление на групи; можем да добавим още работници, за да мащабираме клъстер Kafka Connect
 • Стрийминг и пакетна интеграция - Kafka Connect е идеалното решение за свързване на поточни и пакетни системи за данни във връзка със съществуващите възможности на Kafka
 • Трансформации - те ни позволяват да правим прости и леки модификации на отделни съобщения

4. Настройка

Вместо да използваме обикновената дистрибуция на Kafka, ще изтеглим Confluent Platform, дистрибуция на Kafka, предоставена от Confluent, Inc., компанията, която стои зад Kafka. Confluent Platform се предлага с някои допълнителни инструменти и клиенти в сравнение с обикновения Kafka, както и някои допълнителни предварително изградени конектори.

За нашия случай е достатъчно изданието с отворен код, което може да бъде намерено на сайта на Confluent.

5. Бърз старт Kafka Connect

За начало, ще обсъдим на принципа на Кафка Connect, като се използва най-основната си конектори, които са на файла източник конектора и файловата мивка конектора .

Удобно е, че Confluent Platform се предлага с двата тези конектора, както и с референтни конфигурации.

5.1. Конфигурация на конектора на източника

За свързващия източник референтната конфигурация е достъпна на адрес $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties :

name=local-file-source connector.class=FileStreamSource tasks.max=1 topic=connect-test file=test.txt

Тази конфигурация има някои свойства, които са общи за всички съединители на източника:

 • name е посочено от потребителя име за екземпляра на конектора
 • connector.class указва класа на внедряване, основно вида на съединителя
 • task.max указва колко екземпляра на нашия източник на конектор трябва да работят паралелно и
 • topic определя темата, към която конекторът трябва да изпрати изхода

В този случай имаме и специфичен за съединителя атрибут:

 • file дефинира файла, от който съединителят трябва да прочете входа

За да работи това, нека създадем основен файл с малко съдържание:

echo -e "foo\nbar\n" > $CONFLUENT_HOME/test.txt

Имайте предвид, че работната директория е $ CONFLUENT_HOME.

5.2. Конфигурация на съединител за мивка

За нашия конектор за мивка ще използваме референтната конфигурация на $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties :

name=local-file-sink connector.class=FileStreamSink tasks.max=1 file=test.sink.txt topics=connect-test

Логично, той съдържа абсолютно същите параметри, въпреки че този път connector.class определя изпълнението на съединителя на мивката, а файлът е мястото, където съединителят трябва да напише съдържанието.

5.3. Конфигурация на работника

И накрая, трябва да конфигурираме работника Connect, който ще интегрира двата ни конектора и ще свърши работата по четене от конектора на източника и запис в конектора за мивка.

За това можем да използваме $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties :

bootstrap.servers=localhost:9092 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false offset.storage.file.filename=/tmp/connect.offsets offset.flush.interval.ms=10000 plugin.path=/share/java

Note that plugin.path can hold a list of paths, where connector implementations are available

As we'll use connectors bundled with Kafka, we can set plugin.path to $CONFLUENT_HOME/share/java. Working with Windows, it might be necessary to provide an absolute path here.

For the other parameters, we can leave the default values:

 • bootstrap.servers contains the addresses of the Kafka brokers
 • key.converter and value.converter define converter classes, which serialize and deserialize the data as it flows from the source into Kafka and then from Kafka to the sink
 • key.converter.schemas.enable and value.converter.schemas.enable are converter-specific settings
 • offset.storage.file.filename is the most important setting when running Connect in standalone mode: it defines where Connect should store its offset data
 • offset.flush.interval.ms defines the interval at which the worker tries to commit offsets for tasks

And the list of parameters is quite mature, so check out the official documentation for a complete list.

5.4. Kafka Connect in Standalone Mode

And with that, we can start our first connector setup:

$CONFLUENT_HOME/bin/connect-standalone \ $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \ $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \ $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

First off, we can inspect the content of the topic using the command line:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

As we can see, the source connector took the data from the test.txt file, transformed it into JSON, and sent it to Kafka:

{"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}

And, if we have a look at the folder $CONFLUENT_HOME, we can see that a file test.sink.txt was created here:

cat $CONFLUENT_HOME/test.sink.txt foo bar

As the sink connector extracts the value from the payload attribute and writes it to the destination file, the data in test.sink.txt has the content of the original test.txt file.

Now let's add more lines to test.txt.

When we do, we see that the source connector detects these changes automatically.

We only have to make sure to insert a newline at the end, otherwise, the source connector won't consider the last line.

At this point, let's stop the Connect process, as we'll start Connect in distributed mode in a few lines.

6. Connect's REST API

Until now, we made all configurations by passing property files via the command line. However, as Connect is designed to run as a service, there is also a REST API available.

By default, it is available at //localhost:8083. A few endpoints are:

 • GET /connectors – returns a list with all connectors in use
 • GET /connectors/{name} – returns details about a specific connector
 • POST /connectors – creates a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
 • GET /connectors/{name}/status – returns the current status of the connector – including if it is running, failed or paused – which worker it is assigned to, error information if it has failed, and the state of all its tasks
 • DELETE /connectors/{name} – deletes a connector, gracefully stopping all tasks and deleting its configuration
 • GET /connector-plugins – returns a list of connector plugins installed in the Kafka Connect cluster

The official documentation provides a list with all endpoints.

We'll use the REST API for creating new connectors in the following section.

7. Kafka Connect in Distributed Mode

The standalone mode works perfectly for development and testing, as well as smaller setups. However, if we want to make full use of the distributed nature of Kafka, we have to launch Connect in distributed mode.

By doing so, connector settings and metadata are stored in Kafka topics instead of the file system. As a result, the worker nodes are really stateless.

7.1. Starting Connect

A reference configuration for distributed mode can be found at $CONFLUENT_HOME/etc/kafka/connect-distributed.properties.

Parameters are mostly the same as for standalone mode. There are only a few differences:

 • group.id defines the name of the Connect cluster group. The value must be different from any consumer group ID
 • offset.storage.topic, config.storage.topic and status.storage.topic define topics for these settings. For each topic, we can also define a replication factor

Again, the official documentation provides a list with all parameters.

We can start Connect in distributed mode as follows:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

7.2. Adding Connectors Using the REST API

Now, compared to the standalone startup command, we didn't pass any connector configurations as arguments. Instead, we have to create the connectors using the REST API.

To set up our example from before, we have to send two POST requests to //localhost:8083/connectors containing the following JSON structs.

First, we need to create the body for the source connector POST as a JSON file. Here, we'll call it connect-file-source.json:

{ "name": "local-file-source", "config": { "connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-distributed.txt", "topic": "connect-distributed" } }

Note how this looks pretty similar to the reference configuration file we used the first time.

And then we POST it:

curl -d @"$CONFLUENT_HOME/connect-file-source.json" \ -H "Content-Type: application/json" \ -X POST //localhost:8083/connectors

Then, we'll do the same for the sink connector, calling the file connect-file-sink.json:

{ "name": "local-file-sink", "config": { "connector.class": "FileStreamSink", "tasks.max": 1, "file": "test-distributed.sink.txt", "topics": "connect-distributed" } }

And perform the POST like before:

curl -d @$CONFLUENT_HOME/connect-file-sink.json \ -H "Content-Type: application/json" \ -X POST //localhost:8083/connectors

If needed, we can verify, that this setup is working correctly:

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-distributed --from-beginning {"schema":{"type":"string","optional":false},"payload":"foo"} {"schema":{"type":"string","optional":false},"payload":"bar"}

And, if we have a look at the folder $CONFLUENT_HOME, we can see that a file test-distributed.sink.txt was created here:

cat $CONFLUENT_HOME/test-distributed.sink.txt foo bar

After we tested the distributed setup, let's clean up, by removing the two connectors:

curl -X DELETE //localhost:8083/connectors/local-file-source curl -X DELETE //localhost:8083/connectors/local-file-sink

8. Transforming Data

8.1. Supported Transformations

Transformations enable us to make simple and lightweight modifications to individual messages.

Kafka Connect supports the following built-in transformations:

 • InsertField – Add a field using either static data or record metadata
 • ReplaceField – Filter or rename fields
 • MaskField – Replace a field with the valid null value for the type (zero or an empty string, for example)
 • HoistField – Wrap the entire event as a single field inside a struct or a map
 • ExtractField – Extract a specific field from struct and map and include only this field in the results
 • SetSchemaMetadata – Modify the schema name or version
 • TimestampRouter – Modify the topic of a record based on original topic and timestamp
 • RegexRouter – Modify the topic of a record based on original topic, a replacement string, and a regular expression

A transformation is configured using the following parameters:

 • transforms – A comma-separated list of aliases for the transformations
 • transforms.$alias.type – Class name for the transformation
 • transforms.$alias.$transformationSpecificConfig – Configuration for the respective transformation

8.2. Applying a Transformer

To test some transformation features, let's set up the following two transformations:

 • First, let's wrap the entire message as a JSON struct
 • After that, let's add a field to that struct

Before applying our transformations, we have to configure Connect to use schemaless JSON, by modifying the connect-distributed.properties:

key.converter.schemas.enable=false value.converter.schemas.enable=false

After that, we have to restart Connect, again in distributed mode:

$CONFLUENT_HOME/bin/connect-distributed $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

Again, we need to create the body for the source connector POST as a JSON file. Here, we'll call it connect-file-source-transform.json.

Besides the already known parameters, we add a few lines for the two required transformations:

{ "name": "local-file-source", "config": { "connector.class": "FileStreamSource", "tasks.max": 1, "file": "test-transformation.txt", "topic": "connect-transformation", "transforms": "MakeMap,InsertSource", "transforms.MakeMap.type": "org.apache.kafka.connect.transforms.HoistField$Value", "transforms.MakeMap.field": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value": "test-file-source" } }

After that, let's perform the POST:

curl -d @$CONFLUENT_HOME/connect-file-source-transform.json \ -H "Content-Type: application/json" \ -X POST //localhost:8083/connectors

Let's write some lines to our test-transformation.txt:

Foo Bar

If we now inspect the connect-transformation topic, we should get the following lines:

{"line":"Foo","data_source":"test-file-source"} {"line":"Bar","data_source":"test-file-source"}

9. Using Ready Connectors

After using these simple connectors, let's have a look at more advanced ready-to-use connectors, and how to install them.

9.1. Where to Find Connectors

Pre-built connectors are available from different sources:

 • A few connectors are bundled with plain Apache Kafka (source and sink for files and console)
 • Some more connectors are bundled with Confluent Platform (ElasticSearch, HDFS, JDBC, and AWS S3)
 • Also check out Confluent Hub, which is kind of an app store for Kafka connectors. The number of offered connectors is growing continuously:
  • Confluent connectors (developed, tested, documented and are fully supported by Confluent)
  • Certified connectors (implemented by a 3rd party and certified by Confluent)
  • Community-developed and -supported connectors
 • Beyond that, Confluent also provides a Connectors Page, with some connectors which are also available at the Confluent Hub, but also with some more community connectors
 • And finally, there are also vendors, who provide connectors as part of their product. For example, Landoop provides a streaming library called Lenses, which also contains a set of ~25 open source connectors (many of them also cross-listed in other places)

9.2. Installing Connectors from Confluent Hub

The enterprise version of Confluent provides a script for installing Connectors and other components from Confluent Hub (the script is not included in the Open Source version). If we're using the enterprise version, we can install a connector using the following command:

$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

9.3. Installing Connectors Manually

If we need a connector, which is not available on Confluent Hub or if we have the Open Source version of Confluent, we can install the required connectors manually. For that, we have to download and unzip the connector, as well as move the included libs to the folder specified as plugin.path.

For each connector, the archive should contain two folders that are interesting for us:

 • The lib folder contains the connector jar, for example, kafka-connect-mqtt-1.0.0-preview.jar, as well as some more jars required by the connector
 • The etc folder holds one or more reference config files

We have to move the lib folder to $CONFLUENT_HOME/share/java, or whichever path we specified as plugin.path in connect-standalone.properties and connect-distributed.properties. In doing so, it might also make sense to rename the folder to something meaningful.

We can use the config files from etc either by referencing them while starting in standalone mode, or we can just grab the properties and create a JSON file from them.

10. Conclusion

In this tutorial, we had a look at how to install and use Kafka Connect.

Разгледахме видове съединители, както източник, така и мивка. Също така разгледахме някои функции и режими, в които Connect може да работи. След това прегледахме трансформатори. И накрая научихме къде да вземем и как да инсталираме персонализирани съединители.

Както винаги, конфигурационните файлове могат да бъдат намерени в GitHub.