3.2 客户端开发

在了解了消费者与消费组之间的概念之后,我们就可以着手进行消费者客户端的开发了。在Kafka的历史中,消费者客户端同生产者客户端一样也经历了两个大版本:第一个是于Kafka开源之初使用 Scala 语言编写的客户端,我们可以称之为旧消费者客户端(Old Consumer)或Scala消费者客户端;第二个是从Kafka 0.9.x版本开始推出的使用Java编写的客户端,我们可以称之为新消费者客户端(New Consumer)或Java消费者客户端,它弥补了旧客户端中存在的诸多设计缺陷。

本节主要介绍目前流行的新消费者(Java 语言编写的)客户端,而旧消费者客户端已被淘汰,故不再做相应的介绍了。

一个正常的消费逻辑需要具备以下几个步骤:

(1)配置消费者客户端参数及创建相应的消费者实例。

(2)订阅主题。

(3)拉取消息并消费。

(4)提交消费位移。

(5)关闭消费者实例。

代码清单 1-2 中已经简单对消费者客户端的编码做了演示,本节对其稍做修改,如代码清单3-1所示。

代码清单3-1 消费者客户端示例

相比于代码清单 1-2 而言,修改过后的代码多了一点东西,我们按照消费逻辑的各个步骤来做相应的分析。

3.2.1 必要的参数配置

在创建真正的消费者实例之前需要做相应的参数配置,比如 3.1 节中的设置消费者所属的消费组的名称、连接地址等。参照代码清单3-1中的initConfig()方法,在Kafka消费者客户端KafkaConsumer中有4个参数是必填的。

· bootstrap.servers:该参数的释义和生产者客户端 KafkaProducer 中的相同,用来 指 定 连 接 Kafka 集 群 所 需 的 broker 地 址 清 单,具 体 内 容 形 式 为host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上。有关此参数的更多释义可以参考6.5.2节。

· group.id:消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。

· key.deserializer 和 value.deserializer:与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如示例中的org.apache.kafka.common.serialization.StringDeserializer,单单指定StringDeserializer是错误的。有关更多的反序列化内容可以参考3.2.3节。

注意到代码清单3-1中的initConfig()方法里还设置了一个参数client.id,这个参数用来设定KafkaConsumer对应的客户端id,默认值也为“”。如果客户端不设置,则KafkaConsumer会自动生成一个非空字符串,内容形式如“consumer-1”“consumer-2”,即字符串“consumer-”与数字的拼接。

KafkaConsumer中的参数众多,远非示例initConfig()方法中的那样只有5个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法全部记住所有的参数名称,只能有个大致的印象,在实际使用过程中,诸如“key.deserializer”“auto.offset.reset”之类的字符串经常由于人为因素而书写错误。为此,我们可以直接使用客户端中的 org.apache.kafka.clients.consumer.ConsumerConfig 类来做一定程度上的预防,每个参数在ConsumerConfig类中都有对应的名称,就以代码清单3-1中的initConfig()方法为例,引入ConsumerConfig后的修改结果如下:

注意到上面的代码中key.deserializer和value.deserializer参数对应类的全限定名比较长,也比较容易写错,这里通过Java中的技巧来做进一步的改进,相关代码如下:

如此代码就简洁了许多,同时也预防了人为出错的可能。在配置完参数之后,我们就可以使用它来创建一个消费者实例:

本节介绍的KafkaConsumer配置相关的内容基本上和介绍KafkaProducer配置时的一样,除了配置对应的反序列化器,只多了一个必要的group.id参数。

3.2.2 订阅主题与分区

在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题,代码清单3-1中我们使用subscribe()方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe的几个重载方法如下:

对于消费者使用集合的方式(subscribe(Collection))来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后一次的为准。

上面的示例中,最终消费者订阅的是topic2,而不是topic1,也不是topic1和topic2的并集。

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。在Kafka 和其他系统之间进行数据复制时,这种正则表达式的方式就显得很常见。正则表达式的方式订阅的示例如下:

细心的读者可能观察到在 subscribe 的重载方法中有一个参数类型是 ConsumerRebalance-Listener,这个是用来设置相应的再均衡监听器的,具体的内容可以参考3.2.8节。

消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()方法来实现这些功能,此方法的具体定义如下:

这个方法只接受一个参数partitions,用来指定需要订阅的分区集合。这里补充说明一下TopicPartition类,在Kafka的客户端中,它用来表示分区,这个类的部分内容如下所示。

TopicPartition类只有2个属性:topic和partition,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题—分区的概念映射起来。

我们将代码清单3-1中的subscribe()方法修改为assign()方法,这里只订阅topic-demo主题中分区编号为0的分区,相关代码如下:

有读者会有疑问:如果我们事先并不知道主题中有多少个分区怎么办?KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息,partitionsFor()方法的具体定义如下:

其中PartitionInfo类型即为主题的分区元数据信息,此类的主要结构如下:

PartitionInfo类中的属性topic表示主题名称,partition代表分区编号,leader代表分区的leader副本所在的位置,replicas代表分区的AR集合,inSyncReplicas代表分区的ISR集合,offlineReplicas代表分区的OSR集合。

通过partitionFor()方法的协助,我们可以通过assign()方法来实现订阅主题(全部分区)的功能,示例参考如下:

既然有订阅,那么就有取消订阅,可以使用 KafkaConsumer 中的 unsubscribe()方法来取消主题的订阅。这个方法既可以取消通过 subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过 assign(Collection)方式实现的订阅。示例代码如下:

如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法,下面示例中的三行代码的效果相同:

如果没有订阅任何主题或分区,那么再继续执行消费程序的时候会报出IllegalStateException异常:

集合订阅的方式subscribe(Collection)、正则表达式订阅的方式subscribe(Pattern)和指定分区的订阅方式 assign(Collection)分表代表了三种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN和USER_ASSIGNED(如果没有订阅,那么订阅状态为NONE)。然而这三种状态是互斥的,在一个消费者中只能使用其中的一种,否则会报出IllegalStateException异常:

通过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的,其实这一点从assign()方法的参数中就可以看出端倪,两种类型的subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

3.2.3 反序列化

在2.1.3节中我们讲述了KafkaProducer对应的序列化器,那么与此对应的KafkaConsumer就会有反序列化器。Kafka所提供的反序列化器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于ByteBuffer、ByteArray、Bytes、Double、Float、Integer、Long、Short 及String类型的反序列化,这些序列化器也都实现了 Deserializer 接口,与KafkaProducer中提及的Serializer接口一样,Deserializer接口也有三个方法。

· public void configure(Map<String,?> configs,boolean isKey):用来配置当前类。

· public byte[] serialize(String topic,T data):用来执行反序列化。如果data为null,那么处理的时候直接返回null而不是抛出一个异常。

· public void close():用来关闭当前序列化器。

代码清单2-2中描述的是Kafka客户端自带的序列化器StringSerializer的具体实现,对应的反序列化器StringDeserializer的具体代码实现如下:

configure()方法中也有3个参数:key.deserializer.encoding、value.deserializer.encoding和deserializer.encoding,用来配置反序列化的编码类型,这3个都是用户自定义的参数类型,在KafkaConsumer的参数集合(ConsumerConfig)中并没有它们的身影。一般情况下,也不需要配置这几个参数,如果配置了,则需要和StringSerializer中配置的一致。默认情况下,编码类型为“UTF-8”。上面示例代码中的deserialize()方法非常直观,就是把byte[]类型转换为String类型。

在代码清单 2-3 和代码清单 2-4 中,我们演示了如何通过自定义的序列化器来序列化自定义的Company类,这里我们再来看一看与CompanySerializer对应的CompanyDeserializer的具体实现:

configure()方法和close()方法都是空实现,而deserializer()方法就是将字节数组转换成对应Company对象。在使用自定义的反序列化器的时候只需要将相应的value.deserializer参数配置为CompanyDeserializer即可,示例如下:

注意如无特殊需要,笔者还是不建议使用自定义的序列化器或反序列化器,因为这样会增加生产者与消费者之间的耦合度,在系统升级换代的时候很容易出错。自定义的类型有一个不得不面对的问题就是KafkaProducer和KafkaConsumer之间的序列化和反序列化的兼容性。对于StringSerializer来说,KafkaConsumer可以顺其自然地采用StringDeserializer,不过对于Company这种专用类型而言,某个上游应用采用CompanySerializer进行序列化之后,下游应用也必须实现对应的CompanyDeserializer。再者,如果上游的Company类型改变,那么下游也需要跟着重新实现一个新的CompanyDeserializer,后面所面临的难题可想而知。

在实际应用中,在Kafka提供的序列化器和反序列化器满足不了应用需求的前提下,推荐使用Avro、JSON、Thrift、ProtoBuf或Protostuff等通用的序列化工具来包装,以求尽可能实现得更加通用且前后兼容。使用通用的序列化工具也需要实现 Serializer 和 Deserializer 接口,因为Kafka客户端的序列化和反序列化入口必须是这两个类型。

本节的最后我们来看一下如何使用通用的序列化工具实现自定义的序列化器和反序列化器的封装。这里挑选了Protostuff来做演示,使用的Protostuff的Maven依赖如下:

为了简化说明,这里只展示出序列化器的serialize()方法和deserialize()方法,如下所示。

接下来要做的工作就和CompanyDeserializer一样,这里就不一一赘述了。读者可以添加或减少Company类中的属性,以此查看采用通用序列化工具的前后兼容性的效能。

3.2.4 消息消费

Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

从代码清单3-1中可以看出,Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。

对于poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么poll()方法返回为空的消息集合。

poll()方法的具体定义如下:

注意到poll()方法里还有一个超时时间参数timeout,用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。注意这里 timeout 的类型是 Duration,它是JDK8中新增的一个与时间有关的类型。在Kafka 2.0.0之前的版本中,timeout参数的类型为long,与此类型对应的poll()方法的具体定义如下:

poll(long)方法中timeout的时间单位固定为毫秒,而poll(Duration)方法可以根据Duration中的 ofMillis()、ofSeconds()、ofMinutes()、ofHours()等多种不同的方法指定不同的时间单位,灵活性更强。并且 poll(long)方法也已经被标注为@Deprecated,虽然目前还可以使用,如果条件允许的话,还是推荐使用poll(Duration)的方式。

timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE。

消费者消费到的每条消息的类型为ConsumerRecord(注意与ConsumerRecords的区别),这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富,具体的结构参考如下代码:

topic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。offset 表示消息在所属分区的偏移量。timestamp 表示时间戳,与此对应的timestampType 表示时间戳的类型。timestampType 有两种类型:CreateTime 和LogAppendTime,分别代表消息创建的时间戳和消息追加到日志的时间戳。headers表示消息的头部内容。key 和 value 分别表示消息的键和消息的值,一般业务应用要读取的就是value,比如使用2.1.3节中的CompanySerializer序列化了一个Company对象,然后将其存入Kafka,那么消费到的消息中的 value 就是经过 CompanyDeserializer 反序列化后的 Company对象。serializedKeySize和serializedValueSize分别表示key和value经过序列化之后的大小,如果key为空,则serializedKeySize值为-1。同样,如果value为空,则serializedValueSize的值也会为-1。checksum是CRC32的校验值。如需更加深入了解消息中的各个属性,则可以先跳到5.2节来查阅相关内容。

我们在消费消息的时候可以直接对 ConsumerRecord 中感兴趣的字段进行具体的业务逻辑处理。

poll()方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了一个iterator()方法来循环遍历消息集内部的消息,iterator()方法的定义如下:

在代码清单3-1中,我们使用这种方法来获取消息集中的每一个ConsumerRecord。除此之外,我们还可以按照分区维度来进行消费,这一点很有用,在手动提交位移时尤为明显,有关位移提交的内容我们会在下一节中详细陈述。ConsumerRecords类提供了一个records(TopicPartition)方法来获取消息集中指定分区的消息,此方法的定义如下:

我们不妨使用这个records(TopicPartition)方法来修改一下代码清单3-1中的消费逻辑,主要的示例代码如下:

上面示例中的 ConsumerRecords.partitions()方法用来获取消息集中所有分区。在 ConsumerRecords类中还提供了按照主题维度来进行消费的方法,这个方法是records(TopicPartition)的重载方法,具体定义如下:

