常见问题
您可以下载[便捷脚本]({% link downloads/setup-pyflink-virtual-env.sh %}),以准备可在Mac OS和大多数Linux发行版上使用的Python虚拟环境包(virtual env zip)。 您可以指定PyFlink的版本,来生成对应的PyFlink版本所需的Python虚拟环境,否则将安装最新版本的PyFlink所对应的Python虚拟环境。
集群(Cluster)
$ # 指定用于执行python UDF workers (用户自定义函数工作者) 的python解释器的路径
$ table_env.get_config().set_python_executable("venv.zip/venv/bin/python")
如果需要了解add_python_archive
和set_python_executable
用法的详细信息,请参阅相关文档。
有关添加Java依赖项的API的详细信息,请参阅。
您可以使用命令行参数pyfs
或TableEnvironment的API 添加python文件依赖,这些依赖可以是python文件,python软件包或本地目录。 例如,如果您有一个名为myDir
的目录,该目录具有以下层次结构:
myDir
├──utils
├──__init__.py
├──my_util.py
当在 mini cluster 环境执行作业(比如,在IDE中执行作业)且在作业中使用了如下API(比如 Python Table API 的 TableEnvironment.execute_sql, StatementSet.execute 和 Python DataStream API 的 StreamExecutionEnvironment.execute_async) 的时候,因为这些API是异步的,请记得显式地等待作业执行结束。否则程序会在已提交的作业执行结束之前退出,以致无法观测到已提交作业的执行结果。 请参考如下示例代码,了解如何显式地等待作业执行结束:
t_result = table_env.execute_sql(...)
t_result.wait()
# 异步执行 DataStream 作业
job_client = stream_execution_env.execute_async('My DataStream Job')
job_client.get_job_execution_result().result()
注意: 当往远程集群提交作业时,无需显式地等待作业执行结束,所以当往远程集群提交作业之前,请记得移除这些等待作业执行结束的代码逻辑。