Kafka进阶
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

3.5 生产者的参数配置

Kafka生产者端的配置参数,除了之前介绍过的bootstrap.servers、key.serializer和value.serializer三个必须参数,还有很多可选的参数。下面列举了生产者Producer的配置参数及它们的含义。

1. acks

这个参数控制了发送消息的耐用性,用于指定分区中必须有多少个副本成功接收到消息,之后生产者才会认为这条消息写入是成功的,即生产者需要leader确认请求完成之前接收的应答数。通过查看ProducerConfig的源码可以看出acks参数的本质其实就是一个字符串。

acks参数有三种类型的值。

(1)acks=1。

这是acks参数的默认值。Kafka的生产者将消息发送到Kafka的Broker服务器端。只要Topic分区中的Leader成功写入消息,就算该消息成功发送。这时候,生产者就会收到Kafka服务器端Broker的成功确认信息,说明发送成功。

如果在生产者写入消息的过程中,Leader分区所在的Broker出现了宕机,将会造成消息无法正常写入。在重新选举Leader的过程中,生产者Producer会受到一个服务器端返回的错误信息。生产者为了支持容错,避免消息的丢失,会尝试重新发送该消息。直至消息成功写入Leader分区。

这里需要注意的是,如果消息已经成功写入Leader所在的分区,但还未同步至其他Follower分区的时候,如果Leader分区所在的Broker出现了宕机,这时候会造成写入Leader分区的消息丢失。所以在这种参数值的设置下,消息是可能丢失的。

可以通过下面的代码进行设置。

(2)acks=0。

在这种参数设置下,Kafka的生产者不需要等待任何服务器端的响应,所以这时Kafka集群可以达到最大的吞吐量。如果消息从生产者发送到写入Kafka消息系统的过程中出现异常,比如Broker宕机,生产者将不会得到任何反馈信息,也不会重发消息,导致消息丢失。

可以通过下面的代码进行设置。

(3)acks=-1或acks=all。

在这种参数设置下,Kafka集群将达到最高的可靠性。生产者发送完消息后,需要等待Leader分区和所有Follower分区都成功写入消息后,才返回给生产者一个成功写入消息的应答响应。

可以通过下面的代码进行设置。

最后,需要强调的是,acks参数的值是一个字符串,不能是其他数据类型。如果像下面这样设置acks的参数值。

将会抛出如下Exception错误信息。

2. buffer.memory

Kafka生产者的Sender线程在将消息发送到Kafka服务器端之前,会把消息缓存到内存中,这个参数就决定了消息缓存的内存大小,其默认值是32MB。如果生产者产生消息的速度大于将消息发送到服务器端的速度,那么生产者将会被阻塞,并最终导致生产者抛出一个RecordTooLargeException的异常,如下面KafkaProducer中的源码所示。

在实际的生产环境下,应该根据实际情况进行测试最终决定buffer.memory参数值的大小。例如,客户端线程每秒会写入多少消息的数据量?按照默认值32MB的大小,是否会经常把内存缓冲写满?如果内存很快会写满,再调整buffer.memory。经过这样的压测,你可以调试出来一个合理的内存大小。

• batch.size。

当Kafka客户端将多个消息发送到同一个分区的时候,生产者为了减少客户端与服务器端的请求交互,会尝试将消息批量打包在一起,进行统一发送,这样有助于提升客户端和服务器端的性能。该配置的默认批次大小(以字节为单位)是16 384字节。如果消息的内存大小大于该参数的配置,将不会进行批量打包的过程。

通过提升batch.size的大小,可以允许更多数据缓冲在分区中,那么一次请求服务器端所发送出去的数据量就更多了,这样吞吐量可能会有所提升。但是这样会造成大量内存的浪费。反过来如果减小batch.size的大小,则会系统地降低吞吐量。如果将batch.size设置为0,则批处理机制被禁用。所以需要在这里按照生产环境的发消息速率,调节不同的batch.size大小,从而设置一个最合理的参数。

• compression.type。

该参数指定给到Topic中数据的压缩类型,其有效值的设置可以是标准的压缩方式,例如,'gzip'、'snappy'、'lz4'、'zstd',同时该参数也可以是'uncompressed',在这种设置下,消息数将不会被压缩。

• client.id。

当生产者向服务器端发送请求时,传递给服务器端ID字符串。通过这个ID字符串,Kafka服务器端就可以追踪请求的资源,其本质就是将生产者及其请求的资源进行逻辑上的隔离。

• connections.max.idle.ms。

当生产者不再往服务器端发送消息时,这个参数用来决定关闭生产者连接的时间阈值,其默认值是9min。

• linger.ms。

