Kafka生产者发送消息的三种方式

介绍

Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量、灵活的offset是其它消息系统所没有的。

三种方式

Kafka发送消息主要有三种方式:

  • 发送并忘记
  • 同步发送
  • 异步发送+回调函数

发送并忘记

发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)
发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void wayOne() {
String brokeList = "127.0.0.1:9092";
String topic = "testTopic";
String key = "testKey";
Producer<String, String> producer = initProducer(brokeList);
long before = System.currentTimeMillis();
System.out.println("发送前-->"+before);
for(int i=1 ;i<10000;i++){
producer.send(new ProducerRecord<>(topic, 0,key, String.valueOf(i)));
}
producer.flush();
producer.close();
long after = System.currentTimeMillis();
System.out.println("发送后-->"+after);
long temp = after-before;
System.out.println("时间间隔-->"+temp);

}

同步发送

同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功)
以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void wayTwo() {
String brokeList = "39.96.117.232:9092";
String topic = "testTopic";
String key = "testKey";
Producer<String, String> producer = initProducer(brokeList);
long before = System.currentTimeMillis();
System.out.println("发送前-->"+before);
for(int i=1 ;i<10000;i++){
Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topic,key, String.valueOf(i)));
try {
RecordMetadata record = recordMetadataFuture.get(10, TimeUnit.MICROSECONDS);
System.out.println(record.toString());
}catch (Exception e){
System.out.println(e);
}

}
long after = System.currentTimeMillis();
System.out.println("发送后-->"+after);
long temp = after-before;
System.out.println("时间间隔-->"+temp);

}

异步发送+回调函数

异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败)
在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,否则一直会阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void wayThree() {
String brokeList = "39.96.117.232:9092";
String topic = "testTopic";
String key = "testKey";
Producer<String, String> producer = initProducer(brokeList);
long before = System.currentTimeMillis();
System.out.println("发送前-->"+before);
for(int i=1 ;i<10000;i++){
producer.send(new ProducerRecord<>(topic, key, String.valueOf(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("## 发送消息成功->");
}else {
System.out.println("## 发送消息失败->{}"+e.getMessage());
}
}
});
}
long after = System.currentTimeMillis();
System.out.println("发送后-->"+after);
long temp = after-before;
System.out.println("时间间隔-->"+temp);
}

示例代码

基于java实现(待完善)

依赖jar包:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.2.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.ohaotian.feifz.mq.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* @Author: feifz
* @Date: 2019-05-31 15:34
* @Version: 1.0
* @Description: kafka 发送消息三种方式发送消息示例代码
* @Refer https://www.cnblogs.com/FG123/p/10091478.html
*/
public class KafkaSendMsgDemo {

public static void main(String[] args) {

/**
三种方式虽然在时间上有所差别,但并不是说时间越快的越好,具体要看业务的应用场景:

场景1:如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个partation上或指定同一个key,结合参数设置retries的值让发送失败时重试,设置max_in_flight_requests_per_connection=1,可以控制生产者在收到服务器晌应之前只能发送1个消息,从而控制消息顺序发送;

场景2:如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用发送并忘记的方式,并配合参数acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息;

场景3:如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数retries=0,并将发送失败的消息记录到日志文件中;
* */

wayThree();
}

/**
* 发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)
*/
public static void wayOne() {
String brokeList = "127.0.0.1:9092";
String topic = "testTopic";
String key = "testKey";
Producer<String, String> producer = initProducer(brokeList);
long before = System.currentTimeMillis();
System.out.println("发送前-->"+before);
for(int i=1 ;i<10000;i++){
producer.send(new ProducerRecord<>(topic, 0,key, String.valueOf(i)));
}
producer.flush();
producer.close();
long after = System.currentTimeMillis();
System.out.println("发送后-->"+after);
long temp = after-before;
System.out.println("时间间隔-->"+temp);

}

/**
* 同步发送-(通过get方法等待Kafka的响应,判断消息是否发送成功)
*/
public static void wayTwo() {
String brokeList = "39.96.117.232:9092";
String topic = "testTopic";
String key = "testKey";
Producer<String, String> producer = initProducer(brokeList);
long before = System.currentTimeMillis();
System.out.println("发送前-->"+before);
for(int i=1 ;i<10000;i++){
Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topic,key, String.valueOf(i)));
try {
RecordMetadata record = recordMetadataFuture.get(10, TimeUnit.MICROSECONDS);
System.out.println(record.toString());
}catch (Exception e){
System.out.println(e);
}

}
long after = System.currentTimeMillis();
System.out.println("发送后-->"+after);
long temp = after-before;
System.out.println("时间间隔-->"+temp);

}

/**
* 异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败)
*/
public static void wayThree() {
String brokeList = "39.96.117.232:9092";
String topic = "testTopic";
String key = "testKey";
Producer<String, String> producer = initProducer(brokeList);
long before = System.currentTimeMillis();
System.out.println("发送前-->"+before);
for(int i=1 ;i<10000;i++){
producer.send(new ProducerRecord<>(topic, key, String.valueOf(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e==null){
System.out.println("## 发送消息成功->");
}else {
System.out.println("## 发送消息失败->{}"+e.getMessage());
}
}
});
}
long after = System.currentTimeMillis();
System.out.println("发送后-->"+after);
long temp = after-before;
System.out.println("时间间隔-->"+temp);
}

/**
* 初始化producer
* @param brokeList
* @return
*/
private static Producer<String, String> initProducer(String brokeList) {
Properties props = new Properties();
props.put("bootstrap.servers", brokeList);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("retries", 0);
Producer<String, String> producer = new KafkaProducer<>(props);
return producer;
}

}