Python API 教程

    首先,你可以使用你最熟悉的IDE,创建一个Python项目。然后,你需要安装PyFlink包,请参考构建PyFlink了解详细信息。

    编写Flink Python Table API程序的第一步是创建(或者StreamTableEnvironment,如果你要创建一个流式作业)。这是Python Table API作业的入口类。

    ExecutionEnvironment (或者StreamExecutionEnvironment,如果你要创建一个流式作业)可以用来设置执行参数,比如重启策略,缺省并发值等。

    接下来,我们将介绍如何创建源表和结果表。

    1. t_env.connect(FileSystem().path('/tmp/input')) \
    2. .with_format(OldCsv()
    3. .line_delimiter(' ')
    4. .with_schema(Schema()
    5. .field('word', DataTypes.STRING())) \
    6. t_env.connect(FileSystem().path('/tmp/output')) \
    7. .with_format(OldCsv()
    8. .field('word', DataTypes.STRING())
    9. .field('count', DataTypes.BIGINT())) \
    10. .with_schema(Schema()
    11. .field('word', DataTypes.STRING())
    12. .field('count', DataTypes.BIGINT())) \
    13. .register_table_sink('mySink')

    上面的程序展示了如何创建及在中注册表名分别为mySourcemySink的表。其中,源表mySource有一列: word,该表代表了从输入文件/tmp/input中读取的单词;结果表mySink有两列: word和count,该表会将计算结果输出到文件/tmp/output中,字段之间使用\t作为分隔符。

    接下来,我们介绍如何创建一个作业:该作业读取表mySource中的数据,进行一些变换,然后将结果写入表mySink

      该教程的完整代码如下:

      可以在IDE中或者命令行中运行作业(假设作业名为WordCount.py):

      1. $ python WordCount.py

      上述命令会构建Python Table API程序,并在本地mini cluster中运行。如果想将作业提交到远端集群执行,可以参考。