在本文中,我们探讨了如何将 Avro 的内容写入文件、读取内容并将其存储在 JSON 格式的文件中,并使用示例来说明该过程。此外,值得注意的是,架构也可以存储在单独的文件中,而不是包含在数据中。
Apache Avro是一种广泛使用的数据序列化系统,由于其效率和模式演化功能,在大数据应用程序中尤其受欢迎。在本教程中,我们将介绍如何通过 Avro 将对象转换为 JSON,以及如何将整个 Avro 文件转换为 JSON 文件。这对于数据检查和调试特别有用。
在当今数据驱动的世界中,处理不同数据格式的能力至关重要。Apache Avro 通常用于需要高性能和存储效率的系统,例如 Apache Hadoop。
配置
首先,让我们将 Avro 和 JSON 的依赖项添加到我们的pom.xml文件中。
我们为本教程添加了 Apache Avro 1.11.1版本:
<dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.11.1</version> </dependency>
|
将 Avro 对象转换为 JSON
通过 Avro 将 Java 对象转换为 JSON 涉及多个步骤,包括:
- 推断/构建 Avro 模式
- 将 Java 对象转换为 Avro GenericRecord ,最后
- 将对象转换为 JSON
- 我们将利用 Avro 的 Reflect API 从 Java 对象动态推断模式,而不是手动定义模式。
为了证明这一点,让我们创建一个具有两个整数属性x和y 的Point类:
public class Point { private int x; private int y; public Point(int x, int y) { this.x = x; this.y = y; } // Getters and setters }
|
让我们继续推断模式:public Schema inferSchema(Point p) { return ReflectData.get().getSchema(p.getClass()); }
|
我们定义了一个方法inferSchema,并使用ReflectData 类的getSchema 方法从点 对象推断出架构 。该架构描述了字段x和y及其数据类型。接下来,让我们从Point对象创建一个GenericRecord对象并将其转换为 JSON:
public String convertObjectToJson(Point p, Schema schema) { try { ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); GenericRecord genericRecord = new GenericData.Record(schema); genericRecord.put("x", p.getX()); genericRecord.put("y", p.getY()); Encoder encoder = EncoderFactory.get().jsonEncoder(schema, outputStream); datumWriter.write(genericRecord, encoder); encoder.flush(); outputStream.close(); return outputStream.toString(); } catch (Exception e) { throw new RuntimeException(e); } }
|
convertObjectToJson方法使用提供的架构将Point对象转换为 JSON 字符串。首先,根据提供的架构创建一个GenericRecord对象,用Point对象的数据填充它,然后使用DatumWriter通过JsonEncoder对象 将数据传递到ByteArrayOutputStream,最后,在OutputStream对象上使用toString方法获取 JSON 字符串。让我们验证一下生成的 JSON 的内容:
private AvroFileToJsonFile avroFileToJsonFile; private Point p; private String expectedOutput; @BeforeEach public void setup() { avroFileToJsonFile = new AvroFileToJsonFile(); p = new Point(2, 4); expectedOutput = "{\"x\":2,\"y\":4}"; } @Test public void whenConvertedToJson_ThenEquals() { String response = avroFileToJsonFile.convertObjectToJson(p, avroFileToJsonFile.inferSchema(p)); assertEquals(expectedOutput, response); }
|
将 Avro 文件转换为 JSON 文件
将整个 Avro 文件转换为 JSON 文件的过程类似,但需要从文件中读取数据。当我们在磁盘上存储 Avro 格式的数据并需要将其转换为更易于访问的格式(例如 JSON)时,这种情况很常见。
让我们首先定义一个方法writeAvroToFile,它将用于将一些 Avro 数据写入文件:
public void writeAvroToFile(Schema schema, List<Point> records, File writeLocation) { try { if (writeLocation.exists()) { if (!writeLocation.delete()) { System.err.println("Failed to delete existing file."); return; } } GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter); dataFileWriter.create(schema, writeLocation); for (Point record: records) { GenericRecord genericRecord = new GenericData.Record(schema); genericRecord.put("x", record.getX()); genericRecord.put("y", record.getY()); dataFileWriter.append(genericRecord); } dataFileWriter.close(); } catch (IOException e) { e.printStackTrace(); System.out.println("Error writing Avro file."); } }
|
该方法根据提供的Schema将Point对象构造为 GenericRecord 实例,从而将其转换为 Avro 格式。GenericDatumWrite将这些记录序列化,然后使用DataFileWriter将其写入 Avro 文件。让我们验证该文件是否已写入文件并且存在:
private File dataLocation; private File jsonDataLocation; ... @BeforeEach public void setup() { // Load files from the resources folder ClassLoader classLoader = getClass().getClassLoader(); dataLocation = new File(classLoader.getResource("").getFile(), "data.avro"); jsonDataLocation = new File(classLoader.getResource("").getFile(), "data.json"); ... } ... @Test public void whenAvroContentWrittenToFile_ThenExist(){ Schema schema = avroFileToJsonFile.inferSchema(p); avroFileToJsonFile.writeAvroToFile(schema, List.of(p), dataLocation); assertTrue(dataLocation.exists()); }
|
接下来,我们将从存储位置读取文件并将其写回 JSON 格式的另一个文件。让我们创建一个名为readAvroFromFileToJsonFile的方法来处理这个问题:
public void readAvroFromFileToJsonFile(File readLocation, File jsonFilePath) { DatumReader<GenericRecord> reader = new GenericDatumReader<>(); try { DataFileReader<GenericRecord> dataFileReader = new DataFileReader<>(readLocation, reader); DatumWriter<GenericRecord> jsonWriter = new GenericDatumWriter<>(dataFileReader.getSchema()); Schema schema = dataFileReader.getSchema(); OutputStream fos = new FileOutputStream(jsonFilePath); JsonEncoder jsonEncoder = EncoderFactory.get().jsonEncoder(schema, fos); while (dataFileReader.hasNext()) { GenericRecord record = dataFileReader.next(); System.out.println(record.toString()); jsonWriter.write(record, jsonEncoder); jsonEncoder.flush(); } dataFileReader.close(); } catch (IOException e) { throw new RuntimeException(e); } }
|
我们从readLocation读取 Avro 数据并将其作为 JSON 写入jsonFilePath。我们使用DataFileReader从 Avro 文件读取GenericRecord实例,然后使用JsonEncoder和GenericDatumWriter将这些记录序列化为 JSON 格式。我们继续确认写入生成的文件的 JSON 内容:
@Test public void whenAvroFileWrittenToJsonFile_ThenJsonContentEquals() throws IOException { avroFileToJsonFile.readAvroFromFileToJsonFile(dataLocation, jsonDataLocation); String text = Files.readString(jsonDataLocation.toPath()); assertEquals(expectedOutput, text); }
|