总结一下Kafka

基础架构及术语

话不多说,先看图,通过这张图我们来捋一捋相关的概念及之间的关系:

Kafka基础架构

Producer :Producer即生产者,消息的产生者,是消息的入口。

kafka cluster
Broker
:Broker是kafka实例,每个服务器上有一个或多个kafka的实例,我们姑且认为每个broker对应一台服务器。每个kafka集群内的broker都有一个
不重复 的编号,如图中的broker-0、broker-1等……
Topic :消息的主题,可以理解为消息的分类,kafka的数据就保存在topic。在每个broker上都可以创建多个topic。
Partition
:Topic的分区,每个topic可以有多个分区,分区的作用是做负载,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,partition的表现形式就是一个一个的文件夹!
Replication
:每一个分区都有多个副本,副本的作用是做备胎。当主分区(Leader)故障的时候会选择一个备胎(Follower)上位,成为Leader。在kafka中默认副本的最大数量是10个,且副本的数量不能大于Broker的数量,follower和leader绝对是在不同的机器,同一机器对同一个分区也只可能存放一个副本(包括自己)。
Message :每一条发送的消息主体。

Leader: 每个分区多个副本的“主”生产者发送数据的对象,以及消费者消费数据的对象都是Leader

Follower:每个分区多个副本中的“从”,实时从Leader中同步数据,保持和Leader数据的同步。Leader发生故障时,某个Follower会成为新的Leader。

Consumer :消费者,即消息的消费方,是消息的出口。
Consumer Group
:我们可以将多个消费组组成一个消费者组,在kafka的设计中同一个分区的数据只能被消费者组中的某一个消费者消费。同一个消费者组的消费者可以消费同一个topic的不同分区的数据,这也是为了提高kafka的吞吐量!

Zookeeper :kafka集群依赖zookeeper来保存集群的的元信息,来保证系统的可用性。

工作流程分析

上面介绍了kafka的基础架构及基本概念,不知道大家看完有没有对kafka有个大致印象,如果对还比较懵也没关系!我们接下来再结合上面的结构图分析kafka的工作流程,最后再回来整个梳理一遍我相信你会更有收获!

发送数据

我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候 永远的找leader
,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:

Kafka发送数据

发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证
同一分区 内的数据是有序的!写入示意图如下:

每条消息追加到分区中

上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
1、 方便扩展 。因为一个topic可以有多个partition,所以我们可以通过扩展机器去轻松的应对日益增长的数据量。
2、 提高并发 。以partition为读写单位,可以多个消费者同时消费数据,提高了消息的处理效率。

熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
1、 partition在写入的时候可以指定需要写入的partition,如果有指定,则写入对应的partition。
2、 如果没有指定partition,但是设置了数据的key,则会根据key的值hash出一个partition。
3、 如果既没指定partition,又没有设置key,则会轮询选出一个partition。

保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为
01all
0代表producer往集群发送数据不需要等到集群的返回,不确保消息发送成功。安全性最低但是效率最高。
1代表producer往集群发送数据只要leader应答就可以发送下一条,只确保leader发送成功。
all代表producer往集群发送数据需要所有的follower都完成从leader的同步才会发送下一条,确保leader发送成功和所有的副本都完成备份。安全性最高,但是效率最低。

最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。

数据传递语义

  • 至少一次(AtLeastOnce)=ACK级别设置为-1+分区副本大于等于2+ISR里应答的最小副本数量大于等于2。可以保证数据不丢失,但是不能保证数据不重复;
  • 最多一次(AtMostOnce)=ACK级别设置为0。可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次(ExactlyOnce): 精确一次(ExactlyOnce)=幂等性+至少一次(ack=-1+分区副本数 >=2+ISR最小副本数量>=2)。对于一些非常重要的信息,比如和钱相关的数据, 要求数据既不能重复也不丢失。

Kafka0.11版本以后,引入了一项重大特性: 幂等性和事务。

幂等性

幂等性:就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复

