Kafka权威指南
上QQ阅读APP看书,第一时间看更新

3.2 创建Kafka生产者

要往Kafka写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka生产者有3个必选的属性。

bootstrap.servers

该属性指定broker的地址清单,地址的格式为host:port。清单里不需要包含所有的broker地址,生产者会从给定的broker里查找到其他broker的信息。不过建议至少要提供两个broker的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

key.serializer

broker希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把Java对象作为键和值发送给broker。这样的代码具有良好的可读性,不过生产者需要知道如何把这些Java对象转换成字节数组。key.serializer必须被设置为一个实现了org.apache.kafka.common.serialization.Serializer接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka客户端默认提供了ByteArraySerializer(这个只做很少的事情)、StringSerializer和IntegerSerializer,因此,如果你只使用常见的几种Java对象类型,那么就没必要实现自己的序列化器。要注意,key. serializer是必须设置的,就算你打算只发送值内容。

value.serializer

与key.serializer一样,value.serializer指定的类会将值序列化。如果键和值都是字符串,可以使用与key.serializer一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。

下面的代码片段演示了如何创建一个新的生产者,这里只指定了必要的属性,其他使用默认设置。

        private Properties kafkaProps = new Properties(); ➊
        kafkaProps.put("bootstrap.servers", "broker1:9092, broker2:9092");

        kafkaProps.put("key.serializer",
          "org.apache.kafka.common.serialization.StringSerializer"); ➋
        kafkaProps.put("value.serializer",
          "org.apache.kafka.common.serialization.StringSerializer");

        producer = new KafkaProducer<String, String>(kafkaProps); ➌

➊ 新建一个Properties对象。

➋ 因为我们打算把键和值定义成字符串类型,所以使用内置的StringSerializer。

➌ 在这里我们创建了一个新的生产者对象,并为键和值设置了恰当的类型,然后把Properties对象传给它。

这个接口很简单,通过配置生产者的不同属性就可以很大程度地控制它的行为。Kafka的文档涵盖了所有的配置参数,我们将在这一章的后面部分介绍其中几个比较重要的参数。实例化生产者对象后,接下来就可以开始发送消息了。发送消息主要有以下3种方式。

发送并忘记(fire-and-forget)

我们把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为Kafka是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。

同步发送

我们使用send()方法发送消息,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功。

异步发送

我们调用send()方法,并指定一个回调函数,服务器在返回响应时调用该函数。

在下面的几个例子中,我们会介绍如何使用上述几种方式来发送消息,以及如何处理可能发生的异常情况。

本章的所有例子都使用单线程,但其实生产者是可以使用多线程来发送消息的。刚开始的时候可以使用单个消费者和单个线程。如果需要更高的吞吐量,可以在生产者数量不变的前提下增加线程数量。如果这样做还不够,可以增加生产者数量。