3.3 发送消息到Kafka
最简单的消息发送方式如下所示。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); ➊ try { producer.send(record); ➋ } catch (Exception e) { e.printStackTrace(); ➌ }
➊ 生产者的send()方法将ProducerRecord对象作为参数,所以我们要先创建一个ProducerRecord对象。ProducerRecord有多个构造函数,稍后我们会详细讨论。这里使用其中一个构造函数,它需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。
➋ 我们使用生产者的send()方法发送ProducerRecord对象。从生产者的架构图里可以看到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。send()方法会返回一个包含RecordMetadata的Future对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录Twitter消息日志,或记录不太重要的应用程序日志。
➌ 我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。这些异常有可能是SerializationException(说明序列化消息失败)、BufferExhaustedException或TimeoutException(说明缓冲区已满),又或者是InterruptException(说明发送线程被中断)。
3.3.1 同步发送消息
最简单的同步发送消息方式如下所示。
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { producer.send(record).get(); ➊ } catch (Exception e) { e.printStackTrace(); ➋ }
➊ 在这里,producer.send()方法先返回一个Future对象,然后调用Future对象的get()方法等待Kafka响应。如果服务器返回错误,get()方法会抛出异常。如果没有发生错误,我们会得到一个RecordMetadata对象,可以用它获取消息的偏移量。
➋ 如果在发送数据之前或者在发送过程中发生了任何错误,比如broker返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。我们只是简单地把异常信息打印出来。
KafkaProducer一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer不会进行任何重试,直接抛出异常。
3.3.2 异步发送消息
假设消息在应用程序和Kafka集群之间一个来回需要10ms。如果在发送完每个消息后都等待回应,那么发送100个消息需要1秒。但如果只发送消息而不等待响应,那么发送100个消息所需要的时间会少很多。大多数时候,我们并不需要等待响应——尽管Kafka会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必
需的。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。
为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。下面是使用回调的一个例子。
private class DemoProducerCallback implements Callback {➊ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e ! = null) { e.printStackTrace(); ➋ } } } ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); ➌ producer.send(record, new DemoProducerCallback()); ➍
➊ 为了使用回调,需要一个实现了org.apache.kafka.clients.producer.Callback接口的类,这个接口只有一个onCompletion方法。
➋ 如果Kafka返回一个错误,onCompletion方法会抛出一个非空(non null)异常。这里我们只是简单地把它打印出来,但是在生产环境应该有更好的处理方式。
➌ 记录与之前的一样。
➍ 在发送消息时传进去一个回调对象。