重复数据的判断标准 :具有<**PID,Partition,SeqNumber**>相同主键的消息提交时,Broker只会持久化一条。其中
PID是Kafka每次重启都会分配一个新的Partition表示分区号;SequenceNumber是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

Kafka幂等性

如何使用幂等性?

开启参数enable.idempotence 默认为true,false关闭。

// 8.开启幂等性(开启事务,必须开启幂等性,默认为true)  
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);  

生产者事务

1)Kafka事务原理开启事务,必须开启幂等性。

Kafka事务一共有5个API

/**  
     * See {@link KafkaProducer#initTransactions()}  
     */  
    void initTransactions();  
  
    /**  
     * See {@link KafkaProducer#beginTransaction()}  
     */  
    void beginTransaction() throws ProducerFencedException;  
  
    /**  
     * See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)}  
     */  
    @Deprecated  
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,  
                                  String consumerGroupId) throws ProducerFencedException;  
  
    /**  
     * See {@link KafkaProducer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}  
     */  
    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,  
                                  ConsumerGroupMetadata groupMetadata) throws ProducerFencedException;  
  
    /**  
     * See {@link KafkaProducer#commitTransaction()}  
     */  
    void commitTransaction() throws ProducerFencedException;  
  
    /**  
     * See {@link KafkaProducer#abortTransaction()}  
     */  
    void abortTransaction() throws ProducerFencedException;   

3)使用kafka事务配置如下

// 7.设置事务id(必须)  
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");  
  
// 8.开启幂等性(开启事务,必须开启幂等性,默认为true)  
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);   

4)自定类 :CustomProducerTransactions 实现kafka事务

import ch.qos.logback.classic.Level;  
import ch.qos.logback.classic.Logger;  
import ch.qos.logback.classic.LoggerContext;  
import org.apache.kafka.clients.producer.Callback;  
import org.apache.kafka.clients.producer.KafkaProducer;  
import org.apache.kafka.clients.producer.ProducerRecord;  
import org.apache.kafka.clients.producer.RecordMetadata;  
import org.slf4j.LoggerFactory;  
  
import java.util.List;  
  
/**  
 * @author huangdh  
 * @version 1.0  
 * @description:  
 * @date 2022-11-10 21:45  
 */  
public class CustomProducerTransactions {  
  
    // 修改日志打印级别,默认为debug级别  
    static {  
        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();  
        List<Logger> loggerList = loggerContext.getLoggerList();  
        loggerList.forEach(logger -> {  
            logger.setLevel(Level.INFO);  
        });  
    }  
  
    public static void main(String[] args) {  
  
        KafkaProducer<String, String> producer = KafkaProducerFactory.getProducer();  
  
        // 初始化事务  
        producer.initTransactions();  
        // 开启事务  
        producer.beginTransaction();  
        try {  
            for (int i = 0; i < 5; i++) {  
                producer.send(new ProducerRecord<>("kafka", "study hard everyday" + i), new Callback() {  
                    @Override  
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {  
                        if (e == null) {  
                            System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());  
                        } else {  
                            e.printStackTrace();  
                        }  
                    }  
                });  
            }  
  
//            int i = 1/0;  
            // 提交事务  
            producer.commitTransaction();  
        } catch (Exception e) {  
            // 终止事务  
            producer.abortTransaction();  
            e.printStackTrace();  
        }finally {  
            producer.close();  
        }  
    }  
}  
  

执行结果如下:

23:04:15.842 [kafka-producer-network-thread | producer-transaction_id_0] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-transaction_id_0, transactionalId=transaction_id_0] Discovered transaction coordinator 8.8.80.8:9092 (id: 0 rack: null)  
23:04:15.979 [kafka-producer-network-thread | producer-transaction_id_0] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-transaction_id_0, transactionalId=transaction_id_0] ProducerId set to 1000 with epoch 4  
主题:kafka->分区:1  
主题:kafka->分区:1  
主题:kafka->分区:1  
主题:kafka->分区:1  
主题:kafka->分区:1  

