Kafka(二)文件存储机制 & 生产者

目录

一、Kafka 文件存储机制

二、Kafka 生产者

1、生产者消息发送流程

1.1、发送原理

2、异步发送 API

2.1、普通异步发送

案例演示

2.2、带回调函数的异步发送

2.3、同步发送 API

3、生产者分区

3.1、分区的好处

3.2、生产者发送消息的分区策略

(1)默认的分区器 DefaultPartitioner

3.3、自定义分区器 

1)需求

2)实现步骤

4、生产经验

4.1、生产者如何提高吞吐量

4.2、数据可靠性

4.3、数据去重

4.3.1、数据传递语义

4.3.2、幂等性

4.3.3、生产者事务

4.4、数据有序

4.5、数据乱序


一、Kafka 文件存储机制

        Kafka中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向topic的。

        topic 只是逻辑上的概念,而 partition 是物理上的概念。每个partition对应于一个log文件,该 log 文件中存储的就是 producer 生产的数据。Producer生产的数据会被不断追加到该 log 文件末端,且每条数据都有自己的 offset 。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

        由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个 partition 分为多个 segment 。每个 segment 对应两个文件 ——“.index”文件和“.log”文件(一个存储当前文件的索引范围,一个存储真正的数据)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如,like 这个 topic 有三个分区,则其对应的文件夹为 like-0,like-1,like-2:

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

这种存储规则很像我们 Spark Shuffle 阶段产生的文件的存储规则,顺便回忆一下 Spark 的 Shuffle 阶段:

        Shuffle 过程中每个 Map 任务会产生两个文件,即数据文件和索引文件。其中,数据文件是存储当前 Map 任务的输出结果,而索引文件中则存储了数据文件中的数据的分区信息。下一个阶段的 Reduce 任务就是根据索引文件来获取属于自己处理的那个分区的数据。

        其实 Spark 常见的有两种 Shuffle 策略:HashShuffle 和 SortShuffle ,但是这属于 Spark 调优的内容了,现在不必深究。

二、Kafka 生产者

1、生产者消息发送流程

        我们知道,Kafka 的三大组成部分:Producer、Broker、Consumer。接下来我们要学习的部分就是 Kafka 的生产者是如何把数据发送给 Kafka 集群(Broker)的。

1.1、发送原理

        在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

main 线程:

  • 拦截器是可选的(一般用 flume 的拦截器,配置更简单)
  • 序列化器(用的是Kafka自己的序列化器,而不是Java的Serializable,因为Java的序列化器太繁重)
  • 分区器(决定数据往哪个分区存储)
  • RecordAccumulator 默认 32MB(内存),分区器会把数据发送到这里,一个分区会创建一个队列,方便数据管理(都是在内存中完成的),队列中每份数据的大小(batch.size)默认是16K

sender线程:

  • 只有数据积累到 batch.size 之后,sender 才会发送数据,默认16K
  • 如果队列中的一份数据迟迟满不了(达不到 batch.size),sender 会等待 linger.ms 后发送这份数据,默认是 0ms
  • NetworkClient:我们发送数据的请求会被放到一个队列(InFlightRequests 队列,通过源码可以看到其底层数据结构Map<String, Deque<ClientRequest>> 以 broker 为 key,队列中放的是一个个发送请求)中去,当第一个请求发出后未得到响应仍然会继续发送第二个请求,但是这个队列的大小默认是 5,也就是说,如果五个请求都发出去后且都没有得到响应,那么就不可以再发出请求,因为默认每个 broker 最多缓存5个请求。
  • Sender线程通过Selector把上面的请求和数据一起发送到Kafka集群,当数据发送到Kafka集群后,Kafka集群会进行副本的复制并返回对应的acks信息。acks有三种应答级别,分别是ACK=0、ACK=1和ACK=all或ACK=-1(默认级别)。

    ACK=0:生产者在消息发送后不会等待来自服务器的任何确认。这意味着生产者无法知道消息是否成功存储在Kafka集群中。这个级别提供了最高的吞吐量,但在可靠性方面是最低的,因为可能会丢失消息。

    ACK=1:生产者会等待直到消息的领导者副本(Leader Replica)确认接收到消息。一旦领导者副本存储了消息,生产者会收到一个确认。这个级别在性能和数据可靠性之间提供了一个平衡。但如果领导者副本在确认后发生故障,而消息还未复制到追随者副本(Follower Replicas),则消息可能会丢失。

    ACK=all或ACK=-1(默认级别):生产者会等待消息被所有的同步副本(ISR,In-Sync Replicas)确认。这意味着只有当所有的同步副本都已经接收并存储了消息,生产者才会收到一个确认。这个级别提供了最高的数据可靠性,但可能会牺牲一些性能,因为需要等待所有副本的确认。

  • 如果数据发送成功,就把 InFlighRequests 队列中的请求缓存给清除掉,并且把对应 RecordAccuulator 中的数据清除掉。

  • 如果数据发送失败,会进行重试,重试的次数默认是(Intege.MAX_VALUE),也就是死磕到底,直到发送成功为止。

