Hadoop formats

    将以下依赖添加到 pom.xml 中使用 hadoop

    如果你想在本地运行你的 Flink 应用(例如在 IDE 中),你需要按照如下所示将 hadoop-client 依赖也添加到 pom.xml

    1. <dependency>
    2. <groupId>org.apache.hadoop</groupId>
    3. <artifactId>hadoop-client</artifactId>
    4. <version>2.8.5</version>
    5. <scope>provided</scope>

    在 Flink 中使用 Hadoop InputFormats,必须首先使用 HadoopInputs 工具类的 readHadoopFilecreateHadoopInput 包装 Input Format。 前者用于从 FileInputFormat 派生的 Input Format,而后者必须用于通用的 Input Format。 生成的 可通过使用 ExecutionEnvironmen#createInput 创建数据源。

    下面的示例展示了如何使用 Hadoop 的 TextInputFormat

    Java

    Scala

    1. val env = StreamExecutionEnvironment.getExecutionEnvironment
    2. val input: DataStream[(LongWritable, Text)] =
    3. env.createInput(HadoopInputs.readHadoopFile(
    4. new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
    5. // 对数据进行一些处理。
    6. [...]

    下面的示例展示了如何使用 Hadoop 的 TextOutputFormat

    Java

    Scala

    1. val hadoopResult: DataStream[(Text, IntWritable)] = [...]
    2. val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
    3. new TextOutputFormat[Text, IntWritable],
    4. new JobConf)
    5. hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
    6. FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
    7. hadoopResult.output(hadoopOF)