该参数决定消息在由生产者发送到服务器端之前,在客户端延长发送的时间。通过这样的延时发送机制,可以将多个消息组合成一个批处理进行统一发送。从本质上讲,该参数与之前提到过的batch.size参数类似。合理设置batch.size参数和linger.ms参数,将很好地利用Kafka批处理机制。把linger.ms设置得太小了,比如默认就是0ms,或者设置为5ms,那可能导致Batch虽然设置了32KB,但是经常是还没凑够32KB的数据,5ms之后就直接强制Batch将数据发送出去,这会导致你的Batch形同虚设,一直凑不满数据。

• max.block.ms。

该配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()将消息阻塞多长时间。此外也可能是因为缓冲区已满或元数据不可用,导致这些方法被阻止。在用户提供的序列化程序或分区器中的锁定不会计入此超时,其默认值为60 000ms。

• max.request.size。

Kafka生产者能发送消息的最大值,默认值为1MB。此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这个参数涉及其他一些相关参数,比如服务器Broker端的message.max.bytes参数,如果message.max.bytes参数设置为10,而max. request.size设置为20,这时候就可以造成生产者报错。

• retries和retry.backoff.ms。

如果生产者出现了异常,或者消息没有成功写入Kafka的服务器端,生产者可以配置重试的参数值,通过生产者端的内部重试机制来执行恢复,并不是直接将异常抛出。如果重试达到设定次数,生产者才会放弃重试并抛出异常。retries参数的默认值是0。同时,生产者的重试还与retry.backoff.ms参数有关,该参数用来设定两次重试之间的时间间隔,其默认值是100ms,从而避免无效的频繁重试。在配置retries参数和retry.backoff.ms参数之前,可以设定总重试时间要大于异常恢复时间,最好先估算一下异常恢复时间,避免生产者过早放弃重试。

• receive.buffer.bytes。

这个参数用来设置socket接收消息缓冲区的大小,该缓存区大小的默认值32KB。如果将其值设置为-1,则使用操作系统的默认值。

• send.buffer.bytes。

这个参数用来设置socket发送消息缓冲区的大小,默认值为128KB。与receive.buffer.bytes参数一样,如果将其值设置为-1,则使用操作系统的默认值。

• request.timeout.ms。

消息由生产者发出后,该参数用于决定生产者等待请求响应的最长时间,其默认值为40s。如果响应的时间超过了该参数的设置,客户端将按照重试策略进行重试。注意,这个参数值需要比Broker端的参数replica.lag.time.max.ms值要大,这样可以减少因客户端重试引起的消息重复的概率。

• reconnect.backoff.max.ms。

该参数表示Kafka客户端重连的最大时间。每次连接失败,重连时间都会成指数级增加,每次增加的时间会存在20%的随机浮动,以避免连接风暴。

• reconnect.backoff.ms。

该参数表示Kafka客户端每次重连时候的间隔时间。

• delivery.timeout.ms。

当生产者调用send方法后,该参数用于指定客户端等待发送成功或失败报告时,客户端等待时间的上限。这个时间上限包含以下几部分。

➢ 一条消息在发送前的延时时间。

➢ 生产者等待服务器端Broker确认信息的等待时间。

➢ 失败时的重试时间。

• partitioner.class。

该参数表示一个实现了org.apache.kafka.clients.producer.Partitioner接口的类。Kafka将使用这个类进行分区操作,其默认值是org.apache.kafka.clients.producer.internals.DefaultPartitioner。通过实现这个接口,可以实现自定义分区。

• transaction.timeout.ms。

该参数表示生产者主动终止当前正在进行的操作之前,Kafka等待操作状态更新的最大时间,其默认值是1min。如果该值大于Broker中max.transaction.timeout.ms的设置,则请求失败,并报"InvalidTransactionTimeout"错误。

• transactional.id。

在事务传递过程中该参数用于表示某个事务的ID。这样可以保证跨多个生产者会话时语义的可靠性。因为它允许客户端保证在开始任何新事务之前使用相同的Transactional Id的事务来完成。

• max.in.flight.requests.per.connection。

该参数表示在消息被阻塞前,每个客户端上发送的未应答请求的最大数量,其默认值是5。注意,如果该参数值设置大于1,并且消息发送失败,则由于客户端的重试增加消息重新排序的风险。

• metadata.max.age.ms。

该参数表示当超过这个时间间隔时,系统就会更新元信息,其默认值5min。Kafka的元数据信息由ZooKeeper维护,包含Topic信息、副本信息、分区信息、Broker信息。

• metadata.max.idle.ms。

当Topic处于空闲状态时,该参数用于控制生产者抓取Topic元信息的时间。