CSV format

    Flink supports reading CSV files using CsvReaderFormat. The reader uses the Jackson library and lets you pass the corresponding configuration for the CSV schema and parsing options.

    CsvReaderFormat can be initialized and used like this:

    CsvReaderFormat<SomePojo> csvFormat = CsvReaderFormat.forPojo(SomePojo.class);
    FileSource<SomePojo> source =
            FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();

    If you need finer control over the CSV schema or the parsing options, use the lower-level forSchema static factory method of CsvReaderFormat:

    // Has to match the exact order of columns in the CSV file
    @JsonPropertyOrder({"city", "lat", "lng", "country", "iso2",
                        "adminName", "capital", "population"})
    public static class CityPojo {
        public String city;
        public BigDecimal lat;
        public BigDecimal lng;
        public String country;
        public String iso2;
        public String adminName;
        public String capital;
        public long population;
    }

    // Derive the schema from the POJO, then disable quoting and use '|' as the column separator
    CsvMapper mapper = new CsvMapper();
    CsvSchema schema =
            mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');

    CsvReaderFormat<CityPojo> csvFormat =
            CsvReaderFormat.forSchema(mapper, schema, TypeInformation.of(CityPojo.class));
    FileSource<CityPojo> source =
            FileSource.forRecordStreamFormat(csvFormat, Path.fromLocalFile(...)).build();
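To make the effect of the schema settings above more concrete, the following is a minimal plain-Java sketch of what positional mapping with a '|' separator and no quote character amounts to. It is not how Flink or Jackson parse CSV internally. The names PipeRowSketch and CityRow and the sample row are hypothetical and made up for this illustration.

```java
import java.math.BigDecimal;

// Illustration only: a hand-written stand-in for the schema-driven parsing,
// not Flink's or Jackson's actual implementation.
final class PipeRowSketch {

    // Fields are listed in the same order as the @JsonPropertyOrder annotation.
    static final class CityRow {
        String city;
        BigDecimal lat;
        BigDecimal lng;
        String country;
        String iso2;
        String adminName;
        String capital;
        long population;
    }

    // Splits the line on '|' and assigns cells to fields purely by position.
    // Because quoting is assumed to be disabled, a '"' is kept as an ordinary character.
    static CityRow parse(String line) {
        // The limit of -1 keeps empty cells, including trailing ones.
        String[] cells = line.split("\\|", -1);
        if (cells.length != 8) {
            throw new IllegalArgumentException("expected 8 columns, got " + cells.length);
        }
        CityRow row = new CityRow();
        row.city = cells[0];
        row.lat = new BigDecimal(cells[1]);
        row.lng = new BigDecimal(cells[2]);
        row.country = cells[3];
        row.iso2 = cells[4];
        row.adminName = cells[5];
        row.capital = cells[6];
        row.population = Long.parseLong(cells[7]);
        return row;
    }

    public static void main(String[] args) {
        // Hypothetical row, invented for this sketch.
        CityRow row = parse(
                "Exampleville|12.5|-45.25|Exampleland|EX|Example \"North\" Region||1000");
        System.out.println(row.city + " / " + row.adminName + " / " + row.population);
    }
}
```

The sketch also shows why the column order matters: a cell is matched to a field only by its position, so a reordered file would silently fill the wrong fields or fail on type conversion.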

    The corresponding CSV file:

    It is also possible to read more complex data types using fine-grained Jackson settings:

    public static class ComplexPojo {
        private long id;
        private int[] array;
    }

    CsvReaderFormat<ComplexPojo> csvFormat =
            CsvReaderFormat.forSchema(
                    new CsvMapper(),
                    CsvSchema.builder()
                            .addColumn(
                                    new CsvSchema.Column(0, "id", CsvSchema.ColumnType.NUMBER))
                            .addColumn(
                                    new CsvSchema.Column(4, "array", CsvSchema.ColumnType.ARRAY)
                                            .withArrayElementSeparator("#"))
                            .build(),
                    TypeInformation.of(ComplexPojo.class));
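As a rough illustration of what an array element separator means for a single cell, here is a plain-Java sketch that splits a cell on "#" and converts the pieces to an int[]. It assumes a comma as the column separator and a two-column row (id, then array). The class ArrayCellSketch, its methods, and the sample rows are hypothetical. Treating an empty cell as an empty array is a choice made for this sketch and may not match Jackson's behavior.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Illustration only: mimics the idea of an array-typed CSV column,
// not the actual Jackson or Flink parsing code.
final class ArrayCellSketch {

    static final class ComplexRow {
        long id;
        int[] array;
    }

    // Splits one cell into int elements using the given element separator.
    static int[] parseArrayCell(String cell, String elementSeparator) {
        if (cell.isEmpty()) {
            // Sketch-specific assumption: an empty cell becomes an empty array.
            return new int[0];
        }
        // Pattern.quote makes sure the separator is matched literally, not as a regex.
        return Arrays.stream(cell.split(Pattern.quote(elementSeparator)))
                .mapToInt(Integer::parseInt)
                .toArray();
    }

    // Parses a two-column row: a numeric id followed by an array cell.
    static ComplexRow parse(String line, String elementSeparator) {
        String[] cells = line.split(",", -1);
        if (cells.length != 2) {
            throw new IllegalArgumentException("expected 2 columns, got " + cells.length);
        }
        ComplexRow row = new ComplexRow();
        row.id = Long.parseLong(cells[0]);
        row.array = parseArrayCell(cells[1], elementSeparator);
        return row;
    }

    public static void main(String[] args) {
        // Hypothetical rows, invented for this sketch.
        for (String line : new String[] {"0,1#2#3", "1,", "2,1"}) {
            ComplexRow row = parse(line, "#");
            System.out.println(row.id + " -> " + Arrays.toString(row.array));
        }
    }
}
```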

    Similarly to the TextLineInputFormat, CsvReaderFormat can be used in both continuous and batch modes (see TextLineInputFormat for examples).