kafka 的使用原理及通过spring-kafka 自定义封装包的
目录
- Kafka 封装包接入
- 1.Kafka 工作原理
- 2.Spring Kafka 介绍
- 3. kafka封装包的设计及使用
1).点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除) 点对点模型通常是一个基于拉取或者轮询的消息传送模型,这种模型从队列中请求信息, 而不是将消息推送到客户端。这个模型的特点是发送到队列的消息被一个且只有一个接收者
接收处理,即使有多个消息监听者也是如此。
2).发布/订阅模式(一对多,数据生产后,推送给所有订阅者) 发布订阅模型则是一个基于推送的消息传送模型。发布订阅模型可以有多种不同的订阅 者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即
使当前订阅者不可用,处于离线状态。
1).Apache Kafka 是一个开源消息系统,由 Scala 写成。是由 Apache 软件基金会开发的 一个开源消息系统项目。
2).Kafka 最初是由 LinkedIn 公司开发,并于 2011 年初开源。2012 年 10 月从 Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待的平台。
3).Kafka 是一个分布式消息队列。Kafka 对消息保存时根据 Topic 进行归类,发送消息 者称为 Producer,消息接受者称为 Consumer, kafka 集群有多个 kafka 实例组成,每个 实例(server)
称为 broker。
kafka 是一个分布式基于发布/订阅模式的消息队列(Messge Queue),主要用于大数据实时处理领域。
发布/订阅 消息的发布不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
最新的定义kafka 是一个开源的分布式事件流平台(Event Streaming Platform),被数千件公司用于高性能数据管道、流分析、 数据集成和关键人任务应用。
2).kafka 的应用场景 缓存/消峰有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。 解耦 允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。 异步通信[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
1).Producer 消息生产者,就是向 Kafka broker发消息的客户端。
2).Consumer 消息消费者,向 Kafka broker 取消息的客户端。
3).Consumer Group (CG )消费者组,由多个 consumer组成。 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个 组内 消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即
消费者组是逻辑上的一个订阅者。
4).Broker 一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker可以容纳多个 ic。
5).Topic可以理解为一个队列,个 生产者和消费者面向的都是一个 ic。
6).Partition 为了实现扩展性,一个非常大的 ic 可以分布到多个 broker(即服务器)上,个 一个 ic 可以分为多个 partition,每个 partition 是一个有序的队列。
partions主题分区数:kafka通过分区策略,将不同的分区分配在一个集群中的broker上,一般会分散在不同的broker上,当只有一个broker时,所有的分区就只分配到该Broker上。
消息会通过负载均衡发布到不同的分区上,消费者会监测偏移量来获取哪个分区有新数据,从而从该分区上拉取消息数据。分区数越多,在一定程度上会提升消息处理的吞吐量;
7).Replica 副本,一个 ic 的每个分区都有若干个副本,一个 Leader 和若干个 Folloer。
8).Leader 每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
9).Folloer 每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Folloer 会成为新的 Leader。
10).Offset: 消息位移,表示分区中每条消息的位置信息,是一个单调递增且不变的值
11).Consumer Offset消费者位移,表征消费者消费进度,每个消费者都有自己的消费者位移。
生产者发送流程在消息发送的过程中,涉及到了两个线程 ——main 线程和 Sender 线程。在main线程中创建了一个双端队列 RecordAumulator。main线程将消息发送给 RecordAumulator, Sender线程不断从
RecordAumulator 中拉取消息发送到 Kafka Broker。
- 1)producer 先从 zookeeper 的 "/brokers/…/state"节点找到该 partition 的 leader
- 2)producer 将消息发送给该 leader
- 3)leader 将消息写入本地 log
- 4)folloers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
- 5)leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high atermark, mit 的 offset)并向 producer 发送 ACK
• batch.size 只有数据积累到batch.size之后,sender才会发送数据。默认16k
• linger.ms 如果数据迟迟未达到batch.size,sender等待linger.time之后就会 发送数据。单位ms,默认值是0ms,表示没有延迟。
acks 应答级别acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader 应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader 和ISR 队列里面所有Foller 应答,可靠性高
,效率低;
在生产环境中, acks=0 很少使用;acks=1, 一般用于传输普通日志 , 允许丢个别数据;acks=-1,一般用于传输和钱相关的数据,对可靠性要求比较高的场景 。
消费者数据完全可靠条件ACK 级别设置为-1 + 分区副本大于等于2 + ISR 里应答的的最小副本数量大于等于2
消费者组定义消费者使用一个消费者组名(即group.id)来标记自己,ic的每条消息都只会被发送到每个订阅它的消费者组的一个消费者实例上
一个consumer group可能有若干个consumer实例
对于同一个group而言,ic的每条消息只能被发送到group下的一个consumer实例上
ic消息可以被发送到多个group中
Kafka支持基于队列和基于发布/订阅的两种消息引擎模型,事实上Kafka是通过consumer group实现对这两种模型的支持
所有consumer实例都属于相同group—实现基于队列的模型,每条消息只会被一个consumer实例处理
consumer实例都属于不同group—实现基于发布/订阅的模型,极端的情况是每个consumer实例都设置完全不同都group,这样kafka消息就会被广播到所有consumre实例
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-U86RBlDi-1671434814417)(images/img_10.png)]
消费者工作流程consumer需要读取多个ic的多个分区消息,若实现并行的消息读取,一种方式是使用多线程,为每个要读取的分区创建一个专有线程去消费,旧版本consumer就是使用这种方式;另一种方式类似于Linux
I/O模型的poll或select,使用一个线程来管理多个socket连接,即与多个broker实现消息的并行消费,新版本consumer采用这种设计;
一旦consumer订阅了ic,所有消费逻辑包括coordinator的协调、消费者组的rebalance以及数据的获取都会在主逻辑poll方法的一次调用中被执行,这样用户很容易使用一个线程来管理consumer I/O操作
对于新版本consumer Kafka
1.0.0而言,是一个双线程Java进程,创建KafkaConsumer的线程被称为用户主线程,consumer在后台会创建一个心跳线程。KafkaConsumer的poll方法在用户主线程中运行,这也表明消费者组rebalance、
消息获取、coordinator管理、异步任务结果的处理甚至位移提交等操作都运行在用户主线程中;
重复消费涉及到的相关参数
-
enable.auto.mit表示消费者会周期性自动提交消费的offset。默认值true。
-
auto.mit.interval.ms在enable.auto.mit为true的情况下, 自动提交的间隔。默认值5秒。
-
max.poll.records单次消费者拉取的最大数据条数,默认值500。
-
max.poll.interval.ms表示若在阈值时间之内消费者没有消费完上一次poll的消息,consumer
client会主动向coordinator发起LeaveGroup请求,触发Rebalance;然后consumer重新发送JoinGroup请求。 -
session.timeout.msgroup
Coordinator检测consumer发生崩溃所需的时间。在这个时间内如果Coordinator未收到Consumer的任何消息,那Coordinator就认为Consumer挂了。默认值10秒。 -
heartbeat.interval.ms标识Consumer给Coordinator发一个心跳包的时间间隔。heartbeat.interval.ms越小,发的心跳包越多。默认值3秒。
原因1消费者宕机、重启或者被强行kill进程,导致消费者消费的offset没有提交。
原因2设置enable.auto.mit为true,如果在关闭消费者进程之前,取消了消费者的订阅,则有可能部分offset没提交,下次重启会重复消费。
原因3消费后的数据,当offset还没有提交时,Partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout.ms时间,那么就会触发reblance重平衡,此时可能存在消费者offset没提交,会导致重平衡后重复消费。
重复消费的解决方法:
提高消费者的处理速度。例如对消息处理中比较耗时的步骤可通过异步的方式进行处理、利用多线程处理等。在缩短单条消息消费的,根据实际场景可将max.poll.interval.ms值设置大一点,避免不必要的Rebalance。可根据实际消息速率适当调小max.poll.records的值。
- 引入消息去重机制。例如生成消息时,在消息中加入唯一标识符如消息id等。在消费端,可以保存最近的max.poll.records条消息id到redis或mysql表中,这样在消费消息时先通过查询去重后,再进行消息的处理。
implementation'.springframeork.kafka:spring-kafka' testImplementation'.springframeork.kafka:spring-kafka-test'2) spring-kafka 的主要参数配置及说明
spring: kafka: #kafka 服务地址,可以有多个用,隔开 bootstrap-servers:localhost: 9002 # 消費者监听得的 ic 不存在时项目启动不报错 listener.missinng-ics-fatal: false # 在监听容器中运行的线程数 listener.concurrency: 10 # 轮询超时时间 listener.pol.timeout: 5000 # 生产者参数 producer: # 消息发送重试次数 retries: 0 # 一个批次可以使用的内存大小 batch-size: 16384 # 设置生产者内存缓存大小 buffer-memory: 33554432 # 同步副本机制 acks: 1 key-serializer: .apache.kafka.mon.serialization.StringSerializer value-serializer: .apache.kafka.mon.serialization.StringSerializer consumer: group-id: # 一次调用poll 返回的最大记录数 max-poll-recode: 500 # 是否自动提交偏移量,2.1版本后面默认值为 false ,为了避免出现重复数据和数据丢失,可以把他设置为 false,然后手动提交偏移量 enable-auto-mmit: false # 自动提交的时间间隔 在spring boot 2.x 版本是值的类型为 Duration 需要符合特定的格式,如1S,1M,2H,5D, enable-auto-mit 为false 情况下该参数不生效 auto-mit-interval: 1S # 该属性指定了消费者在读取一个没有偏移的分区或者偏移量无效的情况下该作何处理 # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 默认值 # none:ic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 auto-offset-reset: earliest # 返回的最小字节数,默认为1 fetch-min-size: 1 # 最长阻塞时间 fetch-max-ait: 500 #消费者进程 id client-id: 1 key-serializer: .apache.kafka.mon.serialization.StringSerializer value-serializer: .apache.kafka.mon.serialization.StringSerializer3) 主要Api Configuring Topics
如果你在应用上下文中定义了一个KafkaAdmin的bean,它可以自动向代理添加主题,只需要向每个主题添加NeTopic的bean在应用中的上下文
@Bean public KafkaAdmin admin(){ Mapconfigs=ne HashMap(); configs.put(AdminClientConfig.BOOTSTRAP_SERVER_CONFIG,"localhost:9092"); reture ne KafkaAdmin(configs); } @Bean public NeTopic ic1(){ return TopicBuilder.name("ic1") .partition(1) .replicas(1) .build(); } @Bean public NeTopic ic2(){ return ne NeTopic("ic2",1,(short)1); }
在springBoot 项目中,KafkaAdmin 会被自动注册,所以我们只需要NeTopic的bean即可
发送消息 kafkaTemplate:kafkaTemplate 包装了一个生产者,并提供了方便的方法来发送数据到kafka ic
ListenableFuture>sendDefault(V data); ListenableFuture >sendDefault(K key,V data); ListenableFuture >sendDefault(Integer partition,K key,V data); ListenableFuture >sendDefault(Integer partition,Long timestamp,K key,V data); ListenableFuture >send(String ic,V data); ListenableFuture >send(String ic,K key,V data); ListenableFuture >send(String ic,Integer partition,K key,V data);
以上方法返回一个ListenableFuture
MessageListenerContainer:可以配置MessageListenerContainer 并提供消息监听器或使用@KafkaListener 注解来接收消息
kafka 提供了两个 MessageListenerContainer 来实现
- KafkaMessageListenerContainer 在一个线程中接收来自所有主题或分区的所有消息
- ConcurrentMessageListenerContainer 委托给一个或多个KafkaMessageListenerContainer 实例来提供多线程使用
@KafkaListener 注解用于将一个bean 方法指定为监听器容器的监听器
// 该部分值也可以使用占位符或spel 表达式 ics="{someBean.property}" @KafkaListener(ics = "chinaclear", groupId = "test") public void consumerTopic(String msg){ System.out.println("收到消息"+msg); } // 使用consumerRecord,?> 作为接收消息的参数时除了可有Acknoledgmen 对象外,不能有其他参数 @KafkaListener(ics = "my-replicated-ic", groupId = "test") public void listenZhugeGroup(ConsumerRecordrecord,Acknoledgment ack){ String value=record.value(); System.out.println(value); System.out.println(record); //手动提交offset //ack.acknoledge(); }
@KafkaListener 还可以作用在class 类上:
该注解作用于类上时,类中的方法必须用 @KafkaHandler注解,在传递消息时,将使用转换后的消息有效负载类型来确定调用那个方法。方式如下
@KafkaListener(id = "myId", ics = "myTopic") public class Myclass { @KafkaHandler public void listen(String str) { } @KafkaHandler public void listen(Integer integer) { } }异常处理器KafkaListenerErrorHandler :
KafkaListener中抛出的异常都会经过KafkaListenerErrorHandler 异常处理,spring kafka 提供了ConsumerAareListenerErrorHandler
的子接口,注册这个类型中的bean 即可
@Bean public ConsumerAareErrorHandler consumerAareErrorHandler(){ return ne ConsumerAareErrorHandler(){ @Override public void handle(Exception thronException,ConsumerRecord, ?> data,Consumer, ?> consumer){ log.info("Consumer Error Handler receive"+data.toString()); }}; }3. kafka 封装包的设计及使用 1.引入方式
git@github.:renpengcheng4612/KafkaLearn.git2.API
工程内容参考当前工程的 CommonKafka Module
git@github.:renpengcheng4612/KafkaLearn.git