With this connector, users can:

    • Load a single TsFile into Hadoop, whether the file is stored on the local file system or in HDFS (see the sketch after this list)
    • Load all files under a given directory into Hadoop, whether they are stored on the local file system or in HDFS
    • Save the results of Hadoop processing in TsFile format
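    As a minimal sketch of the first two points, a job can be pointed at either a single TsFile or a directory of TsFiles; the paths and the job name below are hypothetical, and the standard Hadoop classes (Job, Configuration, Path) are assumed:

    Job job = Job.getInstance(new Configuration(), "tsfile-example");
    job.setInputFormatClass(TSFInputFormat.class);
    // a single TsFile on the local file system
    TSFInputFormat.setInputPaths(job, new Path("file:///data/example.tsfile"));
    // or every TsFile under an HDFS directory
    // TSFInputFormat.setInputPaths(job, new Path("hdfs://namenode:9000/tsfile_dir"));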
    TsFile data type    Hadoop writable
    BOOLEAN             BooleanWritable
    INT32               IntWritable
    INT64               LongWritable
    FLOAT               FloatWritable
    DOUBLE              DoubleWritable
    TEXT                Text

    TSFInputFormat extends Hadoop's FileInputFormat class and overrides its method for splitting the input.

    The current splitting method decides whether a ChunkGroup belongs to a split by checking whether the offset of the ChunkGroup's midpoint falls between the split's startOffset and endOffset; if it does, the ChunkGroup is placed in that split.
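    This rule can be illustrated with the following simplified check; it is not the connector's actual code, and all names are hypothetical:

    // A ChunkGroup is assigned to the split whose byte range contains its midpoint.
    static boolean chunkGroupInSplit(long chunkGroupStart, long chunkGroupEnd,
                                     long splitStartOffset, long splitEndOffset) {
        long midpoint = chunkGroupStart + (chunkGroupEnd - chunkGroupStart) / 2;
        return splitStartOffset <= midpoint && midpoint < splitEndOffset;
    }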

    TSFInputFormat returns the data in a TsFile to the user as a series of MapWritable records.

    For example, suppose the measurements to be read are s1, s2, and s3, where s1 is of type BOOLEAN, s2 is of type DOUBLE, and s3 is of type TEXT.

    The structure of each MapWritable record is shown below:
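    The following is an illustrative sketch of one such record, built with the Hadoop writable types from the table above; the device name "d1" and the sample values are made up, and the key names "timestamp" and "device_id" follow the code examples later in this section:

    MapWritable record = new MapWritable();
    record.put(new Text("timestamp"), new LongWritable(1480562618000L));
    record.put(new Text("device_id"), new Text("d1"));
    record.put(new Text("s1"), new BooleanWritable(true));   // s1: BOOLEAN
    record.put(new Text("s2"), new DoubleWritable(3.14));    // s2: DOUBLE
    record.put(new Text("s3"), new Text("middle"));          // s3: TEXT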

    In a Hadoop map job, you can then obtain any value you want by its key, for example:

    mapwritable.get(new Text("s1"))

    Read example: sum

    // enable reading the time column
    TSFInputFormat.setReadTime(job, true);
    // enable reading the device id column
    TSFInputFormat.setReadDeviceId(job, true);
    // configure which device ids to read
    String[] deviceIds = {"device_1"};
    TSFInputFormat.setReadDeviceIds(job, deviceIds);
    // configure which measurement ids to read
    String[] measurementIds = {"sensor_1", "sensor_2", "sensor_3"};
    TSFInputFormat.setReadMeasurementIds(job, measurementIds);

    Then, the key and value types of the mapper and reducer output must be specified:
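    A minimal sketch of this step, assuming the standard org.apache.hadoop.mapreduce.Job API and the (Text, DoubleWritable) types used by the mapper and reducer below:

    // mapper output key/value types
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);
    // reducer (job) output key/value types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(DoubleWritable.class);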

    Next, you can write the mapper and reducer classes that contain the actual data-processing logic.

    public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, DoubleWritable> {

        @Override
        protected void map(NullWritable key, MapWritable value,
                Mapper<NullWritable, MapWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            // emit (device_id, sensor_3 value) pairs
            Text deltaObjectId = (Text) value.get(new Text("device_id"));
            context.write(deltaObjectId, (DoubleWritable) value.get(new Text("sensor_3")));
        }
    }

    public static class TSReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values,
                Reducer<Text, DoubleWritable, Text, DoubleWritable>.Context context)
                throws IOException, InterruptedException {
            // sum all sensor_3 values for this device
            double sum = 0;
            for (DoubleWritable value : values) {
                sum = sum + value.get();
            }
            context.write(key, new DoubleWritable(sum));
        }
    }

    Write example: average

    Except for the OutputFormatClass, the rest of the configuration code is the same as in the read example above.
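    The following is a sketch of the differing part; the output format class name TSFOutputFormat is an assumption (it is not stated above), and the key/value types match the mapper and reducer shown below:

    // assumed output format class provided by the connector
    job.setOutputFormatClass(TSFOutputFormat.class);
    // mapper output types, matching TSMapper below
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(MapWritable.class);
    // reducer (job) output types, matching TSReducer below
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(HDFSTSRecord.class);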

    Then come the mapper and reducer classes that contain the actual data-processing logic.

    public static class TSMapper extends Mapper<NullWritable, MapWritable, Text, MapWritable> {

        @Override
        protected void map(NullWritable key, MapWritable value,
                Mapper<NullWritable, MapWritable, Text, MapWritable>.Context context)
                throws IOException, InterruptedException {
            // group the records by device id
            Text deltaObjectId = (Text) value.get(new Text("device_id"));
            // the timestamp is also available, though it is not used in this example
            long timestamp = ((LongWritable) value.get(new Text("timestamp"))).get();
            context.write(deltaObjectId, new MapWritable(value));
        }
    }

    /**
     * This reducer calculates the average value of each sensor.
     */
    public static class TSReducer extends Reducer<Text, MapWritable, NullWritable, HDFSTSRecord> {

        @Override
        protected void reduce(Text key, Iterable<MapWritable> values,
                Reducer<Text, MapWritable, NullWritable, HDFSTSRecord>.Context context)
                throws IOException, InterruptedException {
            long sensor1_value_sum = 0;
            long sensor2_value_sum = 0;
            double sensor3_value_sum = 0;
            long num = 0;
            for (MapWritable value : values) {
                num++;
                sensor1_value_sum += ((LongWritable) value.get(new Text("sensor_1"))).get();
                sensor2_value_sum += ((LongWritable) value.get(new Text("sensor_2"))).get();
                sensor3_value_sum += ((DoubleWritable) value.get(new Text("sensor_3"))).get();
            }
            // write one record per device containing the average of each sensor
            HDFSTSRecord tsRecord = new HDFSTSRecord(1L, key.toString());
            DataPoint dPoint1 = new LongDataPoint("sensor_1", sensor1_value_sum / num);
            DataPoint dPoint2 = new LongDataPoint("sensor_2", sensor2_value_sum / num);
            DataPoint dPoint3 = new DoubleDataPoint("sensor_3", sensor3_value_sum / num);
            tsRecord.addTuple(dPoint1);
            tsRecord.addTuple(dPoint2);
            tsRecord.addTuple(dPoint3);
            context.write(NullWritable.get(), tsRecord);
        }
    }