跳到主要内容

16、Kafka 实战 - kafka笔记之如何保证数据不丢失

kafka笔记之如何保证数据不丢失

Kafka是一种高吞吐量的分布式发布订阅消息系统。在使用过程中如果使用不当,经常会出现消息丢失的情况,这是业务系统不能容忍的,消息系统最重要的是保证数据不丢失。本文主要记录kafka是如何保证数据不丢失的,主要从三方面来介绍,消息发送端保证数据不丢失,kafka服务保证消息不丢失,消费者保证消息不丢失。

基础知识

kafka 可以保证分区消息的顺序,同一个分区,先发送到kafka分区的消息,会被先消费掉。 kafka 是通过一个集群对外提供服务,只要是集群中多个副本中有一个副本是活跃的,那么收到的消息就不会丢失。

kafka集群保证数据不丢失

先思考一个问题: kafka集群什么时候会丢失消息?

这就要从kafka的复制机制开始讲了。

kafka每个topic有多个分区,分区存储在磁盘上,kafka可以保证分区的数据是有序的,每个分区可以有多个副本。

副本按照是否是首领,可以分为首领副本和跟随者副本(这里对应的就是kafka集群中的leader和follower)。

所有的消息都是发送给leader的,消息消费也是从leader获取的。首领副本第一时间收到消息,或者消费消息,他一定是同步副本。

其他follower都是和leader保持通信,同步leader的消息。当leader不可用时,会选举一个follower会变成leader。

对于一个主一个从的两个kafka,做的集群来说。

(此时的kafka复制系数是2. 对应的配置参数是replication,factor) 一个是leader副本,一个是follower副本。当follower副本一直能与leader副本保持同步的时候 follower副本是 同步副本, 当follower与leader无法保持同步的时候 follower副本则变成非同步副本。

如果leader宕机,这时候系统需要选举一个follower来作为首领,kafka优先选择同步副本作为首领,当系统没有同步副本的时候。 kafka如果选择非同步副本作为首领,则会丢失一部分数据,(这一部分数据就是非同步副本无法及时从首领副本更新的消息)。 kafka如果不选择非同步副本作为首领,则此时kafka集群不可用。

kafka 选择非同步副本作为首领副本的行为叫做,不完全首领选举。如何控制kafka在leader宕机时,同步副本不可用时,是否选择非同步 作为首领?通过kafka的另外一个参数来控制的 : unclean.leader.election. 如果是true 则会发生不完全首领选举。

副本数建议3个就可以,多的话需要更多的磁盘,unclean.leader.election 建议false.

对于两个kafka做的集群来说,肯定是不安全的。那么三个节点的kafka安全吗?

答案是也不一定安全 因为即使三个副本,也有可能是两个从都是非同步副本,此时主宕机,从要么不可用(影响高可用),要么成为主(数据丢失)。 这里就需要保证kafka系统中至少有两个同步副本。一个肯定是首领副本,另外一个是从的副本。 此时需要kafka的另外一个参数 最小同步副本数 min,insync.replicas 只有保证kafka收到生产者的消息之后,至少有 “最小同步副本数“ 的副本收到消息,才能保证在主宕机时消息不丢失。 这个参数的意思是 kafka收到生产者消息之后,至少几个同步副本,同步之后,才给客户端消息确认。 数量多能保证高可用,但是牺牲效率。

kafka 如何判断一个follower副本是不是同步副本? 满足两个个条件 1.在过去10秒内从首领获取过消息,并且是最新消息。 2.过去6秒内 和 zk直接发送过心跳。

疑问:如果kafka 长时间未收到消息,第一条如何满足?

消息发送者正确的发送姿势

消息怎么才算是发生成功?

消息的生产者向kafka集群发送消息,需要等待kafka集群确认,这里涉及到一个参数 acks 他的值有三个 0, 1, all 如果是0 ,那么代表发送过去,不等待kafka消息确认,认为成功 一定会丢失消息,可能kafka集群正在选举,此时就无法收到任何异常。 如果是1,那么代表发送过去,等待首领副本确认消息,认为成功 首领肯定收到了消息,写入了分区文件(不一定落盘)。 如果是all, 那么代表发送过去之后,消息被写入所有同步副本之后 ,认为成功。 注意这里是 所有同步副本,不是所有副本。 具体是多少同步副本,还要取决于kafka集群设置的最小同步副本数,和集群当前的同步副本数。 选择这种配置,会可靠,但是牺牲效率,可以通过,增大批和使用异步模式,提高效率。

如果发生消息发生异常怎么办?重试吗?

哪些异常需要重试? 网络异常和集群无主,或者正在选举的异常是可以重试的。 哪些不需要重试? 配置异常。 其他异常怎么办? 序列化异常,内存溢出,栈溢出等。

重要的配置参数

如果网络异常收不到响应,则等待,这里有个配置等待时间 request,timeout.ms 发送消息等待时间。 metadata.fetch.time.out 从kafka 获取元数据的等待时间。 max.block.ms : 配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多长时间。 由于缓冲区已满或元数据不可用,这些方法可能会被阻塞止。用户提供的序列化程序或分区程序中的阻塞将不计入此超时 重试次数 retries 重试直接的等待时间, 默认是100 ms ,可以通过 retry.backoff.ms 配置 多个消息发送给同一个分区的时候,生产者会把消息打成一个批,批大小设置 batch.size 过大占内存,过小发送频繁,并且生产者不是 必须满批发送,有个等待时间,linger.ms设置 等待多久批不满则发送。

消息消费者正确的消费姿势

消费者需要向kafka集群提交 已经消费的消息的offset来确定消息消费到了那里。 消息队列的消费方式有两种,一种是发布订阅模式,一种是队列模式。 发布订阅模式 一个消息可以被多个消费者消费。队列模式多个消费者只能消费到一部分消息。 kafka是通过group-id来区分消费组的。 一个topic被 同一个消费组的不同消费者消费 ,相当于是队列模式。被不同消费组消费相当于是 订阅模式。 一个partition在同一个时刻只有一个consumer instance在消费。 对于正确的模式,我们需要配置正确的group-id

auto.offset.reset 没有偏移量可以提交的时候,系统从哪里开始消费。 有两种设置 :earliest 和latest 。

enable.auto.commit

自动提交 ,如果开启了自动提交,那么系统会自动进行提交offset。可能会引起,并未消费掉,就提交了offset.引起数据的丢失。

与自动提交相关的是自动提交的间隔时间 auto.commit.interval.ms 默认是5秒钟提交一次,可以通过查看 kafka config目录下的

配置文件,查询配置的默认值。

自动提交 还可能引起消息的重复消费,特别是 多个客户端直接出现重平衡时。

总结

只有三方面都保证了。才可以保证数据不丢失。