保存数据

Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。

Partition 结构
前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件,
log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。

Partition结构

如上图,这个partition有三组segment文件,每个log文件的大小是一样的,但是存储的message数量是不一定相等的(每条的message大小不一致)。文件的命名是以该segment最小offset来命名的,如000.index存储offset为0~368795的消息,kafka就是利用分段+索引的方式来解决查找效率的问题。

Message结构
上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
1、 offset:offset是一个占8byte的有序id号,它可以唯一确定每条消息在parition内的位置!
2、 消息大小:消息大小占用4byte,用于描述消息的大小。
3、 消息体:消息体存放的是实际的消息数据(被压缩过),占用的空间根据具体的消息而不一样。

存储策略
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
1、 基于时间,默认配置是168小时(7天)。
2、 基于大小,默认配置是1073741824。
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!

Kafka副本

kafka副本

  1. Kafka副本作用:提高数据可靠性。
  2. Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。
  3. Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。
  4. Kafka分区中的所有副本统称为AR(Assigned Repllicas),AR = ISR + OSR。
    1. ISR,表示和Leader保持同步的Follower集合如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR** 。该时间阈值由replica.lag.time.max.ms参数设定,默认30s**。Leader发生故障之后,就会从ISR中选举新的Leader。
    2. OSR,表示Follower与Leader副本同步时,延迟过多的副本。

Leader选举流程

Kafka集群中有一个broker的Controller会被选举为Controller Leader,
负责管理集群broker的上下线,所有topic的分区副本分配和Leader选举等工作
。Controller的信息同步工作是依赖于Zookeeper的。

Leader和Follower故障处理细节

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset+1。

HW(High Watermark):所有副本中最小的LEO。

Leader和Follower故障处理细节

Follower故障

(1)Follower发生故障后会被临时踢出ISR。

(2)这个期间Leader和Follower继续接收数据。

Follower故障

(3)待该Follower恢复后,Follower会读取本地磁盘记录上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。

从HW开始向Leader进行同步

(4)等该Follower的LEO大于等于该Partition的HW,即Follower追上Leader之后,就可以重新加入ISR了。

Follower追上Leader

Leader故障处理机制

(1)Leader发生故障之后,会从ISR中选出一个新的Leader。

从ISR中选出一个新的Leader

(2)为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。

高于HW的部分截掉

注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或者不重复。

消费数据

消息存储在log文件后,消费者就可以进行消费了。与生产消息相同的是,消费者在拉取消息的时候也是 找leader 去拉取。