ConsumerRecords 类中并没提供与 partitions()类似的 topics()方法来查看拉取的消息集中所包含的主题列表,如果要按照主题维度来进行消费,那么只能根据消费者订阅主题时的列表来进行逻辑处理了。下面的示例演示了如何使用ConsumerRecords中的record(String topic)方法:

在 ConsumerRecords 类中还提供了几个方法来方便开发人员对消息集进行处理:count()方法用来计算出消息集中的消息个数,返回类型是int;isEmpty()方法用来判断消息集是否为空,返回类型是boolean;empty()方法用来获取一个空的消息集,返回类型是ConsumerRecord<K,V>。

到目前为止,可以简单地认为poll()方法只是拉取一下消息而已,但就其内部逻辑而言并不简单,它涉及消费位移、消费者协调器、组协调器、消费者的选举、分区分配的分发、再均衡的逻辑、心跳等内容,在后面的章节中会循序渐进地介绍这些内容。

3.2.5 位移提交

对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。单词“offset”可以翻译为“偏移量”,也可以翻译为“位移”,读者可能并没有过多地在意这一点:在很多中文资料中都会交叉使用“偏移量”和“位移”这两个词,并没有很严谨地进行区分。笔者对offset做了一些区分:对于消息在分区中的位置,我们将offset称为“偏移量”;对于消费者消费到的位置,将 offset 称为“位移”,有时候也会更明确地称之为“消费位移”。做这一区分的目的是让读者在遇到 offset 的时候可以很容易甄别出是在讲分区存储层面的内容,还是在讲消费层面的内容,如此也可以使“偏移量”和“位移”这两个中文词汇具备更加丰富的意义。当然,对于一条消息而言,它的偏移量和消费者消费它时的消费位移是相等的,在某些不需要具体划分的场景下也可以用“消息位置”或直接用“offset”这个单词来进行表述。

在每次调用poll()方法时,它返回的是还没有被消费过的消息集(当然这个前提是消息已经存储在Kafka 中了,并且暂不考虑异常情况的发生),要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。再考虑一种情况,当有新的消费者加入时,那么必然会有再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新的消费者,如果不持久化保存消费位移,那么这个新的消费者也无法知晓之前的消费位移。

在旧消费者客户端中,消费位移是存储在ZooKeeper中的。而在新消费者客户端中,消费位移存储在Kafka内部的主题__consumer_offsets中。这里把将消费位移存储起来(持久化)的动作称为“提交”,消费者在消费完消息之后需要执行消费位移的提交。

参考图3-6的消费位移,x表示某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费了 x 位置的消息,那么我们就可以说消费者的消费位移为 x,图中也用了lastConsumedOffset这个单词来标识它。

图3-6 消费位移

不过需要非常明确的是,当前消费者需要提交的消费位移并不是 x,而是 x+1,对应于图3-6中的position,它表示下一条需要拉取的消息的位置。读者可能看过一些相关资料,里面所讲述的内容可能是提交的消费位移就是当前所消费到的消费位移,即提交的是 x,这明显是错误的。类似的错误还体现在对LEO(Log End Offset)的解读上,与此相关的细节可以参阅第5章的内容。在消费者中还有一个committed offset的概念,它表示已经提交过的消费位移。

KafkaConsumer 类提供了 position(TopicPartition)和 committed(TopicPartition)两个方法来分别获取上面所说的position和committed offset的值。这两个方法的定义如下所示。

为了论证lastConsumedOffset、committed offset和position之间的关系,我们使用上面的这两个方法来做相关演示。我们向某个主题中分区编号为0的分区发送若干消息,之后再创建一个消费者去消费其中的消息,等待消费完这些消息之后就同步提交消费位移(调用commitSync()方法,这个方法的细节在下面详细介绍),最后我们观察一下lastConsumedOffset、committed offset和position的值。示例代码如代码清单3-2所示。

代码清单3-2 消费位移的演示

示例中先通过assign()方法订阅了编号为0的分区,然后消费分区中的消息。示例中还通过调用 ConsumerRecords.isEmpty()方法来判断是否已经消费完分区中的消息,以此来退出while(true)的循环,当然这段逻辑并不严谨,这里只是用来演示,读者切勿在实际开发中效仿。

最终的输出结果如下:

可以看出,消费者消费到此分区消息的最大偏移量为377,对应的消费位移lastConsumedOffset也就是377。在消费完之后就执行同步提交,但是最终结果显示所提交的位移committed offset为 378,并且下一次所要拉取的消息的起始偏移量 position 也为 378。在本示例中,position=committed offset=lastConsumedOffset+1,当然position和committed offset并不会一直相同,这一点会在下面的示例中有所体现。

对于位移提交的具体时机的把握也很有讲究,有可能会造成重复消费和消息丢失的现象。参考图3-7,当前一次poll()操作所拉取的消息集为[x+2,x+7],x+2代表上一次提交的消费位移,说明已经完成了x+1之前(包括x+1在内)的所有消息的消费,x+5表示当前正在处理的位置。如果拉取到消息之后就进行了位移提交,即提交了x+8,那么当前消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+8开始的。也就是说,x+5至x+7之间的消息并未能被消费,如此便发生了消息丢失的现象。

再考虑另外一种情形,位移提交的动作是在消费完所有拉取到的消息之后才执行的,那么当消费x+5的时候遇到了异常,在故障恢复之后,我们重新拉取的消息是从x+2开始的。也就是说,x+2至x+4之间的消息又重新消费了一遍,故而又发生了重复消费的现象。

图3-7 消费位移的提交位置

而实际情况还会有比这两种更加复杂的情形,比如第一次的位移提交的位置为 x+8,而下一次的位移提交的位置为x+4,后面会做进一步的分析。

在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是enable.auto.commit参数为true。在代码清单3-1中并没有展示出这两个参数,说明使用的正是默认值。

在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在poll()方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

在Kafka消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看一下图3-8中的情形。拉取线程A不断地拉取消息并存入本地缓存,比如在BlockingQueue中,另一个处理线程B从缓存中读取消息并进行相应的逻辑处理。假设目前进行到了第y+1次拉取,以及第m次位移提交的时候,也就是x+6之前的位移已经确认提交了,处理线程B却还正在消费x+3的消息。此时如果处理线程B发生了异常,待其恢复之后会从第m此位移提交处,也就是x+6的位置开始拉取消息,那么x+3至x+6之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

