Using Flume to transfer data from Kafka to HDFS in Ambari(2.4.2) with Kerberos

flume+kafka

Firstly, you don’t need to read the following post if your Flume upper than 1.5.2. You can complete your configuration file by following official documents.**

Versions of components

  • Ambari: 2.4.2
  • HDP: 2.4.3
    • HDFS: 2.7.1.2.4
    • Kafka: 0.9.0.2.4
    • Flume: 1.5.2.2.4

Unpredictable problem

Assume that using Kafka to receive data from a topic, then store data to HDFS. It is obviously a good choice using Kafka source, memory channel, and HDFS sink. Nevertheless, it’s not easy like that after searching User Guide(1.5.2).

flume_kafka_source.png

After reading the contents above, it is pitiful that Kafka source not supporting Kerberos environment because of no properties for specifying principle and keytab file path. So why Flume 1.5.2 not supporting Kafka source? OK. Let’s check out the library within Flume (/user/hdp/2.4.3.0-227/flume/lib). There are some libs specifying Kafka(0.8.2).

flume_kafka_version

Well, it’s so weird that Ambari-2.4.2 using Kafka-0.9.0, but Flume-1.5.2 in Ambari-2.4.2 not using Kafka-0.9.0 instead of using Kafka-0.8.2. I don’t know why, but I also found similary problem after googling. Kafka starts to support Kerberos authorization after version 0.9. So it’s obvious that we can’t use Kafka source in Flume in Ambari-2.4.2 with Kerberos.

kafka_start_to_support_kerberos

From this moment, we got stuck by this annoying version of conflict. After complaining, we also need to solve this problem. But, how?
Although Kafka-0.8.2 not supporting Kerberos, it ought to create a custom Kafka source. Appending Kerberos authorization code within your custom Kafka source that is a reasonable solution for this situation.

Reasonable solution

Creating custom Kafka(0.9.0) source

Writing your custom Kafka source which inheriting class AbstractSource
is just like a Kafka consumer.

Attention: The dependencies of Kafka need to use the version of 0.9.0.

Java codes paste below.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.johnny.flume.source.kafka;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.nio.charset.Charset;
import java.util.*;

public class CustomKafkaSource extends AbstractSource implements Configurable, PollableSource {

private KafkaConsumer<byte[], String> consumer;

@Override
public Status process() throws EventDeliveryException {

List<Event> eventList = new ArrayList<Event>();

try {
ConsumerRecords<byte[], String> records = consumer.poll(1000);

Event event;
Map<String, String> header;

for (ConsumerRecord<byte[], String> record : records) {
header = new HashMap<String, String>();
header.put("timestamp", String.valueOf(System.currentTimeMillis()));

event = EventBuilder.withBody(record.value(), Charset.forName("UTF-8"), header);
eventList.add(event);
}

} catch (Exception e) {
e.printStackTrace();
return Status.BACKOFF;
}

getChannelProcessor().processEventBatch(eventList);

return Status.READY;
}

@Override
public void configure(Context context) {

Properties properties = new Properties();
properties.put("zookeeper.connect", context.getString("zk"));
properties.put("group.id", context.getString("groupId"));
properties.put("auto.offset.reset", "earliest");
properties.put("bootstrap.servers", context.getString("bootstrapServers"));
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

this.consumer = new KafkaConsumer<byte[], String>(properties);
this.consumer.subscribe(Arrays.asList(context.getString("topics").split(",")));
}

@Override
public synchronized void stop() {
super.stop();
}
}

Using the code “context.getString()” can get specific property value from your configuration file.

Generating jar package

Utilizing your build automation tool - such as Maven, Gradle and etc - to generate your custom Kafka source jar.

Replacing Kafka lib and Adding custom Kafka source jar

In this step, you ought to download a Kafka jar package (the version is 0.9.0), then copy the file “kafka-clients-0.9.0.0.jar” and the file that custom Kafka source jar to your lib path of Flume(/usr/hdp/2.4.3.0-227/flume/lib). After that, you also need to delete the old version of Kafka client jar(kafka-clients-0.8.2.0.jar).