多个消费者可以组成一个消费者组(consumer
group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!是不是有点绕。我们看下图:

消费数据

图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议
消费者组的consumer的数量与partition的数量一致
在保存数据的小节里面,我们聊到了partition划分为多组segment,每个segment又包含.log、.index、.timeindex文件,存放的每条message包含offset、消息大小、消息体……我们多次提到segment和offset,查找消息的时候是怎么利用segment+offset配合查找的呢?假如现在需要查找一个offset为368801的message是什么样的过程呢?我们先看看下面的图:

Kafka查找消息

1、 先找到offset的368801message所在的segment文件(利用二分法查找),这里找到的就是在第二个segment文件。
2、
打开找到的segment中的.index文件(也就是368796.index文件,该文件起始偏移量为368796+1,我们要查找的offset为368801的message在该index内的偏移量为368796+5=368801,所以这里要查找的
相对offset
为5)。由于该文件采用的是稀疏索引的方式存储着相对offset及对应message物理偏移量的关系,所以直接找相对offset为5的索引找不到,这里同样利用二分法查找相对offset小于或者等于指定的相对offset的索引条目中最大的那个相对offset,所以找到的是相对offset为4的这个索引。
3、
根据找到的相对offset为4的索引确定message存储的物理偏移位置为256。打开数据文件,从位置为256的那个地方开始顺序扫描直到找到offset为368801的那条Message。

这套机制是建立在offset为有序的基础上,利用 segment + 有序offset + 稀疏索引 + 二分查找 +
顺序查找
等多种手段来高效的查找数据!至此,消费者就能拿到需要处理的数据进行处理了。那每个消费者又是怎么记录自己消费的位置呢?在早期的版本中,消费者将消费到的offset维护zookeeper中,consumer每间隔一段时间上报一次,这里容易导致重复消费,且性能不好!在新的版本中消费者消费到的offset已经直接维护在kafka集群的__consumer_offsets这个topic中!

Kafka Broker

Kafka Broker工作流程

Zookeeper存储的Kafka信息

Zookeeper存储的Kafka信息

  1. /kafka/brokers/ids:[0,1,2],记录有哪些服务器。
  2. /kafka/brokers/topics/first/partitions/0/state:{“leader”:1,”isr”:[1,0,2] } 记录谁是Leader,有哪些服务器可用。
  3. /kafka/controller:{“brokerid”:0} 辅助选举Leader

Kafka Broker工作流程

Broker工作流程

  1. broker启动后在zk中注册
  2. controller谁先注册,谁说了算
  3. 由选举出来的controller监听brokers节点变化
  4. controller决定Leader选举
  5. controller将节点信息上传到zk中
  6. 其他controller从zk同步相关信息
  7. 假设broker1中Leader挂了
  8. controller监听到节点发生变化
  9. 获取ISR
  10. 选举新的Leader(在isr中存活为前提,按照AR中排在前面的优先,例如:ar[1,0,2],那么leader就会按照1,0,2的顺序轮询)
  11. 更新Leader及ISR

Kafka核心特性

压缩

我们上面已经知道了Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端可以通过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩之后,在Consumer端需进行解压。压缩的好处就是减少传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈往往体现在网络上而不是CPU(压缩和解压会耗掉部分CPU资源)。
那么如何区分消息是压缩的还是未压缩的呢,Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,如果后两位为0,则表示消息未被压缩。

消息可靠性

在消息系统中,保证消息在生产和消费过程中的可靠性是十分重要的,在实际消息传递过程中,可能会出现如下三中情况:

  • 一个消息发送失败
  • 一个消息被发送多次
  • 最理想的情况:exactly-once ,一个消息发送成功且仅发送了一次

有许多系统声称它们实现了exactly-
once,但是它们其实忽略了生产者或消费者在生产和消费过程中有可能失败的情况。比如虽然一个Producer成功发送一个消息,但是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,但是这个consumer在处理取过来的消息时失败了。
从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可通过参数控制等待时间),如果消息在途中丢失或是其中一个broker挂掉,Producer会重新发送(我们知道Kafka有备份机制,可以通过参数控制是否等待所有备份节点都收到消息)。
从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程中挂掉,此时Consumer可以通过这个offset值重新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息做任意处理。

备份机制

备份机制是Kafka0.8版本的新特性,备份机制的出现大大提高了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka允许集群中的节点挂掉后而不影响整个集群工作。一个备份数量为n的集群允许n-1个节点失败。在所有备份节点中,有一个节点作为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。下面这幅图解释了Kafka的备份机制:

Kafka备份机制

Kafka高效性相关设计

高效读写数据

  1. Kafka是分布式集群,可以采用分区技术,并行度高。
  2. 读数据采用稀疏索引,可以快速定位到要消费的数据。
  3. 顺序写磁盘。

Kafka的producer生产数据,要 写入到log文件中,写的过程是一直追加到文件末端,为顺序写
官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s
。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

页缓存 + 零拷贝技术

零拷贝: Kafka的数据加工处理操作交由Kafka生产者和Kafka消费者处理。Kafka
Broker应用层不关心存储的数据,所以就不用走应用层,传输效率高。

PageCache页缓存
:Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读操作发生时,先从PageCache中查找,如果找不到,再去磁盘中读取。
实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。

是否零拷贝工作流程对比