Hadoop Compatibility Beta

    You can:

    • use Hadoop’s data types in Flink programs.
    • use any Hadoop InputFormat as a DataSource.
    • use any Hadoop OutputFormat as a DataSink.
    • use a Hadoop Mapper as FlatMapFunction.
    • use a Hadoop Reducer as GroupReduceFunction.

    This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the guide for reading from Hadoop supported file systems.

    Support for Hadoop input/output formats is part of the flink-java and flink-scala Maven modules that are always required when writing Flink jobs. The code is located in org.apache.flink.api.java.hadoop and org.apache.flink.api.scala.hadoop in an additional sub-package for the mapred and mapreduce API.

    Support for Hadoop Mappers and Reducers is contained in the flink-hadoop-compatibility Maven module. This code resides in the org.apache.flink.hadoopcompatibility package.

    Add the following dependency to your pom.xml if you want to reuse Mappers and Reducers.
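
    The exact coordinates depend on your Flink release; as a sketch, assuming the artifact ID carries no Scala version suffix and with the version left as a placeholder to be replaced by the version of your Flink distribution:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility</artifactId>
        <version><!-- your Flink version --></version>
    </dependency>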

    Using Hadoop Data Types

    Flink supports all Hadoop Writable and WritableComparable data types out-of-the-box.

    Using Hadoop InputFormats

    To use Hadoop InputFormats with Flink, the format must first be wrapped using either readHadoopFile or createHadoopInput of the HadoopInputs utility class. The former is used for input formats derived from FileInputFormat while the latter has to be used for general purpose input formats. The resulting InputFormat can be used to create a data source by using ExecutionEnvironment#createInput.

    The resulting DataSet contains 2-tuples where the first field is the key and the second field is the value retrieved from the Hadoop InputFormat.

    The following example shows how to use Hadoop’s TextInputFormat.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    DataSet<Tuple2<LongWritable, Text>> input =
        env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat(),
            LongWritable.class, Text.class, textPath));

    // Do something with the data.
    [...]
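
    Input formats that are not derived from FileInputFormat are wrapped with createHadoopInput instead. A minimal sketch, assuming a hypothetical general purpose mapred input format MyInputFormat that produces LongWritable keys and Text values and is configured through a JobConf:

    JobConf jobConf = new JobConf();
    // MyInputFormat stands in for any org.apache.hadoop.mapred.InputFormat implementation.
    DataSet<Tuple2<LongWritable, Text>> input =
        env.createInput(HadoopInputs.createHadoopInput(
            new MyInputFormat(), LongWritable.class, Text.class, jobConf));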

    Using Hadoop OutputFormats

    Flink provides a compatibility wrapper for Hadoop OutputFormats. Any class that implements org.apache.hadoop.mapred.OutputFormat or extends org.apache.hadoop.mapreduce.OutputFormat is supported. The OutputFormat wrapper expects its input data to be a DataSet containing 2-tuples of key and value. These are to be processed by the Hadoop OutputFormat.

    The following example shows how to use Hadoop’s TextOutputFormat.

    // Obtain the result we want to emit
    DataSet<Tuple2<Text, IntWritable>> hadoopResult = [...]

    // Set up the Hadoop TextOutputFormat.
    HadoopOutputFormat<Text, IntWritable> hadoopOF =
        // create the Flink wrapper.
        new HadoopOutputFormat<Text, IntWritable>(
            // set the Hadoop OutputFormat and specify the job.
            new TextOutputFormat<Text, IntWritable>(), job
        );

    hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");
    TextOutputFormat.setOutputPath(job, new Path(outputPath));

    // Emit data using the Hadoop TextOutputFormat.
    hadoopResult.output(hadoopOF);
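
    The job variable in the snippet above is a Hadoop Job instance that carries the output configuration; it is not created by the Flink wrapper. With the mapreduce API it can be obtained, for example, as follows (a sketch, assuming the default Hadoop configuration is sufficient):

    // Create a Hadoop Job to hold the output configuration.
    Job job = Job.getInstance();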

    Using Hadoop Mappers and Reducers

    Hadoop Mappers are semantically equivalent to Flink’s FlatMapFunctions and Hadoop Reducers to Flink’s GroupReduceFunctions, so existing Hadoop Mapper and Reducer implementations can be reused in regular Flink programs. The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>> as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>> as output, where KEYIN and KEYOUT are the keys and VALUEIN and VALUEOUT are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with a Combiner (HadoopReduceCombineFunction) and without a Combiner (HadoopReduceFunction). The wrappers accept an optional JobConf object to configure the Hadoop Mapper or Reducer.

    Flink’s function wrappers are

    • org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,
    • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, and
    • org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction.

    and can be used as regular Flink FlatMapFunctions or GroupReduceFunctions.

    The following example shows how to use Hadoop Mapper and Reducer functions.

    // Obtain data to process somehow.
    DataSet<Tuple2<LongWritable, Text>> text = [...]

    DataSet<Tuple2<Text, LongWritable>> result = text
        // use Hadoop Mapper (Tokenizer) as MapFunction
        .flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(
            new Tokenizer()
        ))
        .groupBy(0)
        // use Hadoop Reducer (Counter) as Reduce- and CombineFunction
        .reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(
            new Counter(), new Counter()
        ));

    Please note: The Reducer wrapper works on groups as defined by Flink’s groupBy() operation. It does not consider any custom partitioners, sort or grouping comparators you might have set in the JobConf.
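
    If the Hadoop job relied on such settings, the grouping and ordering have to be expressed with Flink’s own operators instead. A minimal sketch, assuming mapped is the DataSet<Tuple2<Text, LongWritable>> produced by the Mapper step above and that values should be sorted within each group on the second field (Order is org.apache.flink.api.common.operators.Order):

    DataSet<Tuple2<Text, LongWritable>> counts = mapped
        // groups are defined by Flink's groupBy(), not by the JobConf
        .groupBy(0)
        // secondary sort within each group, replacing a Hadoop sort comparator
        .sortGroup(1, Order.ASCENDING)
        .reduceGroup(new HadoopReduceFunction<Text, LongWritable, Text, LongWritable>(
            new Counter()
        ));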

    Complete Hadoop WordCount Example