从 POJO 转换为 Avro Record

在本文中,我们探讨了在 Java 中将 POJO 转换为 Avro 记录的不同方法。我们从一种简单的方法开始,这种方法虽然简单,但在可维护性和灵活性方面存在缺点。接下来,我们分析了一种使用 Java 反射的解决方案。这种方法更强大,更容易适应类结构的变化。但是,对于较大的对象或频繁调用,它存在性能问题。

最后,我们想出了一个使用 Avro 的ReflectDatumWriter类的解决方案。这个类适合这个特定目的,是最适合我们需求的选择。此外,它受益于 Avro 的内部优化,建议用于复杂场景。


在 Java 应用程序中使用 Apache Avro时,我们经常需要将普通旧式 Java 对象(POJO) 转换为其 Avro 等效项。虽然通过单独设置每个字段手动执行此操作是完全可以接受的,但使用泛型执行此转换是一种更好且更易于维护的方法。

在本文中,我们将探讨如何将 POJO 转换为 Avro 对象。我们将以一种稳健的方式处理对原始 Java 类结构所做的更改。

1、直接的方法
假设我们有一段包含 POJO 的代码,我们想要将其转换为 Avro 对象。

让我们看看我们的 POJO:

public class Pojo {
    private final Map<String, String> aMap;
    private final long uid;
    private final long localDateTime;
    public Pojo() {
        aMap = new HashMap<>();
        uid = ThreadLocalRandom.current().nextLong();
        localDateTime = LocalDateTime.now().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
        aMap.put("mapKey", "mapValue");
    }
   
//getters
}

然后,我们有一个使用特定方法进行映射的类:

public static Record mapPojoToRecordStraightForward(Pojo pojo){
    Schema schema = ReflectData.get().getSchema(pojo.getClass());
    GenericData.Record avroRecord = new GenericData.Record(schema);
    avroRecord.put("uid", pojo.getUid());
    avroRecord.put(
"localDateTime", pojo.getLocalDateTime());
    avroRecord.put(
"aMap", pojo.getaMap());
    return avroRecord;
}

我们可以看到,最直接的方法就是显式设置每个字段。只要看一下这个解决方案,我们就能知道将来可能出现的问题。这个解决方案很脆弱,只要 POJO 结构发生变化,就需要更新。这不是最好的解决方案。

请注意,我们可以从 POJO 本身以外的来源提取模式;例如,我们也可以通过模式版本来查找它。

2. 使用反射进行泛型转换
另一种方法是使用 Java Reflection。此方法使用反射并遍历 POJO 中的每个字段。接下来,它设置 Avro Record 中的每个字段。

它看起来是这样的:

public static Record mapPojoToRecordReflection(Pojo pojo) throws IllegalAccessException {
    Class<?> pojoClass = pojo.getClass();
    Schema schema = ReflectData.get().getSchema(pojoClass);
    GenericData.Record avroRecord = new GenericData.Record(schema);
    for (Field field : pojoClass.getDeclaredFields()) {
        field.setAccessible(true);
        avroRecord.put(field.getName(), field.get(pojo));
    }

之后,它遍历每个超类并在记录中设置这些字段:

    // Handle superclass fields
    Class<?> superClass = pojoClass.getSuperclass();
    while (superClass != null && superClass != Object.class) {
        for (Field field : superClass.getDeclaredFields()) {
            field.setAccessible(true);
            avroRecord.put(field.getName(), field.get(pojo));
        }
        superClass = superClass.getSuperclass();
    }
    return avroRecord;
}

最重要的是,这种方法很简单,但对于较大的对象或频繁调用时速度会较慢。

3.使用 Avro 的ReflectDatumWriter类
Avro 有一个针对此场景的内置功能,即ReflectDatumWriter类。首先,我们从 POJO 类生成一个 Avro 模式。接下来,我们创建一个ReflectDatumWriter来序列化 POJO。然后,我们设置一个ByteArrayOutputStream 和BinaryEncoder进行写入:

public static GenericData.Record mapPojoToRecordReflectDatumWriter(Object pojo) throws IOException {
    Schema schema = ReflectData.get().getSchema(pojo.getClass());
    ReflectDatumWriter<Object> writer = new ReflectDatumWriter<>(schema);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

接下来我们将 POJO 序列化为二进制格式:

    writer.write(pojo, encoder);
    encoder.flush();

最后,我们创建一个BinaryDecoder来读取序列化的数据,并使用GenericDatumReader将二进制数据反序列化为GenericData.Record:

    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericDatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
    return reader.read(null, decoder);
}

此方法使用 Avro 的序列化和反序列化功能将 POJO 转换为 Avro 记录。请注意,此转换版本对于复杂对象更有效,但对于简单对象则会增加复杂性。