Python REPL

    为了使用Flink的Python Shell,你只需要在Flink的binary目录下执行:

    关于如何在一个Cluster集群上运行Python shell,可以参考启动章节介绍。

    下面是一个通过Python Shell 运行的简单示例:

    1. >>> import os
    2. >>> import shutil
    3. >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
    4. >>> if os.path.exists(sink_path):
    5. ... if os.path.isfile(sink_path):
    6. ... os.remove(sink_path)
    7. ... else:
    8. ... shutil.rmtree(sink_path)
    9. >>> s_env.set_parallelism(1)
    10. >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    11. >>> st_env.connect(FileSystem().path(sink_path))\
    12. ... .with_format(OldCsv()
    13. ... .field_delimiter(',')
    14. ... .field("a", DataTypes.BIGINT())
    15. ... .field("b", DataTypes.STRING())
    16. ... .field("c", DataTypes.STRING()))\
    17. ... .with_schema(Schema()
    18. ... .field("a", DataTypes.BIGINT())
    19. ... .field("b", DataTypes.STRING())
    20. ... .field("c", DataTypes.STRING()))\
    21. ... .register_table_sink("stream_sink")
    22. >>> t.select("a + 1, b, c")\
    23. ... .insert_into("stream_sink")
    24. >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
    25. >>> with open(sink_path, 'r') as f:
    26. ... print(f.read())
    1. >>> import tempfile
    2. >>> import os
    3. >>> import shutil
    4. >>> if os.path.exists(sink_path):
    5. ... if os.path.isfile(sink_path):
    6. ... os.remove(sink_path)
    7. ... else:
    8. ... shutil.rmtree(sink_path)
    9. >>> b_env.set_parallelism(1)
    10. >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    11. >>> bt_env.connect(FileSystem().path(sink_path))\
    12. ... .with_format(OldCsv()
    13. ... .field_delimiter(',')
    14. ... .field("a", DataTypes.BIGINT())
    15. ... .field("b", DataTypes.STRING())
    16. ... .field("c", DataTypes.STRING()))\
    17. ... .with_schema(Schema()
    18. ... .field("a", DataTypes.BIGINT())
    19. ... .field("b", DataTypes.STRING())
    20. ... .field("c", DataTypes.STRING()))\
    21. ... .register_table_sink("batch_sink")
    22. >>> t.select("a + 1, b, c")\
    23. ... .insert_into("batch_sink")
    24. >>> bt_env.execute("batch_job")
    25. >>> # 如果作业运行在local模式, 你可以执行以下代码查看结果:
    26. >>> with open(sink_path, 'r') as f:
    27. ... print(f.read())

    查看Python Shell提供的可选参数,可以使用:

    1. bin/pyflink-shell.sh local

    Python Shell运行在一个指定的JobManager上,通过关键字remote和对应的JobManager的地址和端口号来进行指定:

      Python Shell可以运行在YARN集群之上。YARN的container的数量可以通过参数-n <arg>进行指定。Python shell在Yarn上部署一个新的Flink集群,并进行连接。除了指定container数量,你也可以指定JobManager的内存,YARN应用的名字等参数。例如,在一个部署了两个TaskManager的Yarn集群上运行Python Shell:

      如果你已经通过Flink Yarn Session部署了一个Flink集群,能够通过以下的命令连接到这个集群:

      1. bin/pyflink-shell.sh yarn
      1. Flink Python Shell
      2. 命令: local [选项]
      3. 启动一个部署在localFlink Python shell
      4. 使用:
      5. -h,--help 查看所有可选的参数
      6. 命令: remote [选项] <host> <port>
      7. 启动一个部署在remote集群的Flink Python shell
      8. <host>
      9. JobManager的主机名
      10. <port>
      11. JobManager的端口号
      12. 使用:
      13. -h,--help 查看所有可选的参数
      14. 命令: yarn [选项]
      15. 启动一个部署在Yarn集群的Flink Python Shell
      16. 使用:
      17. -h,--help 查看所有可选的参数
      18. -jm,--jobManagerMemory <arg> 具有可选单元的JobManager
      19. container的内存(默认值:MB)
      20. -n,--container <arg> 需要分配的YARN container
      21. 数量 (=TaskManager的数量)
      22. -nm,--name <arg> 自定义YARN Application的名字
      23. -qu,--queue <arg> 指定YARNqueue
      24. -s,--slots <arg> 每个TaskManagerslots的数量
      25. -tm,--taskManagerMemory <arg> 具有可选单元的每个TaskManager
      26. container的内存(默认值:MB
      27. -h | --help