Hadoop formats
将以下依赖添加到 pom.xml
中使用 hadoop
如果你想在本地运行你的 Flink 应用(例如在 IDE 中),你需要按照如下所示将 hadoop-client
依赖也添加到 pom.xml
:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.8.5</version>
<scope>provided</scope>
在 Flink 中使用 Hadoop InputFormats
,必须首先使用 HadoopInputs
工具类的 readHadoopFile
或 createHadoopInput
包装 Input Format。 前者用于从 FileInputFormat
派生的 Input Format,而后者必须用于通用的 Input Format。 生成的 可通过使用 ExecutionEnvironmen#createInput
创建数据源。
下面的示例展示了如何使用 Hadoop 的 TextInputFormat
。
Java
Scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input: DataStream[(LongWritable, Text)] =
env.createInput(HadoopInputs.readHadoopFile(
new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
// 对数据进行一些处理。
[...]
下面的示例展示了如何使用 Hadoop 的 TextOutputFormat
。
Java
Scala
val hadoopResult: DataStream[(Text, IntWritable)] = [...]
val hadoopOF = new HadoopOutputFormat[Text,IntWritable](
new TextOutputFormat[Text, IntWritable],
new JobConf)
hadoopOF.getJobConf.set("mapred.textoutputformat.separator", " ")
FileOutputFormat.setOutputPath(hadoopOF.getJobConf, new Path(resultPath))
hadoopResult.output(hadoopOF)