图3-8 自动位移提交中消息丢失的情况

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免,与此同时,自动位移提交也无法做到精确的位移管理。在Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费,手动的提交方式可以让开发人员根据程序的逻辑在合适的地方进行位移提交。开启手动提交功能的前提是消费者客户端参数enable.auto.commit配置为false,示例如下:

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和commitAsync()两种类型的方法。我们这里先讲述同步提交的方式,commitSync()方法的定义如下:

这个方法很简单,下面使用它演示同步提交的简单用法:

可以看到示例中先对拉取到的每一条消息做相应的逻辑处理,然后对整个消息集做同步提交。参考 KafkaConsumer 源码中提供的示例,针对上面的示例还可以修改为批量处理+批量提交的方式,关键代码如下:

上面的示例中将拉取到的消息存入缓存 buffer,等到积累到足够多的时候,也就是示例中大于等于200个的时候,再做相应的批量处理,之后再做批量提交。这两个示例都有重复消费的问题,如果在业务逻辑处理完之后,并且在同步位移提交前,程序出现了崩溃,那么待恢复之后又只能从上一次位移提交的地方拉取消息,由此在两次位移提交的窗口中出现了重复消费的现象。

commitSync()方法会根据poll()方法拉取的最新位移来进行提交(注意提交的值对应于图3-6中position的位置),只要没有发生不可恢复的错误(Unrecoverable Error),它就会阻塞消费者线程直至位移提交完成。对于不可恢复的错误,比如CommitFailedException、WakeupException、InterruptException、AuthenticationException、AuthorizationException等,我们可以将其捕获并做针对性的处理。

对于采用 commitSync()的无参方法而言,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个含参方法,具体定义如下:

该方法提供了一个 offsets 参数,用来提交指定分区的位移。无参的 commitSync()方法只能提交当前批次对应的 position 值。如果需要提交一个中间值,比如业务每消费一条消息就提交一次位移,那么就可以使用这种方式,我们来看一下代码示例,如代码清单3-3所示。

代码清单3-3 带参数的同步位移提交

在实际应用中,很少会有这种每消费一条消息就提交一次消费位移的必要场景。commitSync()方法本身是同步执行的,会耗费一定的性能,而示例中的这种提交方式会将性能拉到一个相当低的点。更多时候是按照分区的粒度划分提交位移的界限,这里我们就要用到了3.2.4 章中提及的 ConsumerRecords 类的 partitions()方法和 records(TopicPartition)方法,关键示例代码如代码清单3-4所示(修改自KafkaConsumer源码中的示例,注意代码中加粗的部分)。

代码清单3-4 按分区粒度同步提交消费位移

与commitSync()方法相反,异步提交的方式(commitAsync())在执行的时候消费者线程不会被阻塞,可能在提交消费位移的结果还未返回之前就开始了新一次的拉取操作。异步提交可以使消费者的性能得到一定的增强。commitAsync方法有三个不同的重载方法,具体定义如下:

第一个无参的方法和第三个方法中的offsets都很好理解,对照commitSync()方法即可。关键的是这里的第二个方法和第三个方法中的callback参数,它提供了一个异步提交的回调方法,当位移提交完成后会回调 OffsetCommitCallback 中的 onComplete()方法。这里采用第二个方法来演示回调函数的用法,关键代码如下:

commitAsync()提交的时候同样会有失败的情况发生,那么我们应该怎么处理呢?读者有可能想到的是重试,问题的关键也就在这里了。如果某一次异步提交的消费位移为 x,但是提交失败了,然后下一次又异步提交了消费位移为 x+y,这次成功了。如果这里引入了重试机制,前一次的异步提交的消费位移在重试的时候提交成功了,那么此时的消费位移又变为了 x。如果此时发生异常(或者再均衡),那么恢复之后的消费者(或者新的消费者)就会从x处开始消费消息,这样就发生了重复消费的问题。

为此我们可以设置一个递增的序号来维护异步提交的顺序,每次位移提交之后就增加序号相对应的值。在遇到位移提交失败需要重试的时候,可以检查所提交的位移和序号的值的大小,如果前者小于后者,则说明有更大的位移已经提交了,不需要再进行本次重试;如果两者相同,则说明可以进行重试提交。除非程序编码错误,否则不会出现前者大于后者的情况。

如果位移提交失败的情况经常发生,那么说明系统肯定出现了故障,在一般情况下,位移提交失败的情况很少发生,不重试也没有关系,后面的提交也会有成功的。重试会增加代码逻辑的复杂度,不重试会增加重复消费的概率。如果消费者异常退出,那么这个重复消费的问题就很难避免,因为这种情况下无法及时提交消费位移;如果消费者正常退出或发生再均衡的情况,那么可以在退出或再均衡执行之前使用同步提交的方式做最后的把关。

示例代码中加粗的部分是在消费者正常退出时为位移提交“把关”添加的。发生再均衡情况的“把关”会在3.2.8节中做详细介绍。

3.2.6 控制或关闭消费

KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。KafkaConsumer中使用pause()和resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区向客户端返回数据的操作。这两个方法的具体定义如下:

KafkaConsumer还提供了一个无参的paused()方法来返回被暂停的分区集合,此方法的具体定义如下:

之前的示例展示的都是使用一个while循环来包裹住poll()方法及相应的消费逻辑,如何优雅地退出这个循环也很有考究。细心的读者可能注意到有些示例代码并不是以 while(true)的形式做简单的包裹,而是使用 while(isRunning.get())的方式,这样可以通过在其他地方设定isRunning.set(false)来退出while循环。还有一种方式是调用KafkaConsumer的wakeup()方法,wakeup()方法是 KafkaConsumer 中唯一可以从其他线程里安全调用的方法(KafkaConsumer 是非线程安全的,可以通过3.2.10节了解更多细节),调用wakeup()方法后可以退出poll()的逻辑,并抛出 WakeupException 的异常,我们也不需要处理 WakeupException 的异常,它只是一种跳出循环的方式。

跳出循环以后一定要显式地执行关闭动作以释放运行过程中占用的各种系统资源,包括内存资源、Socket连接等。KafkaConsumer提供了close()方法来实现关闭,close()方法有三种重载方法,分别如下:

第二种方法是通过 timeout 参数来设定关闭方法的最长执行时间,有些内部的关闭逻辑会耗费一定的时间,比如设置了自动提交消费位移,这里还会做一次位移提交的动作;而第一种方法没有 timeout 参数,这并不意味着会无限制地等待,它内部设定了最长等待时间(30秒);第三种方法已被标记为@Deprecated,可以不考虑。

一个相对完整的消费程序的逻辑可以参考下面的伪代码:

当关闭这个消费逻辑的时候,可以调用consumer.wakeup(),也可以调用isRunning.set(false)。

3.2.7 指定位移消费

在3.2.5节中我们讲述了如何进行消费位移的提交,正是有了消费位移的持久化,才使消费者在关闭、崩溃或者在遇到再均衡的时候,可以让接替的消费者能够根据存储的消费位移继续进行消费。

试想一下,当一个新的消费组建立的时候,它根本没有可以查找的消费位移。或者消费组内的一个新消费者订阅了一个新的主题,它也没有可以查找的消费位移。当__consumer_offsets主题中有关这个消费组的位移信息过期而被删除后,它也没有可以查找的消费位移。

在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费,这个参数的默认值为“latest”,表示从分区末尾开始消费消息。参考图3-9,按照默认的配置,消费者会从9开始进行消费(9是下一条要写入消息的位置),更加确切地说是从9开始拉取消息。如果将auto.offset.reset参数配置为“earliest”,那么消费者会从起始处,也就是0开始消费。

举个例子,在 auto.offset.reset 参数默认的配置下,用一个新的消费组来消费主题topic-demo时,客户端会报出重置位移的提示信息,参考如下:

图3-9 auto.offset.reset配置

除了查找不到消费位移,位移越界也会触发 auto.offset.reset 参数的执行,这个在下面要讲述的seek系列的方法中会有相关的介绍。

auto.offset.reset参数还有一个可配置的值—“none”,配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForPartitionException异常,示例如下:

如果能够找到消费位移,那么配置为“none”不会出现任何异常。如果配置的不是“latest”、“earliest”和“none”,则会报出ConfigException异常,示例如下:

到目前为止,我们知道消息的拉取是根据poll()方法中的逻辑来处理的,这个poll()方法中的逻辑对于普通的开发人员而言是一个黑盒,无法精确地掌控其消费的起始位置。提供的auto.offset.reset 参数也只能在找不到消费位移或位移越界的情况下粗粒度地从开头或末尾开始消费。有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的 seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。seek()方法的具体定义如下:

seek()方法中的参数partition表示分区,而offset参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在 poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法,等到分配到分区之后才可以重置消费位置。seek()方法的使用示例如代码清单3-5所示(只列出关键代码)。

代码清单3-5 seek方法的使用示例

上面示例中第③行设置了每个分区的消费位置为10。第②行中的assignment()方法是用来获取消费者所分配到的分区信息的,这个方法的具体定义如下:

如果我们将代码清单3-5中第①行poll()方法的参数设置为0,即这一行替换为:

在此之后,会发现seek()方法并未有任何作用。因为当poll()方法中的参数为0时,此方法立刻返回,那么poll()方法内部进行分区分配的逻辑就会来不及实施。也就是说,消费者此时并未分配到任何分区,如此第②行中的assignment便是一个空列表,第③行代码也不会执行。那么这里的 timeout 参数设置为多少合适呢?太短会使分配分区的动作失败,太长又有可能造成一些不必要的等待。我们可以通过KafkaConsumer的assignment()方法来判定是否分配到了相应的分区,参考下面的代码清单3-6:

代码清单3-6 seek()方法的另一种使用示例

如果对未分配到的分区执行seek()方法,那么会报出 IllegalStateException的异常。类似在调用subscribe()方法之后直接调用seek()方法:

会报出如下的异常:

如果消费组内的消费者在启动的时候能够找到消费位移,除非发生位移越界,否则auto.offset.reset参数并不会奏效,此时如果想指定从开头或末尾开始消费,就需要seek()方法的帮助了,代码清单3-7用来指定从分区末尾开始消费。

代码清单3-7 使用seek()方法从分区末尾消费

代码清单3-7中第①行的endOffsets()方法用来获取指定分区的末尾的消息位置,参考图3-9中9的位置,注意这里获取的不是8,是将要写入最新消息的位置。endOffsets的具体方法定义如下:

其中partitions参数表示分区集合,而timeout参数用来设置等待获取的超时时间。如果没有指定 timeout 参数的值,那么 endOffsets()方法的等待时间由客户端参数request.timeout.ms 来设置,默认值为 30000。与 endOffsets 对应的是 beginningOffsets()方法,一个分区的起始位置起初是0,但并不代表每时每刻都为0,因为日志清理的动作会清理旧的数据,所以分区的起始位置会自然而然地增加,日志清理的相关细节可以参考 5.4 节。beginningOffsets()方法的具体定义如下:

beginningOffsets()方法中的参数内容和含义都与 endOffsets()方法中的一样,配合这两个方法我们就可以从分区的开头或末尾开始消费。其实KafkaConsumer中直接提供了seekToBeginning()方法和seekToEnd()方法来实现这两个功能,这两个方法的具体定义如下:

有时候我们并不知道特定的消费位置,却知道一个相关的时间点,比如我们想要消费昨天8点之后的消息,这个需求更符合正常的思维逻辑。此时我们无法直接使用seek()方法来追溯到相应的位置。KafkaConsumer同样考虑到了这种情况,它提供了一个offsetsForTimes()方法,通过timestamp来查询与此对应的分区位置。

offsetsForTimes()方法的参数timestampsToSearch是一个Map类型,key为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于OffsetAndTimestamp中的offset和timestamp字段。

下面的示例演示了offsetsForTimes()和seek()之间的使用方法,首先通过offsetForTimes()方法获取一天之前的消息位置,然后使用 seek()方法追溯到相应位置开始消费,示例中的assignment变量和代码清单3-7中的一样,表示消费者分配到的分区集合。

前面说过位移越界也会触发 auto.offset.reset 参数的执行,位移越界是指知道消费位置却无法在实际的分区中查找到,比如想要从图3-9中的位置10处拉取消息时就会发生位移越界。注意拉取图3-9中位置9处的消息时并未越界,这个位置代表特定的含义(LEO)。我们通过seek()方法来演示发生位移越界时的情形,将代码清单3-7中的第②行代码修改为:

此时客户端会报出如下的提示信息:

通过上面加粗的提示信息可以了解到,原本拉取位置为101(fetch offset 101),但已经越界了(out of range),所以此时会根据auto.offset.reset参数的默认值来将拉取位置重置(resetting offset)为100,我们也能知道此时分区topic-demo-3中最大的消息offset为99。

3.2.5节中提及了Kafka中的消费位移是存储在一个内部主题中的,而本节的seek()方法可以突破这一限制:消费位移可以保存在任意的存储介质中,例如数据库、文件系统等。以数据库为例,我们将消费位移保存在其中的一个表中,在下次消费的时候可以读取存储在数据表中的消费位移并通过seek()方法指向这个具体的位置,伪代码如代码清单3-8所示。

代码清单3-8 消费位移保存在DB中

seek()方法为我们提供了从特定位置读取消息的能力,我们可以通过这个方法来向前跳过若干消息,也可以通过这个方法来向后回溯若干消息,这样为消息的消费提供了很大的灵活性。seek()方法也为我们提供了将消费位移保存在外部存储介质中的能力,还可以配合再均衡监听器来提供更加精准的消费能力。

3.2.8 再均衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。比如消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作,之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍,也就是发生了重复消费。一般情况下,应尽量避免不必要的再均衡的发生。

3.2.2 节中在讲述 subscribe()方法时提及再均衡监听器 ConsumerRebalanceListener,在subscribe(Collection<String> topics,ConsumerRebalanceListener listener) 和 subscribe(Pattern pattern,ConsumerRebalanceListener listener)方法中都有它的身影。再均衡监听器用来设定发生再均衡动作前后的一些准备或收尾的动作。ConsumerRebalanceListener 是一个接口,包含2 个方法,具体的释义如下:

(1)void onPartitionsRevoked(Collection<TopicPartition>partitions)

这个方法会在再均衡开始之前和消费者停止读取消息之后被调用。可以通过这个回调方法来处理消费位移的提交,以此来避免一些不必要的重复消费现象的发生。参数partitions表示再均衡前所分配到的分区。

(2)void onPartitionsAssigned(Collection<TopicPartition>partitions)

这个方法会在重新分配分区之后和消费者开始读取消费之前被调用。参数partitions表示再均衡后所分配到的分区。

下面我们通过一个例子来演示ConsumerRebalanceListener的用法,具体内容如代码清单3-9所示。

代码清单3-9 再均衡监听器的用法

代码清单3-10中将消费位移暂存到一个局部变量currentOffsets中,这样在正常消费的时候可以通过commitAsync()方法来异步提交消费位移,在发生再均衡动作之前可以通过再均衡监听器的onPartitionsRevoked()回调执行commitSync()方法同步提交消费位移,以尽量避免一些不必要的重复消费。

再均衡监听器还可以配合外部存储使用。在代码清单 3-8 中,我们将消费位移保存在数据库中,这里可以通过再均衡监听器查找分配到的分区的消费位移,并且配合 seek()方法来进一步优化代码逻辑,将代码清单3-8中的第一行代码修改为如下内容:

本节只是简单演示了再均衡监听器的用法,再均衡期间消费者客户端与Kafka服务端之间的交互逻辑及相关原理并不简单,更多的细节可以参考7.2节的内容。

3.2.9 消费者拦截器

2.1.5节中讲述了生产者拦截器的使用,对应的消费者也有相应的拦截器的概念。消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。

与生产者拦截器对应的,消费者拦截器需要自定义实现 org.apache.kafka.clients.consumer.ConsumerInterceptor接口。ConsumerInterceptor接口包含3个方法:

· public ConsumerRecords<K,V>onConsume(ConsumerRecords<K,V>records);

· public void onCommit(Map<TopicPartition,OffsetAndMetadata>offsets);

· public void close().

KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。如果 onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。

KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节,而使用拦截器的onCommit()方法却可以做到这一点。

close()方法和ConsumerInterceptor的父接口中的configure()方法与生产者的ProducerInterceptor接口中的用途一样,这里就不赘述了。

在某些业务场景中会对消息设置一个有效期的属性,如果某条消息在既定的时间窗口内无法到达,那么就会被视为无效,它也就不需要再被继续处理了。下面使用消费者拦截器来实现一个简单的消息TTL(Time to Live,即过期时间)的功能。在代码清单3-10中,自定义的消费者拦截器 ConsumerInterceptorTTL使用消息的 timestamp 字段来判定是否过期,如果消息的时间戳与当前的时间戳相差超过10秒则判定为过期,那么这条消息也就被过滤而不投递给具体的消费者。

代码清单3-10 自定义的消费者拦截器

实现自定义的ConsumerInterceptorTTL之后,需要在KafkaConsumer中配置指定这个拦截器,这个指定的配置和KafkaProducer中的一样,也是通过interceptor.classes参数实现的,此参数的默认值为“”。示例如下:

我们在发送消息的时候修改ProducerRecord中的timestamp的值来使其变得超时,具体可以参考下面的示例:

示例代码中一共发送了三条消息:“first-expire-data”“normal-data”和“last-expire-data”,其中第一条和第三条消息都被修改成超时了,那么此时消费者通过 poll()方法只能拉取到“normal-data”这一条消息,另外两条就被过滤了。

不过使用这种功能时需要注意的是:在使用类似代码清单 3-3 中这种带参数的位移提交的方式时,有可能提交了错误的位移信息。在一次消息拉取的批次中,可能含有最大偏移量的消息会被消费者拦截器过滤。有关消息TTL的更多内容可以参考11.1节。

在消费者中也有拦截链的概念,和生产者的拦截链一样,也是按照interceptor.classes参数配置的拦截器的顺序来一一执行的(配置的时候,各个拦截器之间使用逗号隔开)。同样也要提防“副作用”的发生。如果在拦截链中某个拦截器执行失败,那么下一个拦截器会接着从上一个执行成功的拦截器继续执行。

3.2.10 多线程实现

KafkaProducer是线程安全的,然而KafkaConsumer却是非线程安全的。KafkaConsumer中定义了一个 acquire()方法,用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出ConcurrentModifcationException异常:

