Hadoop FileSystem 连接器

    注意连接器目前还不是二进制发行版的一部分,添加依赖、打包配置以及集群运行信息请参考 这里

    分桶文件 Sink

    关于分桶的配置我们后面会有讲述,这里先创建一个分桶 sink,默认情况下这个 sink 会将数据写入到按照时间切分的滚动文件中:

    1. input.addSink(new BucketingSink<String>("/base/path"));

    初始化时只需要一个参数,这个参数表示分桶文件存储的路径。分桶 sink 可以通过指定自定义的 bucketer、 writer 和 batch 值进一步配置。

    可以调用BucketingSinksetBucketer() 方法指定自定义的 bucketer,如果需要的话,也可以使用一个元素或者元组属性来决定桶的路径。

    默认的 writer 是 。数据到达时,通过 toString() 方法得到内容,内容以换行符分隔,StringWriter 将数据内容写入部分文件。可以通过 BucketingSinksetWriter() 指定自定义的 writer。SequenceFileWriter 支持写入 HadoopSequenceFiles,并且可以配置是否开启压缩。

    关闭部分文件和打开新部分文件的时机可以通过两个配置来确定:

    • 设置文件滚动周期,单位是毫秒(默认滚动周期是 Long.MAX_VALUE

    示例:

    1. DataStream<Tuple2<IntWritable,Text>> input = ...;
    2. BucketingSink<Tuple2<IntWritable,Text>> sink = new BucketingSink<Tuple2<IntWritable,Text>>("/base/path");
    3. sink.setWriter(new SequenceFileWriter<IntWritable, Text>());
    4. sink.setBatchSize(1024 * 1024 * 400); // this is 400 MB,
    5. sink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins

    上述代码会创建一个 sink,这个 sink 按下面的模式写入桶文件:

    1. /base/path/{date-time}/part-{parallel-task}-{count}

    date-time 是我们从日期/时间格式获得的字符串,parallel-task 是 sink 并发实例的索引,count 是因文件大小或者滚动周期而产生的文件的编号。