Kafka安装与入门

介绍

Kafka 是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

  • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
  • 支持通过Kafka服务器和消费机集群来分区消息。
  • 支持Hadoop并行数据加载。
  • Kafka通过官网发布了最新版本2.0.0

安装

基于Linux和macos操作系统

参考

Step 1: 下载代码

下载2.2.0版本并解压缩

1
2
$ tar -xzf kafka_2.12-2.2.0.tgz
$ cd kafka_2.12-2.2.0

Step 2: 启动服务

运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。

1
$ bin/zookeeper-server-start.sh config/zookeeper.properties

现在启动kafka服务

1
$ nohup bin/kafka-server-start.sh config/server.properties &

入门

主要介绍发送Kafka消息,消费kafka消息等简单示例代码,以及使用过程中遇到的问题和解决方案

发送kafka消息示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.ohaotian.datatransmission.core.writer.kafka;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.log4j.Log4j2;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/**
* @Author: feifz
* @Date: 2019-05-31 14:58
* @Version: 1.0
* @Description: kafka 发送消息示例
*/
@Log4j2
public class KafkaProducerDemo {

public static void main(String[] args) {
String brokeList = "127.0.0.1:9092";
String topic = "testTopic";
String key = "testKey";
String message = "this is a test kafka message!";
Producer<String, String> producer = initProducer(brokeList);
producer.send(new ProducerRecord<>(topic, key, JSONObject.toJSONString(message)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e == null) {
log.info("## 发送消息成功->{}", JSONObject.toJSONString(message));
} else {
log.error("## 发送消息失败->{}", e.getMessage());
}
}
});
producer.close();
}

/**
* 初始化producer
* @param brokeList
* @return
*/
private static Producer<String, String> initProducer(String brokeList) {
Properties props = new Properties();
props.put("bootstrap.servers", brokeList);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432L);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
return producer;
}

}

监控

参考:https://www.orchome.com/55

Kafka Manager

简介

为了简化开发者和服务工程师维护Kafka集群的工作,构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。

该软件是用Scala语言编写的。目前(2015年02月03日)雅虎已经开源了Kafka Manager工具。这款Kafka集群管理工具主要支持以下几个功能:

管理几个不同的集群;
很容易地检查集群的状态(topics, brokers, 副本的分布, 分区的分布);
选择副本;
产生分区分配(Generate partition assignments)基于集群的当前状态;
重新分配分区。

安装要求

Kafka 0.8.. or 0.9.. or 0.10.. or 0.11..
Java 8+
sbt 0.13.x

配置

系统至少需要配置zookeeper集群的地址,可以在kafka-manager安装包的conf目录下面的application.conf文件中进行配置。例如:

1
kafka-manager.zkhosts="my.zookeeper.host.com:2181"

你可以指定多个zookeeper地址,用逗号分隔:

1
kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"

另外, 如果你不想硬编码,可以使用环境变量ZK_HOSTS。

1
kafka-ZK_HOSTS="my.zookeeper.host.com:2181"

你可以启用/禁止以下的功能,通过修改application.config:

1
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]

KMClusterManagerFeature - 允许从Kafka Manager添加,更新,删除集群。
KMTopicManagerFeature - 允许从Kafka集群中增加,更新,删除topic
KMPreferredReplicaElectionFeature - 允许为Kafka集群运行首选副本
KMReassignPartitionsFeature - 允许生成分区分配和重新分配分区
考虑为启用了jmx的大群集设置这些参数:

  • kafka-manager.broker-view-thread-pool-size=< 3 * number_of_brokers>
  • kafka-manager.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
  • kafka-manager.broker-view-update-seconds=< kafka-manager.broker-view-max-queue-size / (10 * number_of_brokers) >
    下面是一个包含10个broker,100个topic的kafka集群示例,每个topic有10个分区,相当于1000个总分区,并启用JMX:

  • kafka-manager.broker-view-thread-pool-size=30

  • kafka-manager.broker-view-max-queue-size=3000
  • kafka-manager.broker-view-update-seconds=30
    控制消费者偏offset缓存的线程池和队列:
  • kafka-manager.offset-cache-thread-pool-size=< default is # of processors>
  • kafka-manager.offset-cache-max-queue-size=< default is 1000>
  • kafka-manager.kafka-admin-client-thread-pool-size=< default is # of processors>
  • kafka-manager.kafka-admin-client-max-queue-size=< default is 1000>
    您应该在启用了消费者轮询的情况下为大量#消费者增加以上内容。虽然它主要影响基于ZK的消费者轮询。

Kafka管理的消费者offset现在由“__consumer_offsets”topic中的KafkaManagedOffsetCache消费。请注意,这尚未经过跟踪大量offset的测试。每个集群都有一个单独的线程消费这个topic,所以它可能无法跟上被推送到topic的大量offset。

部署

下面的命令创建一个可部署应用的zip文件。

1
sbt clean dist

如果你不想拉源码,在编译,我已经编译好,放在百度云盘上了。

链接:https://pan.baidu.com/s/1AWQihB3CkF0g2Ao7lizTWw 密码:82eq

启动服务

解压刚刚的zip文件,然后启动它:

1
$ bin/kafka-manager

默认情况下,端口为9000。可覆盖,例如:

1
$ bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080

再如果java不在你的路径中,或你需要针对不同的版本,增加-java-home选项:

1
$ bin/kafka-manager -java-home /usr/local/oracle-java-8

用安全启动服务

为SASL添加JAAS配置,添加配置文件位置:

1
$ bin/kafka-manager -Djava.security.auth.login.config=/path/to/my-jaas.conf

注意:确保运行kafka manager的用户有读取jaas配置文件的权限。

打包

如果你想创建一个Debian或者RPM包,你可以使用下面命令打包:

1
2
sbt debian:packageBin
sbt rpm:packageBin

常见问题

1. 如何实现批量发送Kafka消息?

生产者发送多个消息到同一个分区的时候,为了减少网络带来的系能开销,kafka会对消息进行批量发送。
batch.size
通过这个参数来设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)
linger.ms
这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是0ms(就是有消息就立即发送)
当这两个参数同时设置的时候,只要两个条件中满足一个就会发送。比如说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。

2. Kafka如何保证消息的可靠性传输?

这块比较常见的一个场景,就是Kafka某个broker宕机,然后重新选举partition 的leader。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader 挂了,然后选举某个follower成leader之后,不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将follower切换为 leader 之后,就会发现说这个数据就丢了。
所以此时一般是要求起码设置如下 4 个参数:
给topic设置replication.factor参数:这个值必须大于1,要求每个 partition必须有至少2个副本。
在Kafka服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧。
在producer端设置 acks=all:这个是要求每条数据,必须是写入所有replica 之后,才能认为是写成功了。
在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
我们生产环境就是按照上述要求配置的,这样配置之后,至少在Kafka broker端就可以保证在leader所在broker发生故障,进行leader切换时,数据不会丢失。
生产者会不会弄丢数据?
如果按照上述的思路设置了acks=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。