KafkaConsumer中的每个公用方法在执行所要执行的动作之前都会调用这个acquire()方法,只有wakeup()方法是个例外,具体用法可以参考3.2.6节。acquire ()方法的具体定义如下:

acquire()方法和我们通常所说的锁(synchronized、Lock等)不同,它不会造成阻塞等待,我们可以将其看作一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁和解锁操作。release()方法也很简单,具体定义如下:

acquire()方法和release()方法都是私有方法,因此在实际应用中不需要我们显式地调用,但了解其内部的机理之后可以促使我们正确、有效地编写相应的程序逻辑。

KafkaConsumer 非线程安全并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,第一种也是最常见的方式:线程封闭,即为每个线程实例化一个KafkaConsumer对象,如图3-10所示。

图3-10 一个线程对应一个KafkaConsumer实例

一个线程对应一个KafkaConsumer实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数,根据 3.1 节中介绍的消费者与分区数的关系,当消费线程的个数大于分区数时,就有部分消费线程一直处于空闲的状态。

与此对应的第二种方式是多个消费线程同时消费同一个分区,这个通过 assign()、seek()等方法实现,这样可以打破原有的消费线程的个数不能超过分区数的限制,进一步提高了消费的能力。不过这种实现方式对于位移提交和顺序控制的处理就会变得非常复杂,实际应用中使用得极少,笔者也并不推荐。一般而言,分区是消费线程的最小划分单位。下面我们通过实际编码来演示第一种多线程消费实现的方式,详细示例参考如代码清单3-11所示。

代码清单3-11 第一种多线程消费实现方式

内部类KafkaConsumerThread代表消费线程,其内部包裹着一个独立的KafkaConsumer实例。通过外部类的main()方法来启动多个消费线程,消费线程的数量由consumerThreadNum变量指定。一般一个主题的分区数事先可以知晓,可以将consumerThreadNum设置成不大于分区数的值,如果不知道主题的分区数,那么也可以通过KafkaConsumer类的partitionsFor()方法来间接获取,进而再设置合理的consumerThreadNum值。

上面这种多线程的实现方式和开启多个消费进程的方式没有本质上的区别,它的优点是每个线程可以按顺序消费各个分区中的消息。缺点也很明显,每个消费线程都要维护一个独立的TCP连接,如果分区数和consumerThreadNum的值都很大,那么会造成不小的系统开销。

参考代码清单 3-11 中的第①行,如果这里对消息的处理非常迅速,那么 poll()拉取的频次也会更高,进而整体消费的性能也会提升;相反,如果在这里对消息的处理缓慢,比如进行一个事务性操作,或者等待一个RPC的同步响应,那么poll()拉取的频次也会随之下降,进而造成整体消费性能的下降。一般而言,poll()拉取消息的速度是相当快的,而整体消费的瓶颈也正是在处理消息这一块,如果我们通过一定的方式来改进这一部分,那么我们就能带动整体消费性能的提升。参考图3-11,考虑第三种实现方式,将处理消息模块改成多线程的实现方式,具体实现如代码清单3-12所示。

图3-11 第三种多线程消费实现方式

代码清单3-12 第三种多线程消费实现方式

代码清单3-12中的RecordHandler类是用来处理消息的,而KafkaConsumerThread类对应的是一个消费线程,里面通过线程池的方式来调用 RecordHandler 处理一批批的消息。注意KafkaConsumerThread类中ThreadPoolExecutor里的最后一个参数设置的是CallerRunsPolicy(),这样可以防止线程池的总体消费能力跟不上poll()拉取的能力,从而导致异常现象的发生。第三种实现方式还可以横向扩展,通过开启多个 KafkaConsumerThread 实例来进一步提升整体的消费能力。

第三种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了。在代码清单 3-11 中的initConfig()方法里笔者特意加了一个配置:

这样旨在说明在具体实现的时候并没有考虑位移提交的情况。对于第一种实现方式而言,如果要做具体的位移提交,它的具体实现和 3.2.5 节讲述的位移提交没有什么区别,直接在KafkaConsumerThread 中的 run()方法里实现即可。而对于第三种实现方式,这里引入一个共享变量offsets来参与提交,如图3-12所示。

图3-12 带有具体位移提交的第三种实现方式

每一个处理消息的 RecordHandler 类在处理完消息之后都将对应的消费位移保存到共享变量offsets中,KafkaConsumerThread在每一次poll()方法之后都读取offsets中的内容并对其进行位移提交。注意在实现的过程中对offsets读写需要加锁处理,防止出现并发问题。并且在写入offsets的时候需要注意位移覆盖的问题,针对这个问题,可以将RecordHandler类中的run()方法实现改为如下内容(参考代码清单3-4):

对应的位移提交实现可以添加在代码清单3-12中KafkaConsumerThread类的第①行代码下方,具体实现参考如下:

读者可以细想一下这样实现是否万无一失?其实这种位移提交的方式会有数据丢失的风险。对于同一个分区中的消息,假设一个处理线程RecordHandler1正在处理offset为0~99的消息,而另一个处理线程RecordHandler2已经处理完了offset为100~199的消息并进行了位移提交,此时如果RecordHandler1发生异常,则之后的消费只能从200开始而无法再次消费0~99的消息,从而造成了消息丢失的现象。这里虽然针对位移覆盖做了一定的处理,但还没有解决异常情况下的位移覆盖问题。对此就要引入更加复杂的处理机制,这里再提供一种解决思路,参考图3-13,总体结构上是基于滑动窗口实现的。对于第三种实现方式而言,它所呈现的结构是通过消费者拉取分批次的消息,然后提交给多线程进行处理,而这里的滑动窗口式的实现方式是将拉取到的消息暂存起来,多个消费线程可以拉取暂存的消息,这个用于暂存消息的缓存大小即为滑动窗口的大小,总体上而言没有太多的变化,不同的是对于消费位移的把控。

图3-13 滑动窗口式多线程消费实现方式

