FAQ

You can download a [convenience script]({% link downloads/setup-pyflink-virtual-env.sh %}) to prepare a Python virtual env zip which can be used on Mac OS and most Linux distributions. You can specify the PyFlink version to generate a Python virtual environment required for the corresponding PyFlink version, otherwise the most recent version will be installed.

After setting up a python virtual environment, as described in the previous section, you should activate the environment before executing the PyFlink job.

Local

  1. $ source venv/bin/activate
  2. $ python xxx.py

Cluster

A PyFlink job may depend on jar files, i.e. connectors, Java UDFs, etc. You can specify the dependencies with the following Python Table APIs or through directly when submitting the job.

  1. # NOTE: Only local file URLs (start with "file:") are supported.
  2. # NOTE: The Paths must specify a protocol (e.g. "file") and users should ensure that the URLs are accessible on both the client and the cluster.
  3. table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")

For details about the APIs of adding Java dependency, you can refer to the relevant documentation

You can add the Python files of directory myDir as following:

  1. table_env.add_python_file('myDir')

When executing jobs in mini cluster(e.g. when executing jobs in IDE) and using the following APIs in the jobs( e.g. TableEnvironment.execute_sql, StatementSet.execute, etc in the Python Table API; StreamExecutionEnvironment.execute_async in the Python DataStream API), please remember to explicitly wait for the job execution to finish as these APIs are asynchronous. Otherwise you may could not find the execution results as the program will exit before the job execution finishes. Please refer to the following example on how to do that: