
Apache Kafka

1.) Overview


Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. In this post, we will install Kafka and work through some basic use cases.

This article was created using Apache Kafka version 2.12-2.1.0.

2.) Installation

Download and unpack Kafka from https://kafka.apache.org/downloads. 

2.1) Configuration

config/zookeeper.properties
  • dataDir=/tmp/kafka/zookeeper
config/server.properties
  • log.dirs=/tmp/kafka/logs
  • zookeeper.connect=localhost:2181
  • listeners=PLAINTEXT://localhost:9092
To test Kafka, run the following commands.
>bin/zookeeper-server-start.sh config/zookeeper.properties
>bin/kafka-server-start.sh config/server.properties

Run the second command in a new terminal. Once the broker starts, you should see new connection logs appear in the ZooKeeper terminal.

3.) Kafka Topics

Create a topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic clicks

View the topics:
>bin/kafka-topics.sh --list --zookeeper localhost:2181

Delete the topic (execute at the end):
>bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic clicks
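
Topics can also be managed programmatically. The following is a minimal sketch using the Java AdminClient from the kafka-clients library (the dependency and the CreateTopic class name are my own additions, not part of the steps above); it creates the same clicks topic as the CLI command.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // topic name, partitions, replication factor (same values as the CLI example above)
            NewTopic topic = new NewTopic("clicks", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}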

4.) Sending and Receiving Messages

Send messages:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks
-Enter some messages here and leave the command running

Receive the messages:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic clicks --from-beginning
-With --from-beginning, you should receive all the messages sent so far
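
The console tools are convenient for testing, but applications normally use the Java client. Below is a minimal sketch of a producer and a consumer for the clicks topic, assuming the kafka-clients 2.1.0 dependency is on the classpath; the class name and group id are illustrative.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ClicksExample {

    public static void main(String[] args) {
        // producer: send a single message to the clicks topic
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
            producer.send(new ProducerRecord<>("clicks", "hello from java"));
        }

        // consumer: read from the earliest offset, like --from-beginning for a new group
        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "localhost:9092");
        consumerProps.put("group.id", "clicks-reader");
        consumerProps.put("auto.offset.reset", "earliest");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList("clicks"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}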

5.) Multi Broker

Make 2 copies of config/server.properties. Set the following properties:

config/server-1.properties
  • broker.id=1
  • listeners=PLAINTEXT://:9093
  • log.dir=/tmp/kafka-logs-1
config/server-2.properties
  • broker.id=2
  • listeners=PLAINTEXT://:9094
  • log.dir=/tmp/kafka-logs-2
Start the two new brokers in different terminals:
>bin/kafka-server-start.sh config/server-1.properties
>bin/kafka-server-start.sh config/server-2.properties

Create a new topic that will be replicated on the original node plus the two new ones.
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic clicks-replicated

You can run the view topics command again (above).

We can also describe the newly created topic to confirm the replication we specified, and compare it with the original:
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks

6.) Fault Tolerance

Now, we can send some messages to our replicated topic:
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks-replicated

Read the message in the replicated topic:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated

Now, shut down the second broker with Ctrl+C in its terminal, or simply close the terminal.

Again, we can describe the replicated topic.
>bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic clicks-replicated

We can read the messages again from the beginning, from the original broker and the first broker (note that the second broker is down):
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic clicks-replicated
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --from-beginning --topic clicks-replicated

*Close all the terminals except ZooKeeper and the original broker using port 9092.

7.) Import / export data from and to a file using a connector

Kafka can also read and write from and to a file. Let's try that by using the default configurations.
  • connect-standalone.properties - the standalone Connect worker configuration (broker connection, converters, offset storage)
  • connect-file-source.properties - specifies the source file to read (default: test.txt; note the topic value here)
  • connect-file-sink.properties - specifies where to write (default: test.sink.txt); the defaults for both connector files are shown below
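
For reference, the connector defaults shipped with Kafka look roughly like this (values may differ slightly between versions):

connect-file-source.properties
  • name=local-file-source
  • connector.class=FileStreamSource
  • tasks.max=1
  • file=test.txt
  • topic=connect-test
connect-file-sink.properties
  • name=local-file-sink
  • connector.class=FileStreamSink
  • tasks.max=1
  • file=test.sink.txt
  • topics=connect-test
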
Run the connector
>bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
-Create a test.txt file in the directory where you run the connector and add some text to it. Make sure the file ends with a newline; otherwise, the last line will not be read.

Notice that in the log we should see something like:
[2019-01-13 16:17:09,799] WARN Couldn't find file test.txt for FileStreamSourceTask, sleeping to wait for it to be created (org.apache.kafka.connect.file.FileStreamSourceTask:109)
[2019-01-13 16:17:10,838] INFO Cluster ID: MYm1bMttRdCqG-njYXeO-w (org.apache.kafka.clients.Metadata:285)

There should be a newly created file named test.sink.txt with the same content.

Note that you can still read the messages using the console consumer. The topic connect-test comes from connect-file-source.properties:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning

Modify test.txt by adding "Hello World!", and your consumer should be able to pick up the message:
>{"schema":{"type":"string","optional":false},"payload":"Hello World!"}

*Terminate the consumer but leave the original broker on port 9092 running.

8.) Streaming using WordCount app

Now let's create a new file with the following content:
>echo -e "The quick brown fox jumps over the lazy dog.\nThe quick brown fox jumps over the lazy dog." > file-input.txt

Create a new topic:
>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-plaintext-input

Send the file data to the topic (in a real setup, this data could just as well come from a stream):
>bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input < file-input.txt

Consume the input:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input --from-beginning

We can use the WordCount demo app packaged with Kafka to process the data from the file we just created; a rough sketch of its topology follows the command below.
>bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
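
The shipped demo is a small Kafka Streams application. The sketch below is a simplified illustration of what it does, not the exact source that ships with Kafka; the application id and class name are my own.

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> lines = builder.stream("streams-plaintext-input");

        // split each line into words, group by word, and keep a running count
        KTable<String, Long> counts = lines
                .flatMapValues(line -> Arrays.asList(line.toLowerCase(Locale.getDefault()).split(" ")))
                .groupBy((key, word) -> word)
                .count();

        // write the running counts to the output topic as <word, count>
        counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // close the streams application cleanly on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}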

Consume the messages using String and Long deserializers:
>bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

You should have an output similar to:
the 1
quick 1
brown 1
fox 1
jumps 1
over 1
the 2
lazy 1
dog. 1
the 3
quick 2
brown 2
fox 2
jumps 2
over 2
the 4
lazy 2
dog. 2

Hadoop MapReduce Demo

Versions:
  • Hadoop 3.1.1 
  • Java 10
Set the following environment variables:
  • JAVA_HOME 
  • HADOOP_HOME

For Windows

Download the Hadoop winutils binaries for Windows from https://github.com/s911415/apache-hadoop-3.1.0-winutils. Extract them into HADOOP_HOME\bin and make sure to overwrite the existing files.

For Ubuntu

$ ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ chmod 0600 ~/.ssh/authorized_keys

The following instructions will install Hadoop in pseudo-distributed mode.

1.) Create the following folders:
HADOOP_HOME/tmp
HADOOP_HOME/tmp/dfs/data
HADOOP_HOME/tmp/dfs/name

2.) Set the following properties in core-site.xml and hdfs-site.xml.

core-site.xml
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://localhost:9001</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>HADOOP_HOME/tmp</value>
</property>

hdfs-site.xml
<property>
  <name>dfs.namenode.name.dir</name>
  <value>file:///HADOOP_HOME/tmp/dfs/name</value>
</property>
<property>
  <name>dfs.datanode.data.dir</name>
  <value>file:///HADOOP_HOME/tmp/dfs/data</value>
</property>
<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>

3.) Run hadoop namenode -format. Don't forget the file:/// prefix in hdfs-site.xml on Windows; otherwise, the format will fail.

4.) Run HADOOP_HOME/sbin/start-dfs.sh (or start-dfs.cmd on Windows).

5.) If all goes well, you can find the web UI port in the console log. In my case it's http://localhost:9870.


6.) You can now upload any file through the URL from #5.



Now let's try to create a project that will test our Hadoop setup, or download an existing one, for example this project: https://www.guru99.com/create-your-first-Hadoop-program.html. It comes with a nice explanation, so let's try it. I've repackaged it into a pom project and uploaded it to GitHub at https://github.com/czetsuya/Hadoop-MapReduce.
  1. Clone the repository. 
  2. Open the HDFS URL from #5 above, and create an input and an output folder.
  3. In the input folder, upload the SalesJan2009 file from the project's root folder.
  4. Run hadoop jar hadoop-mapreduce-0.0.1-SNAPSHOT.jar /input /output.
  5. Check the output from the URL and download the resulting file.

To run Hadoop in standalone mode, download and unpack it as is. Go to the project's folder, build it using Maven, then run the hadoop command below:
>$HADOOP_HOME/bin/hadoop jar target/hadoop-mapreduce-0.0.1-SNAPSHOT.jar input output

input - a directory that should contain the CSV file
output - a directory that will be created after launch; the output file will be saved here
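
For context, the job in that repository is roughly a classic mapper/reducer pair that counts sales records per country from the CSV. The sketch below is a simplified illustration of the idea rather than the exact code from the repository; the class names and the CSV column index are assumptions.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SalesByCountry {

    public static class SalesMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            // emit (country, 1) per record; the column index is an assumption about the CSV layout
            if (fields.length > 7) {
                context.write(new Text(fields[7]), ONE);
            }
        }
    }

    public static class SalesReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // sum the per-record counts for each country
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "sales-by-country");
        job.setJarByClass(SalesByCountry.class);
        job.setMapperClass(SalesMapper.class);
        job.setReducerClass(SalesReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}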

Common causes of problems: 

  • Improperly configured core-site.xml or hdfs-site.xml entries (data node and name node directories)
  • File/folder permissions

References

  • https://www.guru99.com/create-your-first-hadoop-program.html
  • https://github.com/czetsuya/Hadoop-MapReduce
  • https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SingleCluster.html#Standalone_Operation