如图3-13所示,每一个方格代表一个批次的消息,一个滑动窗口包含若干方格,startOffset标注的是当前滑动窗口的起始位置,endOffset标注的是末尾位置。每当startOffset指向的方格中的消息被消费完成,就可以提交这部分的位移,与此同时,窗口向前滑动一格,删除原来startOffset所指方格中对应的消息,并且拉取新的消息进入窗口。滑动窗口的大小固定,所对应的用来暂存消息的缓存大小也就固定了,这部分内存开销可控。方格大小和滑动窗口的大小同时决定了消费线程的并发数:一个方格对应一个消费线程,对于窗口大小固定的情况,方格越小并行度越高;对于方格大小固定的情况,窗口越大并行度越高。不过,若窗口设置得过大,不仅会增大内存的开销,而且在发生异常(比如Crash)的情况下也会引起大量的重复消费,同时还考虑线程切换的开销,建议根据实际情况设置一个合理的值,不管是对于方格还是窗口而言,过大或过小都不合适。

如果一个方格内的消息无法被标记为消费完成,那么就会造成 startOffset 的悬停。为了使窗口能够继续向前滑动,那么就需要设定一个阈值,当 startOffset 悬停一定的时间后就对这部分消息进行本地重试消费,如果重试失败就转入重试队列,如果还不奏效就转入死信队列,有关Kafka中重试队列和死信队列的实现可以参考11.3节。真实应用中无法消费的情况极少,一般是由业务代码的处理逻辑引起的,比如消息中的内容格式与业务处理的内容格式不符,无法对这条消息进行决断,这种情况可以通过优化代码逻辑或采取丢弃策略来避免。如果需要消息高度可靠,也可以将无法进行业务逻辑的消息(这类消息可以称为死信)存入磁盘、数据库或Kafka,然后继续消费下一条消息以保证整体消费进度合理推进,之后可以通过一个额外的处理任务来分析死信进而找出异常的原因。

3.2.11 重要的消费者参数

在KafkaConsumer中,除了3.2.1节提及的4个默认的客户端参数,大部分的参数都有合理的默认值,一般我们也不需要去修改它们。不过了解这些参数可以让我们更好地使用消费者客户端,其中还有一些重要的参数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进行性能调优与故障排查。下面挑选一些重要的参数来做细致的讲解。

1.fetch.min.bytes

该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)。Kafka在收到Consumer的拉取请求时,如果返回给Consumer的数据量小于这个参数所配置的值,那么它就需要进行等待,直到数据量满足这个参数的配置大小。可以适当调大这个参数的值以提高一定的吞吐量,不过也会造成额外的延迟(latency),对于延迟敏感的应用可能就不可取了。

2.fetch.max.bytes

该参数与fetch.max.bytes参数对应,它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为 52428800(B),也就是 50MB。如果这个参数设置的值比任何一条写入Kafka中的消息要小,那么会不会造成无法消费呢?很多资料对此参数的解读认为是无法消费的,比如一条消息的大小为10B,而这个参数的值是1(B),既然此参数设定的值是一次拉取请求中所能拉取的最大数据量,那么显然 1B<10B,所以无法拉取。这个观点是错误的,该参数设定的不是绝对的最大值,如果在第一个非空分区中拉取的第一条消息大于该值,那么该消息将仍然返回,以确保消费者继续工作。也就是说,上面问题的答案是可以正常消费。与此相关的,Kafka中所能接收的最大消息的大小通过服务端参数message.max.bytes(对应于主题端参数max.message.bytes)来设置。

3.fetch.max.wait.ms

这个参数也和fetch.min.bytes参数有关,如果Kafka仅仅参考fetch.min.bytes参数的要求,那么有可能会一直阻塞等待而无法发送响应给 Consumer,显然这是不合理的。fetch.max.wait.ms参数用于指定Kafka的等待时间,默认值为500(ms)。如果Kafka中没有足够多的消息而满足不了fetch.min.bytes参数的要求,那么最终会等待500ms。这个参数的设定和Consumer与Kafka之间的延迟也有关系,如果业务应用对延迟敏感,那么可以适当调小这个参数。

4.max.partition.fetch.bytes

这个参数用来配置从每个分区里返回给Consumer的最大数据量,默认值为1048576(B),即1MB。这个参数与 fetch.max.bytes 参数相似,只不过前者用来限制一次拉取中每个分区的消息大小,而后者用来限制一次拉取中整体消息的大小。同样,如果这个参数设定的值比消息的大小要小,那么也不会造成无法消费,Kafka 为了保持消费逻辑的正常运转不会对此做强硬的限制。

5.max.poll.records

这个参数用来配置Consumer在一次拉取请求中拉取的最大消息数,默认值为500(条)。如果消息的大小都比较小,则可以适当调大这个参数值来提升一定的消费速度。

6.connections.max.idle.ms

这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

7.exclude.internal.topics

Kafka中有两个内部的主题:__consumer_offsets和__transaction_state。exclude.internal.topics用来指定Kafka中的内部主题是否可以向消费者公开,默认值为true。如果设置为true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false则没有这个限制。

8.receive.buffer.bytes

这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为65536(B),即64KB。如果设置为-1,则使用操作系统的默认值。如果Consumer与Kafka处于不同的机房,则可以适当调大这个参数值。

9.send.buffer.bytes

这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

10.request.timeout.ms

这个参数用来配置Consumer等待请求响应的最长时间,默认值为30000(ms)。

11.metadata.max.age.ms

这个参数用来配置元数据的过期时间,默认值为300000(ms),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker加入。

12.reconnect.backoff.ms

这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间),避免频繁地连接主机,默认值为50(ms)。这种机制适用于消费者向broker发送的所有请求。

13.retry.backoff.ms

这个参数用来配置尝试重新发送失败的请求到指定的主题分区之前的等待(退避)时间,避免在某些故障情况下频繁地重复发送,默认值为100(ms)。

14.isolation.level

这个参数用来配置消费者的事务隔离级别。字符串类型,有效值为“read_uncommitted”和“read_committed”,表示消费者所消费到的位置,如果设置为“read_committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO(LastStableOffset)的位置,默认情况下为“read_uncommitted”,即可以消费到HW(High Watermark)处的位置。有关事务和LSO的内容可以分别参考7.4节和10.2节。

还有一些消费者参数在本节没有提及,这些参数同样非常重要,它们需要用单独的章节或场景中描述。部分参数在前面的章节内容中已经提及,比如 boostrap.servers;还有部分参数会在后面的章节内容中提及,比如 heartbeat.interval.ms。表 3-1 罗列了部分消费者客户端的重要参数。

表3-1 部分消费者客户端的重要参数

续表