Kafka权威指南
上QQ阅读APP看书,第一时间看更新

3.6 分区

在之前的例子里,ProducerRecord对象包含了目标主题、键和值。Kafka的消息是一个个键值对,ProducerRecord对象可以只包含目标主题和值,键可以设置为默认的null,不过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是说,如果一个进程只从一个主题的分区读取数据(第4章会介绍更多细节),那么具有相同键的所有记录都会被该进程读取。要创建一个包含键值的记录,只需像下面这样创建ProducerRecord对象:

        ProducerRecord<Integer, String> record =
              new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

如果要创建键为null的消息,不指定键就可以了:

        ProducerRecord<Integer, String> record =
              new ProducerRecord<>("CustomerCountry", "USA"); ➊

➊ 这里的键被设为null。

如果键值为null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用了默认的分区器,那么Kafka会对键进行散列(使用Kafka自己的散列算法,即使升级Java版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。我们将在第6章讨论Kafka的复制功能和可用性。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在分区数量保持不变的情况下,可以保证用户045189的记录总是被写到分区34。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证了——旧数据仍然留在分区34,但新的记录可能被写到其他分区上。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好(第2章介绍了如何确定合适的分区数量),而且永远不要增加新分区。

实现自定义分区策略

我们已经讨论了默认分区器的特点,它是使用次数最多的分区器。不过,除了散列分区之外,有时候也需要对数据进行不一样的分区。假设你是一个B2B供应商,你有一个大客户,它是手持设备Banana的制造商。Banana占据了你整体业务10%的份额。如果使用默认的散列分区算法,Banana的账号记录将和其他账号记录一起被分配给相同的分区,导致这个分区比其他分区要大一些。服务器可能因此出现存储空间不足、处理缓慢等问题。我们需要给Banana分配单独的分区,然后使用散列分区算法处理其他账号。

下面是一个自定义分区器的例子:

        import org.apache.kafka.clients.producer.Partitioner;
        import org.apache.kafka.common.Cluster;
        import org.apache.kafka.common.PartitionInfo;
        import org.apache.kafka.common.record.InvalidRecordException;
        import org.apache.kafka.common.utils.Utils;

        public class BananaPartitioner implements Partitioner {

              public void configure(Map<String, ? > configs) {} ➊

              public int partition(String topic, Object key, byte[] keyBytes,
                                      Object value, byte[] valueBytes,
                                        Cluster cluster) {
                      List<PartitionInfo> partitions =
                        cluster.partitionsForTopic(topic);
              int numPartitions = partitions.size();

              if ((keyBytes == null) || (! (key instanceOf String))) ➋
                throw new InvalidRecordException("We expect all messages
                  to have customer name as key")

              if (((String) key).equals("Banana"))
                return numPartitions; // Banana总是被分配到最后一个分区

              // 其他记录被散列到其他分区
              return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions -1))
              }

              public void close() {}
        }

➊ Partitioner接口包含了configure、partition和close这3个方法。这里我们只实现partition方法,不过我们真不应该在partition方法里硬编码客户的名字,而应该通过configure方法传进来。

➋ 我们只接受字符串作为键,如果不是字符串,就抛出异常。