Other preparation for the final point

  1. Creating a Kafka topic and ensuring that it’s working.
1
2
3
4
5
6
7
8
9
10
11
12
13
kinit -kt /etc/security/keytabs/kafka.service.keytab kafka/bigdata.com@CONNEXT.COM

cd /usr/hdp/2.4.3.0-227/kafka/

bin/kafka-topics.sh --create --zookeeper bigdata.com:2181 --replication-factor 1 --partitions 1 --topic flume-kafka

bin/kafka-topics.sh --list --zookeeper bigdata.com:2181

bin/kafka-topics.sh --describe --zookeeper bigdata.com:2181 --topic flume-kafka

bin/kafka-console-producer.sh --topic flume-kafka --broker-list bigdata.com:6667 --security-protocol SASL_PLAINTEXT

bin/kafka-console-consumer.sh --topic flume-kafka --zookeeper bigdata.com:2181 --from-beginning --security-protocol SASL_PLAINTEXT
  1. Creating your storage folder in your HDFS for saving data.

Editing your flume configuration file

Finally, it’s time to reach the peak. Just follow the configuration code below.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
amk.sources = k
amk.sinks = h
amk.channels = m

#### Sources ####
amk.sources.k.type = com.johnny.flume.source.kafka.CustomKafkaSource
amk.sources.k.zk = bigdata.com:2181
amk.sources.k.groupId = flume-kafka
amk.sources.k.bootstrapServers = bigdata.com:6667
amk.sources.k.topics = flume-kafka

#### Sinks ####
amk.sinks.h.type = hdfs
amk.sinks.h.hdfs.fileType = DataStream
amk.sinks.h.hdfs.path = /johnny/flume/events/%y-%m-%d
amk.sinks.h.hdfs.filePrefix = events
amk.sinks.h.hdfs.fileSuffix = .log
amk.sinks.h.hdfs.round = true
amk.sinks.h.hdfs.roundValue = 10
amk.sinks.h.hdfs.roundUnit = minute
amk.sinks.h.hdfs.useLocalTimeStamp = true
amk.sinks.h.hdfs.kerberosPrincipal = hdfs-bigdata@CONNEXT.COM
amk.sinks.h.hdfs.kerberosKeytab = /etc/security/keytabs/hdfs.headless.keytab

#### Channels ####
amk.channels.m.type = memory

amk.sources.k.channels = m
amk.sinks.h.channel = m

Others need to notice

Similarly, we also ought to write a custom Kafka sink in Flume(1.5.2) if we need it.
The java code pastes below.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package com.johnny.flume.sink.kafka;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomKafkaSink extends AbstractSink implements Configurable {
public KafkaProducer<String, String> producer;
public String topic;

@Override
public Status process() throws EventDeliveryException {

Status status = null;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {

Event event = ch.take();
if (event == null) {
status = Status.BACKOFF;
}
byte[] byte_message = event.getBody();
//生产者
ProducerRecord<String, String> record = new ProducerRecord<>(this.topic, new String(byte_message));
producer.send(record);

txn.commit();
status = Status.READY;
} catch (Throwable t) {
txn.rollback();
status = Status.BACKOFF;
if (t instanceof Error) {
throw (Error) t;
}
} finally {
txn.close();
}
return status;
}

@Override
public void configure(Context context) {

Properties properties = new Properties();
properties.put("bootstrap.servers", context.getString("bootstrapServers"));
properties.put("metadata.broker.list", context.getString("brokerList"));
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("serializer.class", "kafka.serializer.StringEncoder");
properties.put("request.required.acks", context.getString("acks"));
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");

this.producer = new KafkaProducer<String, String>(properties);
this.topic = context.getString("topic");
}
}

The related configuration file pastes below.

1
2
3
4
5
6
#### Sinks ####
amk.sinks.k.type = com.johnny.flume.sink.kafka.CustomKafkaSink
amk.sinks.k.bootstrapServers = bigdata.com:6667
amk.sinks.k.brokerList = bigdata.com:6667
amk.sinks.k.acks = 1
amk.sinks.k.topic = flume-kafka