使用 Java 将 Avro 文件转换为 JSON 文件

在本文中,我们探讨了如何将 Avro 的内容写入文件、读取内容并将其存储在 JSON 格式的文件中,并使用示例来说明该过程。此外,值得注意的是,架构也可以存储在单独的文件中,而不是包含在数据中。

Apache Avro是一种广泛使用的数据序列化系统,由于其效率和模式演化功能,在大数据应用程序中尤其受欢迎。在本教程中,我们将介绍如何通过 Avro 将对象转换为 JSON,以及如何将整个 Avro 文件转换为 JSON 文件。这对于数据检查和调试特别有用。

在当今数据驱动的世界中,处理不同数据格式的能力至关重要。Apache Avro 通常用于需要高性能和存储效率的系统,例如 Apache Hadoop。

配置
首先,让我们将 Avro 和 JSON 的依赖项添加到我们的pom.xml文件中。

我们为本教程添加了 Apache Avro 1.11.1版本:

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.1</version>
</dependency>

将 Avro 对象转换为 JSON
通过 Avro 将 Java 对象转换为 JSON 涉及多个步骤,包括:

  • 推断/构建 Avro 模式
  • 将 Java 对象转换为 Avro  GenericRecord ,最后
  • 将对象转换为 JSON
  • 我们将利用 Avro 的 Reflect API 从 Java 对象动态推断模式,而不是手动定义模式。

为了证明这一点,让我们创建一个具有两个整数属性x和y 的Point类:

public class Point {
    private int x;
    private int y;
    public Point(int x, int y) {
        this.x = x;
        this.y = y;
    }
    // Getters and setters
}

让我们继续推断模式:

public Schema inferSchema(Point p) {
    return ReflectData.get().getSchema(p.getClass());
}

我们定义了一个方法inferSchema,并使用ReflectData 类的getSchema 方法从点 对象推断出架构 。该架构描述了字段x和y及其数据类型。

接下来,让我们从Point对象创建一个GenericRecord对象并将其转换为 JSON:

public String convertObjectToJson(Point p, Schema schema) {
    try {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        GenericRecord genericRecord = new GenericData.Record(schema);
        genericRecord.put("x", p.getX());
        genericRecord.put(
"y", p.getY());
        Encoder encoder = EncoderFactory.get().jsonEncoder(schema, outputStream);
        datumWriter.write(genericRecord, encoder);
        encoder.flush();
        outputStream.close();
        return outputStream.toString();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

convertObjectToJson方法使用提供的架构将Point对象转换为 JSON 字符串。首先,根据提供的架构创建一个GenericRecord对象,用Point对象的数据填充它,然后使用DatumWriter通过JsonEncoder对象 将数据传递到ByteArrayOutputStream,最后,在OutputStream对象上使用toString方法获取 JSON 字符串。

让我们验证一下生成的 JSON 的内容:

private AvroFileToJsonFile avroFileToJsonFile;
private Point p;
private String expectedOutput;
@BeforeEach
public void setup() {
    avroFileToJsonFile = new AvroFileToJsonFile();
    p = new Point(2, 4);
    expectedOutput = "{\"x\":2,\"y\":4}";
}
@Test
public void whenConvertedToJson_ThenEquals() {
    String response = avroFileToJsonFile.convertObjectToJson(p, avroFileToJsonFile.inferSchema(p));
    assertEquals(expectedOutput, response);
}

将 Avro 文件转换为 JSON 文件
将整个 Avro 文件转换为 JSON 文件的过程类似,但需要从文件中读取数据。当我们在磁盘上存储 Avro 格式的数据并需要将其转换为更易于访问的格式(例如 JSON)时,这种情况很常见。

让我们首先定义一个方法writeAvroToFile,它将用于将一些 Avro 数据写入文件:

public void writeAvroToFile(Schema schema, List<Point> records, File writeLocation) {
    try {
        if (writeLocation.exists()) {
            if (!writeLocation.delete()) {
                System.err.println("Failed to delete existing file.");
                return;
            }
        }
        GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
        DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
        dataFileWriter.create(schema, writeLocation);
        for (Point record: records) {
            GenericRecord genericRecord = new GenericData.Record(schema);
            genericRecord.put(
"x", record.getX());
            genericRecord.put(
"y", record.getY());
            dataFileWriter.append(genericRecord);
        }
        dataFileWriter.close();
    } catch (IOException e) {
        e.printStackTrace();
        System.out.println(
"Error writing Avro file.");
    }
}

该方法根据提供的Schema将Point对象构造为 GenericRecord 实例,从而将其转换为 Avro 格式。GenericDatumWrite将这些记录序列化,然后使用DataFileWriter将其写入 Avro 文件。

让我们验证该文件是否已写入文件并且存在:

private File dataLocation;
private File jsonDataLocation;
...
@BeforeEach
public void setup() {
    // Load files from the resources folder
    ClassLoader classLoader = getClass().getClassLoader();
    dataLocation = new File(classLoader.getResource(
"").getFile(), "data.avro");
    jsonDataLocation = new File(classLoader.getResource(
"").getFile(), "data.json");
    ...
}
...
@Test
public void whenAvroContentWrittenToFile_ThenExist(){
    Schema schema = avroFileToJsonFile.inferSchema(p);
    avroFileToJsonFile.writeAvroToFile(schema, List.of(p), dataLocation);
    assertTrue(dataLocation.exists());
}

接下来,我们将从存储位置读取文件并将其写回 JSON 格式的另一个文件。

让我们创建一个名为readAvroFromFileToJsonFile的方法来处理这个问题:

public void readAvroFromFileToJsonFile(File readLocation, File jsonFilePath) {
    DatumReader<GenericRecord> reader = new GenericDatumReader<>();
    try {
        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(readLocation, reader);
        DatumWriter<GenericRecord> jsonWriter = new GenericDatumWriter<>(dataFileReader.getSchema());
        Schema schema = dataFileReader.getSchema();
        OutputStream fos = new FileOutputStream(jsonFilePath);
        JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, fos);
        while (dataFileReader.hasNext()) {
            GenericRecord record = dataFileReader.next();
            System.out.println(record.toString());
            jsonWriter.write(record, jsonEncoder);
            jsonEncoder.flush();
        }
        dataFileReader.close();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

我们从readLocation读取 Avro 数据并将其作为 JSON 写入jsonFilePath。我们使用DataFileReader从 Avro 文件读取GenericRecord实例,然后使用JsonEncoder和GenericDatumWriter将这些记录序列化为 JSON 格式。

我们继续确认写入生成的文件的 JSON 内容:

@Test
public void whenAvroFileWrittenToJsonFile_ThenJsonContentEquals() throws IOException {
    avroFileToJsonFile.readAvroFromFileToJsonFile(dataLocation, jsonDataLocation);
    String text = Files.readString(jsonDataLocation.toPath());
    assertEquals(expectedOutput, text);
}