源码地址:
https://github.com/pony-maggie/springboot-kafka-demo
本地kafka和zk环境
我们需要在本地启动一个单机版的kafka和zookeeper环境。kafka的安装包自带zookeeper,直接启动即可,这个详细过程不是本文的重点,不详细说了。
我的本地环境配置如下:
- win10系统
- kafka_2.12-1.1.1
- zookeeper-3.4.9
- spring boot 2.1.6.RELEASE
启动zk,端口是2181
1 |
|
启动kafka,端口是
1 |
|
记得查看启动日志确认启动成功才行。
用kafka自带的工具创建一个topic试试:
1 |
|
可以看到创建成功了。然后我们查询下kafka的topic,
1 |
|
然后我们可以用kafka自带的生产者和消费者工具进行测试,进一步验证本地环境。
首先分别启动生产者和消费者,
1 |
|
1 |
|
在消费者的窗口输入消息,很快消费者窗口就会显示出该消息了。或者消费者启动也可以用下面的方式:
1 |
|
原因可以参考:
Kafka中的broker-list,bootstrap-server以及zookeeper
下面两个如何配置
创建demo项目工程
依赖
1 |
|
配置
1 |
|
先来解释下这几个配置,
- bootstrap-servers:连接kafka的地址,多个地址用逗号分隔
- batch-size:当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
- retries:若设置大于0的值,客户端会将发送失败的记录重新发送
- buffer-memory:Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
- key-serializer:关键字的序列化类
- value-serializer:值的序列化类
到这里配置就可以结束了,目前spring-kafka已经和spring boot无缝对接,可以自动加载配置文件进行配置,我们不需要再单独定义配置类。
测试代码
我们先定义一个消息实体,方便消费者和生产者共享。
1 |
|
然后是生产者,
1 |
|
代码很简单,不做过多解释。
然后是消费者,
1 |
|
只需要在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic。
kafka的消息再接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。
测试
启动springboot项目,通过日志可以看出消息的收发都是正常的。
1 |
|
我们在代码里创建了一个名为 “malu” 的topic,可以通过命令查询下:
1 |
|
其它说明
如果启动的时候报错,需要考虑springboot和spring-kafka兼容性问题。比如一开始我启动的时候就报错:
1 |
|
后来把spring-kafka的版本升级下就好了。具体的版本对应关系可以看下官方的说明:
https://spring.io/projects/spring-kafka
参考:
http://kafka.apachecn.org/documentation.html
http://www.54tianzhisheng.cn/2018/01/05/SpringBoot-Kafka/