使用此连接器,您可以

    • 从本地文件系统或 hdfs 加载单个或多个 TsFile (只支持以DataSet的形式)到 Flink 。
    • 将本地文件系统或 hdfs 中特定目录中的所有文件加载到 Flink 中。

    快速开始

    TsFileInputFormat 示例

    1. 使用默认的 RowRowRecordParser 创建 TsFileInputFormat 。
    1. 从输入格式读取数据并打印到标准输出 stdout:
    1. inputFormat.setFilePath("source.tsfile");
    2. DataStream<Row> source = senv.createInput(inputFormat);
    3. DataStream<String> rowString = source.map(Row::toString);
    4. Iterator<String> result = DataStreamUtils.collect(rowString);
    5. while (result.hasNext()) {
    6. System.out.println(result.next());
    7. }

    DataSet:

    TSRecordOutputFormat 示例

    1. 使用默认的 RowTSRecordConverter 创建 TSRecordOutputFormat 。
    1. String[] filedNames = {
    2. QueryConstant.RESERVED_TIME,
    3. "device_1.sensor_1",
    4. "device_1.sensor_3",
    5. "device_2.sensor_1",
    6. "device_2.sensor_2",
    7. "device_2.sensor_3"
    8. };
    9. Types.LONG,
    10. Types.LONG,
    11. Types.LONG,
    12. Types.LONG,
    13. Types.LONG,
    14. Types.LONG,
    15. Types.LONG
    16. };
    17. RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations, filedNames);
    18. Schema schema = new Schema();
    19. schema.extendTemplate("template", new MeasurementSchema("sensor_1", TSDataType.INT64, TSEncoding.TS_2DIFF));
    20. schema.extendTemplate("template", new MeasurementSchema("sensor_2", TSDataType.INT64, TSEncoding.TS_2DIFF));
    21. schema.extendTemplate("template", new MeasurementSchema("sensor_3", TSDataType.INT64, TSEncoding.TS_2DIFF));
    22. RowTSRecordConverter converter = new RowTSRecordConverter(rowTypeInfo);
    1. 通过输出格式写数据:

    DataSet:

    1. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    2. List<Tuple7> data = new ArrayList<>(7);
    3. data.add(new Tuple7(1L, 2L, 3L, 4L, 5L, 6L, 7L));
    4. data.add(new Tuple7(2L, 3L, 4L, 5L, 6L, 7L, 8L));
    5. data.add(new Tuple7(3L, 4L, 5L, 6L, 7L, 8L, 9L));
    6. data.add(new Tuple7(4L, 5L, 6L, 7L, 8L, 9L, 10L));
    7. data.add(new Tuple7(6L, 6L, 7L, 8L, 9L, 10L, 11L));
    8. data.add(new Tuple7(7L, 7L, 8L, 9L, 10L, 11L, 12L));
    9. data.add(new Tuple7(8L, 8L, 9L, 10L, 11L, 12L, 13L));
    10. DataSet<Tuple7> source = env.fromCollection(
    11. data, Types.TUPLE(Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG, Types.LONG));
    12. source.map(t -> {
    13. Row row = new Row(7);
    14. for (int i = 0; i < 7; i++) {
    15. row.setField(i, t.getField(i));
    16. }
    17. return row;