介绍
Kafka 是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
- 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
- 高吞吐量 :即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。
- 支持通过Kafka服务器和消费机集群来分区消息。
- 支持Hadoop并行数据加载。
- Kafka通过官网发布了最新版本2.0.0
安装
基于Linux和macos操作系统
参考
Step 1: 下载代码
下载2.2.0版本并解压缩
1 | $ tar -xzf kafka_2.12-2.2.0.tgz |
Step 2: 启动服务
运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。
1 $ bin/zookeeper-server-start.sh config/zookeeper.properties
现在启动kafka服务1
$ nohup bin/kafka-server-start.sh config/server.properties &
入门
主要介绍发送Kafka消息,消费kafka消息等简单示例代码,以及使用过程中遇到的问题和解决方案
发送kafka消息示例代码
1 | package com.ohaotian.datatransmission.core.writer.kafka; |
监控
Kafka Manager
简介
为了简化开发者和服务工程师维护Kafka集群的工作,构建了一个叫做Kafka管理器的基于Web工具,叫做 Kafka Manager。这个管理工具可以很容易地发现分布在集群中的哪些topic分布不均匀,或者是分区在整个集群分布不均匀的的情况。它支持管理多个集群、选择副本、副本重新分配以及创建Topic。同时,这个管理工具也是一个非常好的可以快速浏览这个集群的工具。
该软件是用Scala语言编写的。目前(2015年02月03日)雅虎已经开源了Kafka Manager工具。这款Kafka集群管理工具主要支持以下几个功能:
管理几个不同的集群;
很容易地检查集群的状态(topics, brokers, 副本的分布, 分区的分布);
选择副本;
产生分区分配(Generate partition assignments)基于集群的当前状态;
重新分配分区。
安装要求
Kafka 0.8.. or 0.9.. or 0.10.. or 0.11..
Java 8+
sbt 0.13.x
配置
系统至少需要配置zookeeper集群的地址,可以在kafka-manager安装包的conf目录下面的application.conf文件中进行配置。例如:1
kafka-manager.zkhosts="my.zookeeper.host.com:2181"
你可以指定多个zookeeper地址,用逗号分隔:1
kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
另外, 如果你不想硬编码,可以使用环境变量ZK_HOSTS。1
kafka-ZK_HOSTS="my.zookeeper.host.com:2181"
你可以启用/禁止以下的功能,通过修改application.config:1
application.features=["KMClusterManagerFeature","KMTopicManagerFeature","KMPreferredReplicaElectionFeature","KMReassignPartitionsFeature"]
KMClusterManagerFeature - 允许从Kafka Manager添加,更新,删除集群。
KMTopicManagerFeature - 允许从Kafka集群中增加,更新,删除topic
KMPreferredReplicaElectionFeature - 允许为Kafka集群运行首选副本
KMReassignPartitionsFeature - 允许生成分区分配和重新分配分区
考虑为启用了jmx的大群集设置这些参数:
- kafka-manager.broker-view-thread-pool-size=< 3 * number_of_brokers>
- kafka-manager.broker-view-max-queue-size=< 3 * total # of partitions across all topics>
kafka-manager.broker-view-update-seconds=< kafka-manager.broker-view-max-queue-size / (10 * number_of_brokers) >
下面是一个包含10个broker,100个topic的kafka集群示例,每个topic有10个分区,相当于1000个总分区,并启用JMX:kafka-manager.broker-view-thread-pool-size=30
- kafka-manager.broker-view-max-queue-size=3000
- kafka-manager.broker-view-update-seconds=30
控制消费者偏offset缓存的线程池和队列: - kafka-manager.offset-cache-thread-pool-size=< default is # of processors>
- kafka-manager.offset-cache-max-queue-size=< default is 1000>
- kafka-manager.kafka-admin-client-thread-pool-size=< default is # of processors>
- kafka-manager.kafka-admin-client-max-queue-size=< default is 1000>
您应该在启用了消费者轮询的情况下为大量#消费者增加以上内容。虽然它主要影响基于ZK的消费者轮询。
Kafka管理的消费者offset现在由“__consumer_offsets”topic中的KafkaManagedOffsetCache消费。请注意,这尚未经过跟踪大量offset的测试。每个集群都有一个单独的线程消费这个topic,所以它可能无法跟上被推送到topic的大量offset。
部署
下面的命令创建一个可部署应用的zip文件。1
sbt clean dist
如果你不想拉源码,在编译,我已经编译好,放在百度云盘上了。
链接:https://pan.baidu.com/s/1AWQihB3CkF0g2Ao7lizTWw 密码:82eq
启动服务
解压刚刚的zip文件,然后启动它:1
$ bin/kafka-manager
默认情况下,端口为9000。可覆盖,例如:1
$ bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=8080
再如果java不在你的路径中,或你需要针对不同的版本,增加-java-home选项:1
$ bin/kafka-manager -java-home /usr/local/oracle-java-8
用安全启动服务
为SASL添加JAAS配置,添加配置文件位置:1
$ bin/kafka-manager -Djava.security.auth.login.config=/path/to/my-jaas.conf
注意:确保运行kafka manager的用户有读取jaas配置文件的权限。
打包
如果你想创建一个Debian或者RPM包,你可以使用下面命令打包:1
2sbt debian:packageBin
sbt rpm:packageBin
常见问题
1. 如何实现批量发送Kafka消息?
生产者发送多个消息到同一个分区的时候,为了减少网络带来的系能开销,kafka会对消息进行批量发送。
batch.size
通过这个参数来设置批量提交的数据大小,默认是16k,当积压的消息达到这个值的时候就会统一发送(发往同一分区的消息)
linger.ms
这个设置是为发送设置一定是延迟来收集更多的消息,默认大小是0ms(就是有消息就立即发送)
当这两个参数同时设置的时候,只要两个条件中满足一个就会发送。比如说batch.size设置16kb,linger.ms设置50ms,那么当消息积压达到16kb就会发送,如果没有到达16kb,那么在第一个消息到来之后的50ms之后消息将会发送。
2. Kafka如何保证消息的可靠性传输?
这块比较常见的一个场景,就是Kafka某个broker宕机,然后重新选举partition 的leader。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader 挂了,然后选举某个follower成leader之后,不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将follower切换为 leader 之后,就会发现说这个数据就丢了。
所以此时一般是要求起码设置如下 4 个参数:
给topic设置replication.factor参数:这个值必须大于1,要求每个 partition必须有至少2个副本。
在Kafka服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧。
在producer端设置 acks=all:这个是要求每条数据,必须是写入所有replica 之后,才能认为是写成功了。
在 producer 端设置 retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。
我们生产环境就是按照上述要求配置的,这样配置之后,至少在Kafka broker端就可以保证在leader所在broker发生故障,进行leader切换时,数据不会丢失。
生产者会不会弄丢数据?
如果按照上述的思路设置了acks=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。