参数名称

描述

bootstrap.servers

生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

key.serializer和value.serializer

指定发送消息的key和value的序列化类型。一定要写全类名。

buffer.memory

RecordAccumulator缓冲区总大小,默认32m。

batch.size

缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。

acks

0:生产者发送过来的数据,不需要等数据落盘应答。

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

max.in.flight.requests.per.connection

允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。

retries

当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647。

如果设置了重试,还想保证消息的有序性,需要设置

MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

retry.backoff.ms

两次重试之间的时间间隔,默认是100ms。

enable.idempotence

是否开启幂等性,默认true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

支持压缩类型:none、gzip、snappy、lz4和zstd。

2、异步发送 API

send 方法有两种传参方式,一种有回调函数,一种不带回调函数。 

2.1、普通异步发送

        所谓的异步发送指的是外部的数据使用异步的方式吧数据发送到 RecordAccumulator 的内存队列当中去,异步的体现就是外部数据发送到队列中后,并不会等待 RecordAccumulator 把数据传给 Broker 节点并返回成功的消息才继续发送;而是直接把数据扔到 RecordAccumulator 的内存队列后就撒手不管了。

案例演示

导入依赖:

<dependencies>
     <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.0.0</version>
     </dependency>
</dependencies>

注意:使用 kafka 前必须启动 zookeeper,不然报错无法使用。

public class CustomProducer {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的 key 和 value 的序列化类型 key.serialize
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
        // 这两个是等价的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like","test"+i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

我们可以看到,一条消息就是一个 ProducerRecord 对象,不管你的消息多么短,哪怕是一个标点,它也会被包装进一个对象里面去。 

运行结果: 

        我们发现,我们的编程步骤和我们上面Kafka的发送原理中生产者的发送步骤是一致的,只不过我们这里没有设置拦截器和分区器,这是一个最简单的 Kafka 生产者程序了。

2.2、带回调函数的异步发送

现在我们来使用带回调函数的异步发送:

只需要修改send方法即可:

    for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", "test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
        }

运行结果: 

这种带回调函数的方法可以让我们知道数据更多的元数据信息(比如主题、分区...)。

2.3、同步发送 API

所谓的同步发送,就是外部数据发送到 RecordAccumulator 的内存队列后,必须等待数据被 Selector 发送到 Broker 并返回响应信息才能继续发送。代码的实现很简单,只需要在send方法之后加一个 get() 即可:

for (int i = 0; i < 5; i++) {
      kafkaProducer.send(new ProducerRecord<>("like","test"+i)).get();
}

3、生产者分区

3.1、分区的好处

分区的好处我们太清楚了,之前的 Hadoop、Spark、Flink 都有分区的概念,在 Shuffle 的时候、在 keyBy 的时候,分区的好处显然可以增加并行度,提高我们数据的处理效率;可以负载均衡,不会出现服务器涝的涝死,旱的旱死。

所以,从大数据的存储和计算的角度来看,分区有这么两种好处:

  1. 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上,合理控制分区的任务,可以实现负载均衡。
  2. 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据。

3.2、生产者发送消息的分区策略

(1)默认的分区器 DefaultPartitioner

我们可以看到,在初始化 ProducerRecord 时,有 6 种初始化的传参方式:

接下来我们就利用上面的回调函数可以返回数据的元数据信息,来测试一下是不是像它说的这样:

1、指定分区: 

for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", 0,"","test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
        }

运行结果:

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 0

面试题:如何把一张 MySQL 表的数据都放到一个 Kafka 的分区当中去?

答:生产者在发送数据时指定数据的分区为 表名 。

2、不指定分区,只指定 key:

kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });

 我们指定 key 为 i (i 的值为0,1,2,3,4),运行结果:

topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1

3,不指定 partition 也不指定 key。这种方式其实我们上面在学异步发送的时候已经演示过了,它默认会等这一批都满了(16K)或者达到 linger.ms(默认0ms)才会发送。

我们这里在循环的时候,让线程睡眠 2ms,这样就不会因为默认的 linger.ms 太短立即发送,导致数据量太小,发送太快看不出来粘性分区的特点:

for (int i = 0; i < 25; i++) {
            kafkaProducer.send(new ProducerRecord<>("like", ""+i,"test" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){ // 如果异常为空 说明正常执行
                        System.out.println("topic: "+recordMetadata.topic()+",partition: "+recordMetadata.partition());
                    }
                }
            });
            Thread.sleep(2);
        }

运行结果: 

topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 2
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 2
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 1
topic: like,partition: 0
topic: like,partition: 0
topic: like,partition: 1

我们可以看到,粘性分区是这样的:它每次的分区和上一次的分区是不一样的,并且每次的数据尽可能会放到一个分区去。 

3.3、自定义分区器 

在工作当中,一些特殊场景我们现有的分区策略是无法实现的,这就需要我们自定义来实现分区器了。

1)需求

        实现一个分区器,发送过来的数据如果包含 "大傻春" 就发往 0 号分区,否则发往 1 号分区。

2)实现步骤

1. 实现 Partitioner 接口

2. 重写 partition() 方法

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String val = value.toString();
        if (val.contains("大傻春"))
            return 0;
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

使用自定义分区器:

// 关联自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

// 发送数据
for (int i = 0; i < 5; i++) {
    ProducerRecord<String, String> record;
    if (i==3)
        record = new ProducerRecord<>("like", "" + i, "大傻春");
    else
        record = new ProducerRecord<>("like", ""+i,"test" + i);

kafkaProducer.send(record, (RecordMetadata recordMetadata, Exception e)-> {
    if (e == null){ // 如果异常为空 说明正常执行
           System.out.println("topic:"+recordMetadata.topic()+",partition:"+recordMetadata.partition());
       }
    });
}

4、生产经验

4.1、生产者如何提高吞吐量

        我们外部的数据是放到 RecordAccumulator 的内存队列当中等待发送的,队列中每份数据的大小(batch.size)默认是16K,但我们知道,如果数据迟迟不能达到 batch.size 的话,会根据默认的配置 linger.ms (默认是 0ms)来发送。但事实上,0ms就意味着不需要数据达到 batch.size 也就是 batch.size 这一配置形同虚设不起作用,也就是只要它察觉到有数据就立即发送,这样的效率其实并不高。

        为了提高我们的吞吐量,我们当然需要调整这两个配置的参数大小:

  • batch.size :内存队列中每个批次的大小,默认 16K
  • linger.ms:等待时间,修改为 5-100ms

        当然这个参数的值我们需要慎重考虑,就像我们 Flink 当中水位线允许迟到的时间一样,不能说为了保证数据的迟到率最低,就把等待时间设置为几秒,那样 Flink 辛辛苦苦实现的毫秒级延迟有啥用呢。

  • compression.type:压缩 snappy
  • RecordAccumulator:缓冲区大小,修改为 64 MB
public class CustomProducerParameters {
    public static void main(String[] args) {

        // 0. 配置信息
        Properties properties = new Properties();
        // 连接 kafka
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");
        // key 和 value 的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // batch.size 单位: KB
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); //默认none,可配置值gzip、snappy、lz4和zstd
        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432*2); // 修改为64MB

        // 1. 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 2. 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("like","value"+i));
        }

        // 3. 关闭资源
        kafkaProducer.close();
    }
}

4.2、数据可靠性

        数据的可靠性,指的其实就是当 Selector 把数据发送到 Broker 之后,是否等待响应(ack)之后再发送数据。

我们知道,ack 的值有三种:

  1. ack = 0:发送完直接走,不用等响应,可靠性差,但是效率高。(一般不用)
  2. ack = 1:发送完需要等待 leader 节点响应才能继续发送下一个数据,可靠性中等,效率中等。
  3. ack = -1:发送完需要等待所有节点(leader 和 其他 fllower)响应才能继续发送下一个数据。可靠性高,效率低。

 

        在生产环境中,acks=0 很少用;acks=1:一般用于传输普通日志,允许丢个别数据;acks=-1:一般用于传输和钱有关的数据,用于对可靠性的要求比较高的场景。

完全可靠性在这种情况下尽管概率特别低,但是仍然不能排除,我们会在下面的数据去重去学习。

代码中配置 ACK 级别: 

// acks
properties.put(ProducerConfig.ACKS_CONFIG,"1");
// 重试次数
properties.put(ProducerConfig.RETRIES_CONFIG,3);

4.3、数据去重

4.3.1、数据传递语义

