Python REPL

    Note The Python Shell will run the command “python”. Please refer to the Python Table API installation guide on how to set up the Python execution environments.

    To use the shell with an integrated Flink cluster, you can simply install PyFlink with PyPi and execute the shell directly:

    To run the shell on a cluster, please see the Setup section below.

    The shell only supports Table API currently. The Table Environments are automatically prebound after startup. Use “bt_env” and “st_env” to access BatchTableEnvironment and StreamTableEnvironment respectively.

    stream

    1. >>> import os
    2. >>> import shutil
    3. >>> sink_path = tempfile.gettempdir() + '/streaming.csv'
    4. >>> if os.path.exists(sink_path):
    5. ... if os.path.isfile(sink_path):
    6. ... os.remove(sink_path)
    7. ... else:
    8. ... shutil.rmtree(sink_path)
    9. >>> s_env.set_parallelism(1)
    10. >>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    11. >>> st_env.create_temporary_table("stream_sink", TableDescriptor.for_connector("filesystem")
    12. ... .schema(Schema.new_builder()
    13. ... .column("a", DataTypes.BIGINT())
    14. ... .column("b", DataTypes.STRING())
    15. ... .column("c", DataTypes.STRING())
    16. ... .build())
    17. ... .option("path", path)
    18. ... .format(FormatDescriptor.for_format("csv")
    19. ... .option("field-delimiter", ",")
    20. ... .build())
    21. ... .build())
    22. >>> t.select("a + 1, b, c")\
    23. >>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
    24. >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
    25. ... print(f.read())

    batch

    1. >>> import tempfile
    2. >>> import os
    3. >>> sink_path = tempfile.gettempdir() + '/batch.csv'
    4. >>> if os.path.exists(sink_path):
    5. ... if os.path.isfile(sink_path):
    6. ... os.remove(sink_path)
    7. ... else:
    8. ... shutil.rmtree(sink_path)
    9. >>> b_env.set_parallelism(1)
    10. >>> t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
    11. >>> st_env.create_temporary_table("batch_sink", TableDescriptor.for_connector("filesystem")
    12. ... .schema(Schema.new_builder()
    13. ... .column("a", DataTypes.BIGINT())
    14. ... .column("b", DataTypes.STRING())
    15. ... .column("c", DataTypes.STRING())
    16. ... .build())
    17. ... .option("path", path)
    18. ... .format(FormatDescriptor.for_format("csv")
    19. ... .option("field-delimiter", ",")
    20. ... .build())
    21. ... .build())
    22. >>> t.select("a + 1, b, c")\
    23. ... .execute_insert("batch_sink").wait()
    24. >>> # If the job runs in local mode, you can exec following code in Python shell to see the result:
    25. >>> with open(os.path.join(sink_path, os.listdir(sink_path)[0]), 'r') as f:
    26. ... print(f.read())

    To get an overview of what options the Python Shell provides, please use

    To use the shell with an integrated Flink cluster just execute:

    1. pyflink-shell.sh local

      The shell can deploy a Flink cluster to YARN, which is used exclusively by the shell. The shell deploys a new Flink cluster on YARN and connects the cluster. You can also specify options for YARN cluster such as memory for JobManager, name of YARN application, etc.

      For example, to start a Yarn cluster for the Python Shell with two TaskManagers use the following:

      For all other options, see the full reference at the bottom.

      If you have previously deployed a Flink cluster using the Flink Yarn Session, the Python shell can connect with it using the following command:

      1. pyflink-shell.sh yarn
      1. Usage: pyflink-shell.sh [local|remote|yarn] [options] <args>...
      2. Command: local [options]
      3. Starts Flink Python shell with a local Flink cluster
      4. usage:
      5. -h,--help Show the help message with descriptions of all options.
      6. Command: remote [options] <host> <port>
      7. Starts Flink Python shell connecting to a remote cluster
      8. <host>
      9. Remote host name as string
      10. <port>
      11. Remote port as integer
      12. usage:
      13. -h,--help Show the help message with descriptions of all options.
      14. Command: yarn [options]
      15. Starts Flink Python shell connecting to a yarn cluster
      16. usage:
      17. -h,--help Show the help message with descriptions of
      18. all options.
      19. -jm,--jobManagerMemory <arg> Memory for JobManager Container with
      20. optional unit (default: MB)
      21. -nm,--name <arg> Set a custom name for the application on
      22. YARN
      23. -qu,--queue <arg> Specify YARN queue.
      24. -s,--slots <arg> Number of slots per TaskManager
      25. -tm,--taskManagerMemory <arg> Memory per TaskManager Container with
      26. optional unit (default: MB)