3.5 序列化器
我们已经在之前的例子里看到,创建一个生产者对象必须指定序列化器。我们已经知道如何使用默认的字符串序列化器,Kafka还提供了整型和字节数组序列化器,不过它们还不足以满足大部分场景的需求。到最后,我们需要序列化的记录类型会越来越多。
接下来演示如何开发自己的序列化器,并介绍Avro序列化器作为推荐的备选方案。
3.5.1 自定义序列化器
如果发送到Kafka的对象不是简单的字符串或整型,那么可以使用序列化框架来创建消息记录,如Avro、Thrift或Protobuf,或者使用自定义序列化器。我们强烈建议使用通用的序列化框架。不过,为了了解序列化器的工作原理,也为了说明为什么要使用序列化框架,让我们一起来看看如何自定义一个序列化器。
假设你创建了一个简单的类来表示一个客户:
public class Customer { private int customerID; private String customerName; public Customer(int ID, String name) { this.customerID = ID; this.customerName = name; } public int getID() {
return customerID; } public String getName() { return customerName; } }
现在我们要为这个类创建一个序列化器,它看起来可能是这样的:
import org.apache.kafka.common.errors.SerializationException; import java.nio.ByteBuffer; import java.util.Map; public class CustomerSerializer implements Serializer<Customer> { @Override public void configure(Map configs, boolean isKey) { // 不做任何配置 } @Override /** Customer对象被序列化成: 表示customerID的4字节整数 表示customerName长度的4字节整数(如果customerName为空,则长度为0) 表示customerName的N个字节 */ public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (data == null) return null; else { if (data.getName() ! = null) { serializedName = data.getName().getBytes("UTF-8"); stringSize = serializedName.length; } else { serializedName = new byte[0]; stringSize = 0; } } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getID()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } }
@Override public void close() { // 不需要关闭任何东西 } }
只要使用这个CustomerSerializer,就可以把消息记录定义成ProducerRecord<String, Customer>,并且可以直接把Customer对象传给生产者。这个例子很简单,不过代码看起来太脆弱了——如果我们有多种类型的消费者,可能需要把customerID字段变成长整型,或者为Customer添加startDate字段,这样就会出现新旧消息的兼容性问题。在不同版本的序列化器和反序列化器之间调试兼容性问题着实是个挑战——你需要比较原始的字节数组。更糟糕的是,如果同一个公司的不同团队都需要往Kafka写入Customer数据,那么他们就需要使用相同的序列化器,如果序列化器发生改动,他们几乎要在同一时间修改代码。
基于以上几点原因,我们不建议使用自定义序列化器,而是使用已有的序列化器和反序列化器,比如JSON、Avro、Thrift或Protobuf。下面我们将会介绍Avro,然后演示如何序列化Avro记录并发送给Kafka。
3.5.2 使用Avro序列化
Apache Avro(以下简称Avro)是一种与编程语言无关的序列化格式。Doug Cutting创建了这个项目,目的是提供一种共享数据文件的方式。
Avro数据通过与语言无关的schema来定义。schema通过JSON来描述,数据被序列化成二进制文件或JSON文件,不过一般会使用二进制文件。Avro在读写文件时需要用到schema, schema一般会被内嵌在数据文件里。
Avro有一个很有意思的特性是,当负责写消息的应用程序使用了新的schema,负责读消息的应用程序可以继续处理消息而无需做任何改动,这个特性使得它特别适合用在像Kafka这样的消息系统上。
假设最初的schema是这样的:
{"namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "faxNumber", "type": ["null", "string"], "default": "null"} ➊ ] }
➊ id和name字段是必需的,faxNumber是可选的,默认为null。
假设我们已经使用了这个schema几个月的时间,并用它生成了几个太字节的数据。现在,我们决定在新版本里做一些修改。因为在21世纪不再需要faxNumber字段,需要用email
字段来代替它。
新的schema如下:
{"namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "email", "type": ["null", "string"], "default": "null"} ] }
更新到新版的schema后,旧记录仍然包含faxNumber字段,而新记录则包含email字段。部分负责读取数据的应用程序进行了升级,那么它们是如何处理这些变化的呢?
在应用程序升级之前,它们会调用类似getName()、getId()和getFaxNumber()这样的方法。如果碰到使用新schema构建的消息,getName()和getId()方法仍然能够正常返回,但getFaxNumber()方法会返回null,因为消息里不包含传真号码。
在应用程序升级之后,getEmail()方法取代了getFaxNumber()方法。如果碰到一个使用旧schema构建的消息,那么getEmail()方法会返回null,因为旧消息不包含邮件地址。
现在可以看出使用Avro的好处了:我们修改了消息的schema,但并没有更新所有负责读取数据的应用程序,而这样仍然不会出现异常或阻断性错误,也不需要对现有数据进行大幅更新。
不过这里有以下两个需要注意的地方。
· 用于写入数据和读取数据的schema必须是相互兼容的。Avro文档提到了一些兼容性原则。
· 反序列化器需要用到用于写入数据的schema,即使它可能与用于读取数据的schema不一样。Avro数据文件里就包含了用于写入数据的schema,不过在Kafka里有一种更好的处理方式,下一小节我们会介绍它。
3.5.3 在Kafka里使用Avro
Avro的数据文件里包含了整个schema,不过这样的开销是可接受的。但是如果在每条Kafka记录里都嵌入schema,会让记录的大小成倍地增加。不过不管怎样,在读取记录时仍然需要用到整个schema,所以要先找到schema。我们遵循通用的结构模式并使用“schema注册表”来达到目的。schema注册表并不属于Kafka,现在已经有一些开源的schema注册表实现。在这个例子里,我们使用的是Confluent Schema Registry。该注册表的代码可以在GitHub上找到,你也可以把它作为Confluent平台的一部分进行安装。如果你决定使用这个注册表,可以参考它的文档。
我们把所有写入数据需要用到的schema保存在注册表里,然后在记录里引用schema的标识符。负责读取数据的应用程序使用标识符从注册表里拉取schema来反序列化记录。序列化器和反序列化器分别负责处理schema的注册和拉取。Avro序列化器的使用方法与其他序列化器是一样的。
图3-2:Avro记录的序列化和反序列化流程图
下面的例子演示了如何把生成的Avro对象发送到Kafka(关于如何使用Avro生成代码请参考Avro文档):
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊ props.put("schema.registry.url", schemaUrl); ➋ String topic = "customerContacts"; Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props); ➌ // 不断生成事件,直到有人按下Ctrl+C组合键 while (true) { Customer customer = CustomerGenerator.getNext(); System.out.println("Generated customer " + customer.toString()); ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), cus- tomer); ➍ producer.send(record); ➎ }
➊ 使用Avro的KafkaAvroSerializer来序列化对象。注意,AvroSerializer也可以处理原语,这就是我们以后可以使用字符串作为记录键、使用客户对象作为值的原因。
➋ schema.registry.url是一个新的参数,指向schema的存储位置。
➌ Customer是生成的对象。我们会告诉生产者Customer对象就是记录的值。
➍ 实例化一个ProducerRecord对象,并指定Customer为值的类型,然后再传给它一个Customer对象。
❺ 把Customer对象作为记录发送出去,KafkaAvroSerializer会处理剩下的事情。
如果你选择使用一般的Avro对象而非生成的Avro对象该怎么办?不用担心,这个时候你只需提供schema就可以了:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊ props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", url); ➋ String schemaString = "{\"namespace\": \"customerManagement.avro\", \"type\": \"record\", " + ➌ "\"name\": \"Customer\", " + "\"fields\": [" + "{\"name\": \"id\", \"type\": \"int\"}, " + "{\"name\": \"name\", \"type\": \"string\"}, " + "{\"name\": \"email\", \"type\": [\"null\", \"string \"], \"default\":\"null\" }" + "]}"; Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props); ➍ Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(schemaString); for (int nCustomers = 0; nCustomers < customers; nCustomers++) { String name = "exampleCustomer" + nCustomers; String email = "example" + nCustomers + "@example.com"; GenericRecord customer = new GenericData.Record(schema); ➎ customer.put("id", nCustomers); customer.put("name", name); customer.put("email", email); ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts", name, customer); producer.send(data); } }
➊ 仍然使用同样的KafkaAvroSerializer。
➋ 提供同样的schema注册表URI。
➌ 这里需要提供Avro schema,因为我们没有使用Avro生成的对象。
➍ 对象类型是Avro GenericRecord,我们通过schema和需要写入的数据来初始化它。
❺ ProducerRecord的值就是一个GenericRecord对象,它包含了schema和数据。序列化器知道如何从记录里获取schema,把它保存到注册表里,并用它序列化对象数据。