Apache Kafka

1.) Overview


Apache Kafka is a distributed streaming platform. It is used for building real-time data platforms and streaming applications. In this blog, we will discuss how to install Kafka and work on some basic use cases.

This article was created using Apache Kafka version 2.12-2.1.0.

2.) Installation

Download and unpack Kafka from https://kafka.apache.org/downloads. 

2.1) Configuration

config/zookeeper.properties
  • Set the dataDir /tmp//kafka/zookeeper
config/server.properties
  • log.dirs=/tmp/kafka/logs
  • zookeeper.connect=localhost:2181
  • listeners=PLAINTEXT://localhost:9092
To test Kafka run the following commands.
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties

The second command will start a new command prompt and you should see some logs in zookeeper.

3.) Kafka Topics

Create a topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic clicks

View the topics:
>bin/kafka-topics.sh --list --zookeeper localhost:2181

Delete the topic (execute at the end):
>bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic clicks

4.) Sending and Receiving Messages

Send messages:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
-Enter some messages here and leave the command open

Receive the messages:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicks --from-beginning
-You should be able to receive the messages that haven't been read yet

5.) Multi Broker

Make 2 copies of config/server.properties. Set the following properties:

config/server-1.properties
  • broker.id=1
  • listeners=PLAINTEXT://:9093
  • log.dir=/tmp/kafka-logs-1
config/server-2.properties
  • broker.id=2
  • listeners=PLAINTEXT://:9094
  • log.dir=/tmp/kafka-logs-2
Start the 2 new broker in different terminals
>bin/kafka-server-start.sh config/server.1.properties
>bin/kafka-server-start.sh config/server.2.properties

Create a new topic that will be replicated on the original node plus the two new.
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic clicks-replicated

You can run the view topics command again (above).

We can also describe the newly created topic as we specified:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks

6.) Fault Tolerance

Now, we can send some messages to our replicated topic:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks-replicated

Read the message in the replicated topic:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated

Now, shut down the second node by ctrl + c in the command or close it.

Again, we can describe the replicated topic.
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated

We can the messages again from the beginning (original and 1st node, node that the second node is off).
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic clicks-replicated

*Close all the terminals except zookeeper and the original topic using port 9092.

7.) Import / export data from and to a file using a connector

Kafka can also read and write from and to a file. Let's try that by using the default configurations.
  • connect-standalone.properties - is basically server.properties
  • connect-file-source.properties - specify the source file to read (default: test.txt, note topic value here)
  • connect-file-sink.properties - where to write (default: test.sink.txt)
Run the connector
>bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
-Create a test.txt file where you run the connector and add some text to it. Make sure that you end with a newline. Otherwise, the last line will not be read.

Notice the log we should have something like:
[2019-01-13 16:17:09,799] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
[2019-01-13 16:17:10,838] INFO Cluster ID: MYm1bMttRdCqG-njYXeO-w (org.apache.kafka.clients.Metadata:285)

There should be a newly created file with the same content named: test.sink.txt.

Note that you can still read the messages using the consumer. Topic=connect-test is from connect-file-source.properties:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Modify the test.txt, adding "Hello World!" and your consumer should be able to pickup the message.
>{"schema":{"type":"string","optional":false},"payload":"Hello World!"}

*Terminate the consumer but leave server0 open.

8.) Streaming using WordCount app

Now let's create a new file with the following content:
>echo -e "The quick brown fox jumps over the lazy dog.\nThe quick brown fox jumps over the lazy dog." > file-input.txt

Create a new topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input

Send file data to the topic, it could come from a stream.
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input < file-input.txt

Consume the input:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning

We can use the WordCount app package with Kafka to stream the data from the file we just created.
>bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Consume the messages using String and Long deserializers:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

You should have an output similar to:
the 1
quick 1
brown 1
fox 1
jumps 1
over 1
the 2
lazy 1
dog. 1
the 3
quick 2
brown 2
fox 2
jumps 2
over 2
the 4
lazy 2
dog. 2

0 nhận xét:

Đăng nhận xét