CSV format
Flink supports reading CSV files using CsvReaderFormat. The reader uses the Jackson library and accepts the corresponding configuration for the CSV schema and parsing options.
CsvReaderFormat can be initialized and used like this:
CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
FileSource<SomePojo> source =
    FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
If you need more fine-grained control over the CSV schema or the parsing options, use the lower-level forSchema static factory method of CsvReaderFormat:
// Has to match the exact order of columns in the CSV file
@JsonPropertyOrder({"city", "lat", "lng", "country", "iso2",
    "adminName", "capital", "population"})
public static class CityPojo {
    public String city;
    public BigDecimal lat;
    public BigDecimal lng;
    public String country;
    public String iso2;
    public String adminName;
    public String capital;
    public long population;
}

CsvMapper mapper = new CsvMapper();
CsvSchema schema =
    mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');

CsvReaderFormat<CityPojo> csvFormat =
    CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
FileSource<CityPojo> source =
    FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
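The corresponding CSV file uses '|' as the column separator, has no quote character, and lists its columns in the order given by @JsonPropertyOrder.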
It is also possible to read more complex data types using fine-grained Jackson
settings:
public static class ComplexPojo {
    private long id;
    private int[] array;
}

CsvReaderFormat<ComplexPojo> csvFormat =
    CsvReaderFormat.forSchema(
        new CsvMapper(),
        CsvSchema.builder()
            .addColumn(
                new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
            .addColumn(
                new CsvSchema.Column(4, "array", CsvSchema.ColumnType.ARRAY)
                    .withArrayElementSeparator("#"))
            .build(),
        TypeInformation.of(ComplexPojo.class));
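To make the role of the array element separator concrete, here is a plain-Java sketch of the row-to-object mapping that the schema above describes. It does not use Jackson or Flink, so it is an illustration rather than the reader's actual parsing logic. The class ArrayColumnSketch, the helper parseRow, and the sample line are hypothetical, and the sketch assumes a comma-separated row with the id in the first cell and the array in the second.

```java
import java.util.Arrays;

/** Hand-rolled illustration only; CsvReaderFormat delegates the real parsing to Jackson. */
public class ArrayColumnSketch {

    /** Mirrors the two ComplexPojo fields (hypothetical holder type). */
    static final class Row {
        final long id;
        final int[] array;

        Row(long id, int[] array) {
            this.id = id;
            this.array = array;
        }
    }

    /**
     * Splits one line into an id and an int array.
     * Assumes ',' separates columns and '#' separates array elements.
     */
    static Row parseRow(String line) {
        // A limit of -1 keeps a trailing empty cell, so "1," still yields two columns.
        String[] columns = line.split(",", -1);
        long id = Long.parseLong(columns[0].trim());

        String arrayCell = columns.length > 1 ? columns[1].trim() : "";
        int[] array = arrayCell.isEmpty()
                ? new int[0] // sketch assumption: an empty cell means an empty array
                : Arrays.stream(arrayCell.split("#")).mapToInt(Integer::parseInt).toArray();

        return new Row(id, array);
    }

    public static void main(String[] args) {
        Row row = parseRow("0,1#2#3"); // hypothetical sample line
        System.out.println(row.id + " -> " + Arrays.toString(row.array)); // prints "0 -> [1, 2, 3]"
    }
}
```

How Jackson itself treats empty or malformed array cells may differ from this sketch, so check the behavior against your own data before relying on it.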
Similarly to TextLineInputFormat, CsvReaderFormat can be used in both continuous and batch modes (see TextLineInputFormat for examples).