Parquet format
To read Avro records, you will need to add the parquet-avro dependency:
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.2</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</exclusion>
</exclusions>
</dependency>
This format is compatible with the new Source that can be used in both batch and streaming execution modes. Thus, you can use this format for two kinds of data:
- Bounded data: lists all files and reads them all.
- Unbounded data: monitors a directory for new files that appear.
Vectorized reader
// Parquet rows are decoded in batches
FileSource.forBulkFileFormat(BulkFormat,Path...)
// Monitor the Paths to read data as unbounded data
FileSource.forBulkFileFormat(BulkFormat,Path...)
.monitorContinuously(Duration.ofMillis(5L))
.build();
Avro Parquet reader
// Parquet records are read one by one through a StreamFormat
FileSource.forRecordStreamFormat(StreamFormat,Path...)
// Monitor the Paths to read data as unbounded data
FileSource.forRecordStreamFormat(StreamFormat,Path...)
.monitorContinuously(Duration.ofMillis(5L))
.build();
The following examples are all configured for bounded data. To configure the File Source for unbounded data, you must additionally call AbstractFileSource.AbstractFileSourceBuilder.monitorContinuously(Duration).
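Conceptually, continuous monitoring means re-listing the directory at a fixed interval and forwarding only files that have not been seen before. The following is a minimal JDK-only sketch of that idea, not Flink's actual implementation; the class name DirectoryPoller and its poll() method are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

/** Hypothetical helper: remembers which files it has already reported. */
final class DirectoryPoller {
    private final Path directory;
    private final Set<Path> seen = new HashSet<>();

    DirectoryPoller(Path directory) {
        this.directory = directory;
    }

    /** Lists the directory once and returns only files not returned by an earlier call. */
    List<Path> poll() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (Stream<Path> files = Files.list(directory)) {
            files.filter(Files::isRegularFile)
                    .sorted()
                    .forEach(p -> {
                        // Set.add returns false for paths that were already reported.
                        if (seen.add(p)) {
                            fresh.add(p);
                        }
                    });
        }
        return fresh;
    }
}
```

A bounded read corresponds to calling poll() exactly once; an unbounded read corresponds to calling it repeatedly at the interval passed to monitorContinuously.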
In this example, you will create a DataStream containing Parquet records as Flink RowData objects. The schema is projected to read only the specified fields (“f7”, “f4” and “f99”).
Flink will read records in batches of 500. The first boolean parameter specifies whether timestamp columns are interpreted as UTC. The second boolean specifies whether the projected Parquet field names are matched case-sensitively. No watermark strategy is defined because the records do not contain event timestamps.
final LogicalType[] fieldTypes =
new LogicalType[] {
new DoubleType(), new IntType(), new VarCharType()
};
final ParquetColumnarRowInputFormat<FileSourceSplit> format =
new ParquetColumnarRowInputFormat<>(
new Configuration(),
RowType.of(fieldTypes, new String[] {"f7", "f4", "f99"}),
500,
false,
true);
final FileSource<RowData> source =
FileSource.forBulkFileFormat(format, /* Flink Path */)
.build();
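final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<RowData> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");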
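To illustrate what case-sensitive matching of projected field names means, the following JDK-only sketch resolves projected names against a file's column names. It is an illustration only and not Flink's code; the class ProjectionSketch and the sample column names are assumptions, and how Flink itself treats unmatched fields is not shown.

```java
import java.util.List;

/** Hypothetical illustration of matching projected field names against a file's columns. */
final class ProjectionSketch {
    /** Returns, for each projected name, the index of the matching file column, or -1 if absent. */
    static int[] resolve(List<String> fileColumns, List<String> projected, boolean caseSensitive) {
        int[] indices = new int[projected.size()];
        for (int i = 0; i < projected.size(); i++) {
            String wanted = projected.get(i);
            indices[i] = -1;
            for (int j = 0; j < fileColumns.size(); j++) {
                String column = fileColumns.get(j);
                boolean match =
                        caseSensitive ? column.equals(wanted) : column.equalsIgnoreCase(wanted);
                if (match) {
                    indices[i] = j;
                    break;
                }
            }
        }
        return indices;
    }
}
```

With file columns "F4", "f7", "f99" and the projection "f7", "f4", "f99", case-sensitive matching finds no column for "f4", while case-insensitive matching resolves it to "F4".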
Avro Records
Avro schemas are defined using JSON. You can get more information about Avro schemas and types from the Avro specification. The example below uses a schema that defines a record representing a user with three fields: name, favoriteNumber, and favoriteColor. See the record section of the Avro specification for details on how to define an Avro schema.
In the following example, you will create a DataStream containing Parquet records as Avro Generic records. It will parse the Avro schema from a JSON string. There are other ways to parse a schema, e.g. from a java.io.File or a java.io.InputStream; please refer to the Avro Schema API documentation for details. After that, you will create an AvroParquetRecordFormat via AvroParquetReaders for Avro Generic records.
// parsing avro schema
final Schema schema =
new Schema.Parser()
.parse(
"{\"type\": \"record\", "
+ "\"name\": \"User\", "
+ "\"fields\": [\n"
+ " {\"name\": \"name\", \"type\": \"string\" },\n"
+ " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
+ " {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
+ " ]\n"
+ " }");
final FileSource<GenericRecord> source =
FileSource.forRecordStreamFormat(
AvroParquetReaders.forGenericRecord(schema), /* Flink Path */)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);
final DataStream<GenericRecord> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
Based on the previously defined schema, you can generate classes by leveraging Avro code generation. Once the classes have been generated, there is no need to use the schema directly in your programs. You can either use avro-tools.jar to generate code manually, or use the Avro Maven plugin to perform code generation on any .avsc files present in the configured source directory. Please refer to Avro Getting Started for more information.
The following example uses this schema:
[
{"namespace": "org.apache.flink.formats.parquet.generated",
"type": "record",
"name": "Address",
"fields": [
{"name": "num", "type": "int"},
{"name": "street", "type": "string"},
{"name": "city", "type": "string"},
{"name": "state", "type": "string"},
{"name": "zip", "type": "string"}
]
}
]
You will use the Avro Maven plugin to generate the Address Java class:
@org.apache.avro.specific.AvroGenerated
public class Address extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
// generated code...
}
You will create an AvroParquetRecordFormat via AvroParquetReaders for Avro Specific records and then create a DataStream containing Parquet records as Avro Specific records.
// forSpecificRecord(Address.class) produces Address records, so the source and stream are typed accordingly
final FileSource<Address> source =
FileSource.forRecordStreamFormat(
AvroParquetReaders.forSpecificRecord(Address.class), /* Flink Path */)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);
final DataStream<Address> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
This example uses a simple Java POJO class named Datum.
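The definition of Datum is not reproduced here. A minimal sketch of what such a POJO could look like follows; the field names a and b and their types are chosen purely for illustration and are assumptions, not the original class.

```java
import java.io.Serializable;
import java.util.Objects;

/** Hypothetical POJO; the field names and types are illustrative assumptions. */
class Datum implements Serializable {
    public String a;
    public int b;

    /** No-argument constructor, so that instances can be created reflectively. */
    public Datum() {}

    public Datum(String a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Datum other = (Datum) o;
        return b == other.b && Objects.equals(a, other.a);
    }

    @Override
    public int hashCode() {
        return Objects.hash(a, b);
    }
}
```

Value-based equals and hashCode are optional for reading, but they make it easy to compare records in tests.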
You will create an AvroParquetRecordFormat via AvroParquetReaders for Avro Reflect records and then create a DataStream containing Parquet records as Avro Reflect records.
// forReflectRecord(Datum.class) produces Datum records, so the source and stream are typed accordingly
final FileSource<Datum> source =
FileSource.forRecordStreamFormat(
AvroParquetReaders.forReflectRecord(Datum.class), /* Flink Path */)
.build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10L);
final DataStream<Datum> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
Prerequisite for Parquet files
In order to support reading Avro Reflect records, the Parquet file must contain specific meta information. The Avro schema used for creating the Parquet data must contain a namespace, which will be used by the program to identify the concrete Java class for the reflection process.
The following example shows the User schema used previously, but this time it contains a namespace pointing to the location (in this case the package) where the User class for the reflection can be found.
// avro schema with namespace
final String schema =
"{\"type\": \"record\", "
+ "\"name\": \"User\", "
+ "\"namespace\": \"org.apache.flink.formats.parquet.avro\", "
+ "\"fields\": [\n"
+ " {\"name\": \"name\", \"type\": \"string\" },\n"
+ " {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"] },\n"
+ " {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"] }\n"
+ " ]\n"
+ " }";
Parquet files created with this schema will contain meta information like:
creator: parquet-mr version 1.12.2 (build 77e30c8093386ec52c3cfa6c34b7ef3321322c94)
extra: parquet.avro.schema =
{"type":"record","name":"User","namespace":"org.apache.flink.formats.parquet.avro","fields":[{"name":"name","type":"string"},{"name":"favoriteNumber","type":["int","null"]},{"name":"favoriteColor","type":["string","null"]}]}
extra: writer.model.name = avro
file schema: org.apache.flink.formats.parquet.avro.User
--------------------------------------------------------------------------------
name: REQUIRED BINARY L:STRING R:0 D:0
favoriteNumber: OPTIONAL INT32 R:0 D:1
favoriteColor: OPTIONAL BINARY L:STRING R:0 D:1
row group 1: RC:3 TS:143 OFFSET:4
--------------------------------------------------------------------------------
name: BINARY UNCOMPRESSED DO:0 FPO:4 SZ:47/47/1.00 VC:3 ENC:PLAIN,BIT_PACKED ST:[min: Jack, max: Tom, num_nulls: 0]
favoriteNumber: INT32 UNCOMPRESSED DO:0 FPO:51 SZ:41/41/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: 1, max: 3, num_nulls: 0]
favoriteColor: BINARY UNCOMPRESSED DO:0 FPO:92 SZ:55/55/1.00 VC:3 ENC:RLE,PLAIN,BIT_PACKED ST:[min: green, max: yellow, num_nulls: 0]
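The file schema name in this output, org.apache.flink.formats.parquet.avro.User, is the schema's namespace and record name joined with a dot, which is also the fully-qualified name of the Java class to load. A minimal JDK-only sketch of that lookup follows; the helper RecordClassLookup is hypothetical and only illustrates the naming rule, it is not the code Avro or Flink runs.

```java
/** Hypothetical helper showing how a namespace and record name form a class name. */
final class RecordClassLookup {
    /** Joins namespace and name with a dot; a missing namespace leaves only the simple name. */
    static String fullName(String namespace, String name) {
        return (namespace == null || namespace.isEmpty()) ? name : namespace + "." + name;
    }

    /** Loads the class with that fully-qualified name, failing if it is not on the classpath. */
    static Class<?> load(String namespace, String name) throws ClassNotFoundException {
        return Class.forName(fullName(namespace, name));
    }
}
```

Without a namespace, the lookup would only have the simple name User to work with, which does not identify a class in a named package.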
With the User class defined in the package org.apache.flink.formats.parquet.avro:
public class User {
private String name;
private Integer favoriteNumber;
private String favoriteColor;
public User() {}
public User(String name, Integer favoriteNumber, String favoriteColor) {
this.name = name;
this.favoriteNumber = favoriteNumber;
this.favoriteColor = favoriteColor;
}
public String getName() {
return name;
}
public Integer getFavoriteNumber() {
return favoriteNumber;
}
public String getFavoriteColor() {
return favoriteColor;
}
}
you can write the following program to read Avro Reflect records of User type from Parquet files: