Deserializers
前言
Custom deserializers
使用Avro反序列化与Kafka消费者
参考资料
前言
Kafka生产者需要序列化程序将对象转换为字节数组,然后发送到Kafka。 同样,Kafka消费者需要使用反序列化器将从Kafka收到的字节数组转换为Java对象。 在前面的示例中,我们假设每个消息的键和值都是字符串,我们在消费者配置中使用了默认的StringDeserializer。
在第3章关于Kafka生产者的过程中,我们了解了如何自定义序列化类型以及如何使用Avro和AvroSerializer根据模式定义生成Avro对象,然后在向Kafka生成消息时对其进行序列化。 我们现在将介绍如何为自己的对象创建自定义反序列化器以及如何使用Avro及其反序列化器。
很明显,用于向Kafka生成事件的序列化程序必须与消耗事件时将使用的反序列化程序匹配。
假如我们使用IntSerializer进行序列化,然后使用StringDeserializer进行反序列化,这很有可能出现意想不到的结果。
这意味着作为开发人员,你需要跟踪用于写入每个主题的序列化程序,并确保每个主题仅包含你使用的反序列化程序可以解析的数据。
这是使用Avro和Schema Repository进行序列化和反序列化的好处之一 —— AvroSerializer可以确保写入特定主题的所有数据都与主题的模式兼容,这意味着它可以通过匹配的反序列化器和模式进行反序列化 。 生产者或消费者方面的兼容性错误将通过适当的错误消息轻松捕获,这意味着我们不需要尝试调试字节数组以查找序列化错误。
我们将首先快速展示如何编写自定义反序列化器,即使这是不太常用的方法,然后我们将继续讨论如何使用Avro反序列化消息键和值的示例。
Custom deserializers
让我们采用第3章中序列化的相同自定义对象,并为其编写反序列化器:
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
} }
自定义反序列化器将如下所示:
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerDeserializer implements
Deserializer<Customer> { //[1]
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
public Customer deserialize(String topic, byte[] data) {
int id;
int nameSize;
String name;
try {
if (data == null)
return null;
if (data.length < 8)
throw new SerializationException("Size of data received by
IntegerDeserializer is shorter than expected");
ByteBuffer buffer = ByteBuffer.wrap(data);
id = buffer.getInt();
String nameSize = buffer.getInt();
byte[] nameBytes = new Array[Byte](nameSize);
buffer.get(nameBytes);
name = new String(nameBytes, 'UTF-8');
return new Customer(id, name); //[2]
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer
to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
} }
[1]: 消费者还需要Customer类的实现,并且类和序列化器都需要匹配生产和消费应用程序。 在一个拥有许多消费者和生产者共享数据访问权限的大型组织中,这可能变得具有挑战性。
[2]我们只是在这里颠倒串行器的逻辑 - 我们从字节数组中获取客户ID和名称,并使用它们构造我们需要的对象。
使用此序列化程序的使用者代码与此示例类似:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.CustomerDeserializer");
KafkaConsumer<String, Customer> consumer =
new KafkaConsumer<>(props);
consumer.subscribe("customerCountries")
while (true) {
ConsumerRecords<String, Customer> records =
consumer.poll(100);
for (ConsumerRecord<String, Customer> record : records)
{
System.out.println("current customer Id: " +
record.value().getId() + " and
current customer name: " + record.value().getName());
} }
同样,重要的是要注意不建议实现自定义序列化器和反序列化器。 它紧密地耦合了生产者和消费者,并且易碎且容易出错。 更好的解决方案是使用标准消息格式,如JSON,Thrift,Protobuf或Avro。 我们现在将看到如何将Avro反序列化器与Kafka消费者一起使用。
使用Avro反序列化与Kafka消费者
假设我们正在使用第3章中显示的Avro中Customer类的实现。为了从Kafka中使用这些对象,你希望实现类似于此的消费应用程序:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.serializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.serializer",
"io.confluent.kafka.serializers.KafkaAvroDeserializer"); //[1]
props.put("schema.registry.url", schemaUrl); //[2]
String topic = "customerContacts"
KafkaConsumer consumer = new
KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);
while (true) {
ConsumerRecords<String, Customer> records =
consumer.poll(1000); //[3]
for (ConsumerRecord<String, Customer> record: records) {
System.out.println("Current customer name is: " +
record.value().getName()); //[4]
}
consumer.commitSync();
}
[1] 我们使用KafkaAvroDeserializer来反序列化Avro消息。
[2] schema.registry.url是一个新参数。 这只是指向我们存储模式的位置。 这样,消费者可以使用生产者注册的模式来反序列化消息。
[3] 我们将生成的类Customer指定为记录值的类型。
[4] record.value()是一个Customer实例,我们可以相应地使用它。
|
|