Flink流批一体计算(9):Flink Python

news/2025/2/22 10:44:22

目录

使用Python依赖

使用自定义的Python虚拟环境

方式一:在集群中的某个节点创建Python虚拟环境

方式二:在本地开发机创建Python虚拟环境

使用JAR包

使用数据文件


使用Python依赖

通过以下场景为您介绍如何使用Python依赖:

  • 使用自定义的Python虚拟环境
  • 使用第三方Python包
  • 使用JAR包
  • 使用数据文件

使用自定义的Python虚拟环境

方式一:在集群中的某个节点创建Python虚拟环境

set -e

# 创建Python的虚拟环境。

python3.6 -m venv venv

# 激活Python虚拟环境。

source venv/bin/activate

# 准备Python虚拟环境。

pip install --upgrade pip

# 安装PyFlink依赖。

pip install "apache-flink==1.13.0"

# 退出Python虚拟环境。

deactivate

该命令执行完成后,会生成一个名为venv的目录,即为Python 3.6的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境。

为了使用该Python虚拟环境,您可以选择将该Python虚拟环境分发到集群的所有节点上,也可以在提交PyFlink作业的时候,指定使用该Python虚拟环境。

以下命令显示了不同的PyFlink作业提交用例:

  • 执行 PyFlink job:
$ ./bin/flink run --python examples/python/table/batch/word_count.py
  • 使用pyFiles和--pyModule中指定的主入口模块运行PyFlink作业:
./bin/flink run \
--pyModule batch.word_count \
--pyFiles examples/python/table/batch
  • 在特定主机<jobmanagerHost>运行的JobManager上提交PyFlink作业(相应地调整命令):
$ ./bin/flink run \
 --jobmanager <jobmanagerHost>:8081 \
 --python examples/python/table/batch/word_count.py
  • 在yarn集群上以Per-Job 模式运行PyFlink job:
$ ./bin/flink run \
--target yarn-per-job
--python examples/python/table/batch/word_count.py

方式二:在本地开发机创建Python虚拟环境

set -e

# 下载Python 3.7 miniconda.sh脚本。

wget "https://repo.continuum.io/miniconda/Miniconda3-py37_4.9.2-Linux-x86_64.sh" -O "miniconda.sh"

# 为Python 3.7 miniconda.sh脚本添加执行权限。

chmod +x miniconda.sh

# 创建Python的虚拟环境。

./miniconda.sh -b -p venv

# 激活Conda Python虚拟环境。

source venv/bin/activate ""

# 安装PyFlink依赖。

pip install "apache-flink==1.13.0"

# 退出Conda Python虚拟环境。

conda deactivate

# 删除缓存的包。

rm -rf venv/pkgs

# 将准备好的Conda Python虚拟环境打包。

zip -r venv.zip venv

该命令执行完成后,会生成一个名为venv.zip的文件,即为Python 3.7的虚拟环境。您也可以修改上述脚本,安装其他版本的Python虚拟环境,或者在虚拟环境中安装所需的第三方Python包。

使用JAR包

如果您的Flink Python作业中使用了Java类,例如作业中使用了Connector或者Java自定义函数时,则需要指定Connector或者Java自定义函数所在的JAR包。

引用Java UDF或外部连接器的PyFlink作业。--jarfile中指定的JAR文件将上载到集群。

$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--jarfile <jarFile>

使用数据文件

如果您的Flink Python作业中需要访问数据文件,例如模型文件等,则可以通过Python Archives的方式来访问。

  • 执行 PyFlink job,增加 source和资源文件 ,--pyFiles中指定的文件将被添加到PYTHONPATH中,因此在Python代码中可用。
$ ./bin/flink run \
--python examples/python/table/batch/word_count.py \
--pyFiles file:///user.txt,hdfs:///$namenode_address/username.txt

                

http://www.niftyadmin.cn/n/699719.html

相关文章

「深度学习之优化算法」(七)杜鹃搜索算法

1. 杜鹃搜索算法简介 (以下描述,均不是学术用语,仅供大家快乐的阅读) 杜鹃搜索算法(Cuckoo search,CS)是一种模仿杜鹃鸟寻窝产卵活动的群集智能优化算法。杜鹃搜索算法的流程简单,有较强的跳出局部最优能力,但由于算法中列维飞行实现较复杂且算法提出时间不长,还有很…

Spring6快速入门

Spring6快速入门 各工具版本要求&#xff1a; JDK:Java17 Maven:3.6 Spring:6.0.9 简介 Spring是一个控制反转(IOC)&#xff0c;面向切面(AOP)编程的轻量级框架。 2002年&#xff0c;Rod Johnson首次推出Spring框架的雏形&#xff0c;interface21(https://interface21.io/) 200…

我的创作纪念日(一周年)

机缘 作为一位互联网安全专业的大一学生&#xff0c;我始终怀抱着提升自身技术能力、保护网络安全、推动互联网世界发展的初心。 通过实战项目的经验分享&#xff0c;我收获颇多。参与团队网络安全演练与攻防对抗&#xff0c;使我学会了应对不同类型攻击与漏洞&#xff0c;提…

人工智能:揭示未来科技所带来的革命性变革

目录 引言&#xff1a; 一、人工智能的定义与发展历程&#xff1a; 二、人工智能的应用领域&#xff1a; 三、人工智能对未来的影响&#xff1a; 结论&#xff1a; 引言&#xff1a; 在当今科技快速发展的时代&#xff0c;人工智能&#xff08;Artificial Intelligence&am…

哪个爬虫库用的最多?

在Python中&#xff0c;最常用的爬虫库是requests和BeautifulSoup。requests库用于发送HTTP请求和处理响应&#xff0c;而BeautifulSoup库用于解析HTML文档。这两个库通常结合使用&#xff0c;用于爬取网页内容并提取所需的数据。其他常用的爬虫库还包括Scrapy、Selenium等。 常…

设计模式第18讲——中介者模式(Mediator)

目录 一、什么是中介者模式 二、角色组成 三、优缺点 四、应用场景 4.1 生活场景 4.2 java场景 五、代码实现 5.0 代码结构 5.1 抽象中介者&#xff08;Mediator&#xff09;——LogisticsCenter 5.2 抽象同事类&#xff08;Colleague&#xff09;——Participant 5…

The Company Requires Superficial StudyPHP 打开执行PHP ②

作者 : SYFStrive 博客首页 : HomePage &#x1f4dc;&#xff1a; PHP MYSQL &#x1f4cc;&#xff1a;个人社区&#xff08;欢迎大佬们加入&#xff09; &#x1f449;&#xff1a;社区链接&#x1f517; &#x1f4cc;&#xff1a;觉得文章不错可以点点关注 &#x1f44…

Flink 源算子之 DataGeneratorSource DataGenerator

目录 1、功能说明 2、API使用说明 3、代码示例 1、功能说明 从Flink1.1开始提供了DataGen连接器&#xff0c;它提供了Source类的实现&#xff08;可并行的源算子&#xff09;&#xff0c;用来生成测试数据&#xff0c;在本地开发或者无法访问外部系统(如kafka)时&#xff0c…