就像我们 Flink 中的时间语义,Kafka 生产者也有它的数据传递语义:

  • 至少一次(At least once)= ACK级别设置为1 + 分区副本大于等于2 + ISR里应答的最小副本数大于等于2
  • 最多一次(At most once)= ACK级别设置为0
  • 总结:
    • Al least once 可以保证数据不丢失,但是不能保证数据不重复;
    • At most once 可以保证数据不重复,但是不能保证数据不丢失。
  • 精确一次(Exactly once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不能丢失。

Kafka 0.11 版本后,引入了一个重要的特性:幂等性和事务。

4.3.2、幂等性
  • 幂等性就是指 Producer 不论向 Broker 发送多少次重复数据,Broker 端都只会持久化一条,保证了不重复。
  • 精确一次(Exactly once)= 幂等性 + 至少一次(ack = -1 + 分区副本数 >= 2 + ISR 最小副本数 >= 2)

        重复数据的判断标准就是数据的三个属性必须唯一(PID、Partition、SeqNumber),其中 PID 是 Kafka 每次启动自动生成的,Partition是分区号,SeqNumber 是一个单调递增的数。

        幂等性只能保证数据在单分区内不会重复。

配置幂等性

enable.idempotence 默认就是 true,flase 关闭。

4.3.3、生产者事务

我们知道,幂等性只能保证数据在单分区内不会重复,但是还是不能保证绝对的唯一,比如 Kafka 挂掉了需要重启,那么重启之后之前数据的 PID 就失效了,所以当有重复的数据时,并不能识别出来。这就需要事务的方式来解决了。

说明:开启事务必须开启幂等性。

其实,Kafka 实现精确一次的保证机制和我们 Flink 在保证端到端一致性时输出端的保证方式是很相似的(幂等写入和事务写入)

Kafka 的事务一共如下 5 个 API:

// 1初始化事务
void initTransactions();

// 2开启事务
void beginTransaction() throws ProducerFencedException;

// 3在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                              String consumerGroupId) throws ProducerFencedException;

// 4提交事务
void commitTransaction() throws ProducerFencedException;

// 5放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

测试:

public class CustomProducerTransaction {

    public static void main(String[] args) {

        Properties properties = new Properties();

        // 连接集群 bootstrap.servers 多写几个主机地址 防止一个客户端挂掉
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的 key 和 value 的序列化类型 key.serialize
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.Serializer");
        // 这两个是等价的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 指定事务id
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_01");

        // 1. 创建 Kafka 生产者对象
        // 需要指定键值的类型
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        kafkaProducer.initTransactions();

        kafkaProducer.beginTransaction();

        try {
            // 2. 发送数据
            for (int i = 0; i < 5; i++) {
                kafkaProducer.send(new ProducerRecord<>("like","test"+i));
            }
            kafkaProducer.commitTransaction();
        }catch (Exception e){
            kafkaProducer.abortTransaction();   // 终止事务
        }finally {
            // 3. 关闭资源
            kafkaProducer.close();
        }
    }
}

注意:一定要记得手动指定事务id(保证唯一)。我们要把发送数据的代码写进 try-catch 中,如果有异常那么久终止事务。

4.4、数据有序

        数据有序的保证一直是流处理领域的一个问题,就像我们的 Flink 通过 水位线和Barrier 对齐算法保证数据容错和有序性。我们 Kafka 中也是一样的,单分区的话我们数据当然是有序的,但因为是多分区,所以分区和分区间的数据不能保证哪个数据先被读取,所以说分区间数据的顺序是无序的。

        至于多分区要做到有序,可以把每个分区的数据在消费者这里进行排序,但是这样的效率不是很高,所以我们 Kafka 一般都是只保证单分区有序,个人认为,Kafka只要做到数据的尽量有序就可以了,反正Kafka的数据到下游传递时,一般也都是并行读取,比如 Flink 读取 Kafka 的数据就支持多个 Sink 算子,所以数据到了 Flink 的多个算子链里会出现乱序。

        单分区内数据有序,但是不一定发送过去仍然是有序的,这就需要给它增加一些条件了,也就是接下来要学习的用幂等性解决数据乱序。

4.5、数据乱序

Kafka 1.x 版本之后保证数据单分区有序的条件:

  • 未开启幂等性
    • max.in.flight.requests.per.connection 设置为 1(相当于 NetworkClient 中只能存在一个请求,那数据肯定是有序的,都不要使用幂等性)
  • 开启幂等性
    • max.in.flight.requests.per.connection 需要设置为 <= 5(启用幂等性之后,Kafka 服务端会缓存最近的  5 个request元数据,所以无论如何,都可以保证最近的5个request的数据都是有序的。而且这个配置的值不能>5,因为Kafka服务端最多缓存5个请求)

解释

        为什么一定可以保证单分区内数据有序呢,因为幂等性就是(ProducerID,Partition,SeqNumber),其中 SeqNumber 要保证单调递增,对应我们的request,如果生产者的 RecoedAccumulator 中某个broker对应的请求队列中 request1和 request2 发送成功,但是request3 却失败了,那么当然会去重试,但是此时request4和request5也发送过去了怎么办。因为我们开启了幂等性,所以当 request1和request2 发送到Kafka的服务端(broker)之后,因为它们是不重复且有序的,所以会立即落盘,但是我们的 request3 由于发送失败,此时 request4和request5又发送过来了,但是由于 request4 和 request5 并不满足幂等性要求,所以不会落盘,而是留在内存当中,所以只有当 request3 再次到来之后,满足幂等性并落盘之后,request4和request5才能落盘。