单节点pyspark-tensorflow-jupyter notebook配置教程(以使用LSTM算法预测比特币价格的案例程序为例)

单节点pyspark-tensorflow-jupyter notebook配置教程(以使用LSTM算法预测比特币价格的案例程序为例),第1张

本文的目的是在一个Linux节点上集成pysparktensorflow、使用jupyter notebook 来编写和调试代码,从而集成spark和tensorflow进行深度学习。为了达成此目的,本文选取了比特币价格预测的案例(仅示范了分布式推断),相关的数据文件和代码文件在本文中以百度网盘的链接给出。此外,本文的组织结构已在下面的目录中给出,读者可针对性的阅读。

文章目录 0 前言1 安装配置PySpark1.1 安装Anaconda1.2 创建虚拟环境 2 安装配置Jupyter Notebook2.1 安装jupyter notebook2.2 配置Jupyter Notebook2.3 后台运行jupyter服务2.4 在jupyter notebook上编写pyspark程序 3 集成Python第三方库(以TF为例)3.1 下载Tensorflow3.2 复现简易程序测试(以预测比特币价格为例) 4 知识附录4.1 conda install与pip install4.2 PySpark架构4.3 pandas DataFrame与spark DataFrame4.4 pandas_udf的用法 5 参考资料


0 前言

因为 *** 作系统、大数据组件(如hadoop和spark)、python和tensorflow等第三方库之间需要版本兼容,所以在依据本教程进行复现时推荐与作者使用一致的版本,或自己去查阅相关组件的版本兼容性。

本教程使用的 *** 作系统是CentOS7.6,使用的各组件版本如下表,这些组件安装包皆可从笔者的百度云链接获取:https://pan.baidu.com/s/1lV6RkuCWQ2URvcVl33dEaQ,提取码:h202。

组件版本
anaconadAnaconda3-2021.05-Linux-x86_64
python3.8.8
hadoop3.2.2
sparkspark-3.1.1-bin-hadoop3.2
tensorflow2.7.0
为了更有针对性的介绍,此处对hadoop和spark单节点配置的过程便不再展开,当然,因为考虑到部分读者可能需要相关参考,笔者单独写了一篇博客:Spark-Hadoop在Linux节点上以Local模式部署 作为参考。
1 安装配置PySpark 1.1 安装Anaconda

如果想自定义anaconda的版本,可以前往 清华大学开源软件镜像站选择合适的版本进行下载,如果配置与笔者一致,可以直接使用上文百度云分享的安装脚本。

笔者使用的Anaconda安装文件是Anaconda3-2021.05-Linux-x86_64.sh,该安装文件保存在Linux系统的/home/ZSX/tarfile目录下,其中/home/ZSX是用户根目录,等于~,则可执行如下命令开始安装Anaconda:

cd ~/tarfile/
bash Anaconda3-2021.05-Linux-x86_64.sh

输入命令以后,如下图所示,会提示你查看许可文件,直接敲入回车即可。

[ZSX@westgis123 ~/tarfile]$ bash Anaconda3-2021.05-Linux-x86_64.sh 

Welcome to Anaconda3 2021.05

In order to continue the installation process, please review the license
agreement.
Please, press ENTER to continue
>>> 

敲入回车以后,会出现软件许可文件,这个文件很长,可以一直不断按回车,来翻到文件的末尾。

翻到许可文件末尾以后,会出现提示“是否接受许可条款”,如方框中的内容所示,此时只需输入yes后回车即可:

Last updated April 5, 2021
Do you accept the license terms? [yes|no]
[no] >>> 
Please answer 'yes' or 'no':'
>>> 

然后,会出现如下所示界面,提醒你选择安装路径,这里不要自己指定路径,直接回车就可以(回车后系统就会安装到默认路径,比如这里是/home/ZSX/anaconda3,即在用户根目录下新建一个anaconda3,等于~/anaconda3)。

Anaconda3 will now be installed into this location:
/home/ZSX/anaconda3

  - Press ENTER to confirm the location
  - Press CTRL-C to abort the installation
  - Or specify a different location below

[/home/ZSX/anaconda3] >>> 

系统会提示你是否运行conda初始化,也就是设置一些环境变量,这里输入yes以后回车。

Preparing transaction: done
Executing transaction: done
installation finished.
Do you wish the installer to initialize Anaconda3
by running conda init? [yes|no]
[no] >>>  yes

Tips:如果长时间不 *** 作,安装程序会默认按[no]选项进行安装,若之后还想进行初始化 *** 作,可以使用命令:conda init 进行初始化。

安装成功以后,可以看到如下信息。

==> For changes to take effect, close and re-open your current shell. <==

If you'd prefer that conda's base environment not be activated on startup, 
   set the auto_activate_base parameter to false: 

conda config --set auto_activate_base false

Thank you for installing Anaconda3!

===========================================================================

Working with Python and Jupyter notebooks is a breeze with PyCharm Pro,
designed to be used with Anaconda. Download now and have the best data
tools at your fingertips.

PyCharm Pro for Anaconda is available at: https://www.anaconda.com/pycharm

安装结束后,重新打开一个终端,输入命令:conda -V,可以查看版本信息,若如下所示,显示出来对应的版本号,则anaconda安装即初始化成功。

(base) [ZSX@westgis123 ~]$ conda -V
conda 4.10.1

这时,你会发现,在命令提示符的开头多了一个(base),这意味着当前终端激活了anaconda的虚拟环境,看着很难受,可以该虚拟环境的名字叫做"base",可以在终端中运行如下命令来禁止每次打开终端时自动启动该虚拟环境。

conda config --set auto_activate_base false

1.2 创建虚拟环境

此时我们便可在名为base的虚拟环境下安装Python的第三方库了,但为了方便包管理与环境管理配置,所以建议先针对任务场景创建虚拟环境;此外我们还需要配置下载源镜像,防止下载Python包速度太慢的情况,这里使用的是清华大学开源镜像源,执行下列命令。

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes

之后创建使用Jupyter Notebook和TensorFlow时专属的conda虚拟环境,命令如下:

conda create -n tensorflow python=3.8.8

在创建虚拟环境的过程中,会出现“是否继续”的提示,键入"y"即可:

Proceed ([y]/n)? y

之后仅需等待虚拟环境创建即可,创建成功的结果如下图所示:

之后执行命令conda activate tensorflow即可激活我们方才创建的虚拟环境,效果如下图所示,从图中我们可以发现命令行提示符前有(tensorflow)的标记,这说明虚拟环境已激活成功。


2 安装配置Jupyter Notebook 2.1 安装jupyter notebook

使用如下命令安装jupyter notebook:

conda install jupyter notebook

安装过程中会出现“是否继续”的提示,键入"y"之后等待安装完成即可:

Proceed ([y]/n)? y

安装成功的结果如下图所示:


2.2 配置Jupyter Notebook

安装成功之后即可配置Jupyter Notebook,先在终端中执行如下命令:

jupyter notebook --generate-config

执行效果如下图所示,可以发现该命令在~/.jupyter目录下产生了配置文件

之后在linux终端输入下命令: ~/anaconda3/bin/python,进入python解释器界面:

然后,在Python解释器界面(不是Linux Shell命令界面)后面输入如下命令:

from notebook.auth import passwd
passwd()

运行结果如下图所示:


此时系统会让输入密码,并让你确认密码(如:123456),这个密码是后面进入Jupyter网页页面的密码,然后系统会生成一个密码字符串,如下图所示:

把这个argon2字符串复制粘贴到一个文件中保存起来,之后退出python界面,之后使用得到的argon2密码字符串在jupyter的配置文件中配置密码,笔者得到的密钥如下:

'argon2:$argon2id$v=19$m=10240,t=10,p=8$wnWBpN1MmBI+0pD4WmScqgpFULGVRlDWPNeTIOvz9Q'

配置文件的路径是~/.jupyter/jupyter_notebook_config.py,直接使用vim命令对其编辑即可,进入到编辑页面,在文件的开头增加以下内容:

# Configuration file for jupyter-notebook.
c.ServerApp.ip='*'  # 就是设置所有ip皆可访问  
c.ServerApp.password = 'argon2:$argon2id$v=19$m=10240,t=10,p=8$wnWBpN1MmBI+0pD4WmScqgpFULGVRlDWPNeTIOvz9Q'    #上一步生成的密文  
c.ServerApp.port =8888  # 端口
c.ServerApp.allow_remote_access = True  #远程访问
c.ServerApp.notebook_dir = '/home/ZSX/jupyter'  #设置Notebook启动进入的目录
c.NotebookApp.open_browser = False# 禁止自动打开浏览器

添加信息后,保存并退出即可,需要注意的是,该配置文件中c.ServerApp.password的取值就是前面生成的密文。另外,c.ServerApp.notebook_dir = '/home/ZSX/jupyter'这行用于设置Notebook启动进入的目录,由于该目录还不存在,所以需要在终端中执行如下命令创建:mkdir ~/jupyter/

目录创建成功后,即可进入目录启动jupyter notebook,具体命令如下所示:

cd ~/jupyter/
jupyter notebook --ip 192.168.1.96 #jupyter服务器的公网IP,即Linux节点公网IP

需要注意的是这里的192.168.1.96读者需要替换成自己linux节点的公网IP,该参数的作用是使指定的Linux节点作为jupyter notebook的服务器,并向外网开放自己的服务,访问方式是通过IP:端口的形式访问,以该方式访问时需要输入登入密码。

输入命令后得到的结果如下所示:

[I 13:08:00.760 NotebookApp] Serving notebooks from local directory: /home/ZSX
[I 13:08:00.760 NotebookApp] Jupyter Notebook 6.4.11 is running at:
[I 13:08:00.760 NotebookApp] http://192.168.1.96:8888/?token=1fa5f34e18ce6b8cf7c9db922203b074b3a8198d6a981838
[I 13:08:00.760 NotebookApp]  or http://127.0.0.1:8888/?token=1fa5f34e18ce6b8cf7c9db922203b074b3a8198d6a981838
[I 13:08:00.760 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 13:08:00.764 NotebookApp] 

    To access the notebook, open this file in a browser:
        file:///home/ZSX/.local/share/jupyter/runtime/nbserver-9401-open.html
    Or copy and paste one of these URLs:
        http://192.168.1.96:8888/?token=1fa5f34e18ce6b8cf7c9db922203b074b3a8198d6a981838
     or http://127.0.0.1:8888/?token=1fa5f34e18ce6b8cf7c9db922203b074b3a8198d6a981838

我们可以在Linux节点上打开浏览器,复制上述输出结果中最后两个网址中的任意一个来进入Jupyter Notebook页面,但是因为在linux节点上启动浏览器容易卡顿,所以推荐用window打开浏览器输入倒数第二个网址进行访问。

Tip1:要想在window上成功访问Linux节点,还需要先关闭Linux节点的防火墙,否则两节点间无法ping通)

Tip2:以生成链接的方式访问Jupyter服务器时,不需要输入登入密码

在浏览器访问到的Jupyter notebook页面如下图所示:

此时可以新建python文件,进行代码的编写和调试,具体流程如下图:

在新建的python笔记本文件内,输入print("This is a ipynb!"),并按ctrl+Enter执行,查看代码是否可以正常运行,若可以,则jupyter notebook配置成功,正常运行结果如下图所示。

注:关闭jupyter notebook服务时,在jupyter notebook主页右上角找到Quit按钮,点击该按钮即可关闭。


2.3 后台运行jupyter服务

在某个终端启动jupyter notebook服务时,我们可以发现该终端会被占用,无法键入新的 *** 作命令;此外,当该终端被关闭时,jupyter notebook服务也会随之停止。为了解决上述缺陷,可以使用nohup命令后台运行jupyter notebook服务,并将输出信息重定向到当前目录下的nohup.out文件内,执行命令如下:

cd ~/jupyter
nohup ~/anaconda3/bin/jupyter notebook --ip 192.168.1.96

执行命令后会在~/jupyter/目录下产生nohup.out文件,访问链接可在该文件内找到,在查找时可以用vim命令查看文件,也可以使用cat nohup.out|grep 192.168.1.96:8888来过滤出访问链接。


2.4 在jupyter notebook上编写pyspark程序

首先,要先在~/.bashrc文件内配置PySpark相关的环境变量,添加如下内容:

#PYSPARK
export PYSPARK_PYTHON=/home/ZSX/anaconda3/bin/python
export PYSPARK_DRIVER_PYTHON=/home/ZSX/anaconda3/bin/python
export LD_LIBRARY_PATH=/home/ZSX/anaconda3/lib/:$LD_LIBRARY_PATH
export PYTHONPATH=$SPARK_HOME/python:$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH

读者把上述内容中的/home/ZSX/anaconda换成自己anaconda所在的安装目录即可,之后再使用source ~/.bashrc更新环境变量。

为了检测环境变量是否已更新,可以在命令行界面键入pyspark,进入PySpark的交互式界面,之后键入下述代码进行测试。

#Tip:在运行代码之前,读者需要根据自己的文件路径修改logFile的路径
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
logFile = "file:///home/ZSX/bigdata/spark/README.md"
logData = sc.textFile(logFile, 2).cache()
numAs = logData.filter(lambda line: 'a' in line).count()
numBs = logData.filter(lambda line: 'b' in line).count()
print('Lines with a: %s, Lines with b: %s' % (numAs, numBs))

代码运行结果如下图所示,则说明环境变量生效。

此时打开一个新终端,激活虚拟环境,重启jupyter notebook服务:

conda activate tensorflow
cd ~/jupyter
rm nohup.out
nohup ~/anaconda3/bin/jupyter notebook --ip 192.168.1.96

之后在jupyter notebook页面创建一个新的python笔记本,重命名为pyspark_test,并键入之前在pyspark交互式界面用来测试的代码,之后运行,若运行成功,则jupyter notebook与pyspark集成成功,具体效果如下图所示:


3 集成Python第三方库(以TF为例) 3.1 下载Tensorflow

激活虚拟环境,并使用pip下载tensorflow,具体命令如下所示:

conda activate tensorflow #如果已经激活虚拟环境了,则不需要执行该语句
pip install --upgrade tensorflow==2.7.0 -i  https://pypi.douban.com/simple

命令执行效果如下图,耐心等待下载完成即可。(如果读者与笔者使用版本一致,那大概率是不会在下载中途报错的,但若版本不一致,则可能会出现版本兼容性问题,如glibc版本与tensorflow不兼容等,此时可以自行检索解决方案,也可更换组件版本)

耐心等待一段时间后,TensorFlow便下载完成了,笔者这次下载的过程中没有报错。

下载完成后,在命令行界面输入python,进入python交互式界面,之后输入以下代码,查看是否可以成功加载tensorflow模块。

import tensorflow as tf

很不幸,代码报了如下错误:

2022-06-08 15:55:50.806131: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
Traceback (most recent call last):
  File "", line 1, in 
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/__init__.py", line 41, in 
    from tensorflow.python.tools import module_util as _module_util
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/python/__init__.py", line 41, in 
    from tensorflow.python.eager import context
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/python/eager/context.py", line 33, in 
    from tensorflow.core.framework import function_pb2
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/core/framework/function_pb2.py", line 16, in 
    from tensorflow.core.framework import attr_value_pb2 as tensorflow_dot_core_dot_framework_dot_attr__value__pb2
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/core/framework/attr_value_pb2.py", line 16, in 
    from tensorflow.core.framework import tensor_pb2 as tensorflow_dot_core_dot_framework_dot_tensor__pb2
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/core/framework/tensor_pb2.py", line 16, in 
    from tensorflow.core.framework import resource_handle_pb2 as tensorflow_dot_core_dot_framework_dot_resource__handle__pb2
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/core/framework/resource_handle_pb2.py", line 16, in 
    from tensorflow.core.framework import tensor_shape_pb2 as tensorflow_dot_core_dot_framework_dot_tensor__shape__pb2
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/tensorflow/core/framework/tensor_shape_pb2.py", line 36, in 
    _descriptor.FieldDescriptor(
  File "/home/ZSX/anaconda3/envs/tensorflow/lib/python3.8/site-packages/google/protobuf/descriptor.py", line 560, in __new__
    _message.Message._CheckCalledFromGeneratedFile()
TypeError: Descriptors cannot not be created directly.
If this call came from a _pb2.py file, your generated code is out of date and must be regenerated with protoc >= 3.19.0.
If you cannot immediately regenerate your protos, some other possible workarounds are:
 1. Downgrade the protobuf package to 3.20.x or lower.
 2. Set PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python (but this will use pure-Python parsing and will be much slower).

可以看到错误提示中包含了修改建议Downgrade the protobuf package to 3.20.x or lower, 此时我们按照提示重装protobuf,在bash命令行键入:

pip install protobuf==3.20.1 -i  https://pypi.douban.com/simple 

完成如上 *** 作之后,再重新导入tensorflow,则不会报错,至此tensorflow安装成功。

虽然经过上述 *** 作之后没有报错了,但是会出现warning,这个和TF版本有关,但是不影响程序运行,如果想消除warning,需要根据提示选择合适的CPU或GPU版本


3.2 复现简易程序测试(以预测比特币价格为例)

在编写pyspark程序之前,需要先安装pyarrow,具体命令如下:

 pip install pyarrow -i  https://pypi.douban.com/simple

值得一提的是:笔者是先在conda的默认虚拟环境base下配置好环境、编写调试通过本文使用的案例程序才着手写的当前教程,为了使第三方库管理更加规范才创建了名为Tensoflow的新虚拟环境,但不幸的是在该虚拟环境下无法在jupyter notebook中通过pandas_udf函数调用tensorflow模块,会出现ModuleNotFoundError: No module named 'tensorflow'的错误,原因是使用虚拟环境时,PySpark相关的环境变量配置会更复杂,读者可以通过本文的第4章节“知识附录”的第1部分内容进行了解,不修改环境变量的解决方法是:

退出tensorflow的虚拟环境,激活base的虚拟环境

在该虚拟环境下重新使用pip下载pyarrow

在该虚拟环境下重新使用pip下载tensorflow

在该虚拟环境下重新使用pip重装protobuf

具体的代码如下所示:

conda deactivate #退出tensorflow虚拟环境
conda activate base #激活base虚拟环境
pip install pyarrow -i  https://pypi.douban.com/simple
pip install --upgrade tensorflow==2.7.0 -i  https://pypi.douban.com/simple
pip install protobuf==3.20.1 -i  https://pypi.douban.com/simple 

之后无论是base环境还是tensorflow环境,我们进入jupyter notebook编写和调试代码都不会存在之前的问题了,相关代码文件和需要的数据文件作者已整理放在了百度网盘:

链接:https://pan.baidu.com/s/1fgp3FtNUOwgFk-58iu6-RQ

提取码:h202

文件内容如下图所示,读者只需关心图中红框标识的两个核心文件即可,bitstampUSD.csv是案例程序使用的原始的比特币交易数据集,PredictBTC.ipynb是笔者编写的案例程序的jupyter notebook文件,PredBTC.py是由PredictBTC.ipynb导出得到的py文件。

读者可以将百度网盘下载下来的文件夹存放到Linux节点的~/jupyter/目录,因为笔者使用的是local模式,数据文件路径也是相对路径,所以读者复现程序时甚至无需修改代码,直接执行PredictBTC.ipynb文件就可复现该示例程序,部分代码内容及运行效果如下图所示。

若读者能顺利复现该程序,则意味着:

读者成功的配置好了单节点模式的pyspark-jupyter notebook-tensorflow环境

通过类似的流程,读者可以将pyspark与其它第三方库整合

虽然之前我们创建的tensorflow虚拟环境没有用到,但这并不意味着它是没用的——在集群模式下,如果其它节点没有配置虚拟环境,我们可以将当前节点的虚拟环境打包为zip文件,之后用spark-submit提交应用程序到集群时指定archives参数,将虚拟环境分发到其它节点上,archives参数的取值为虚拟环境zip文件所在路径。(local模式下用本地文件路径即可,若以集群模式提交则推荐先上传zip文件到HDFS,之后使用HDFS的路径)

base的环境也是可打包的,但如果所有第三方库都安装在base虚拟环境,则打包产生的zip文件就会很大,会降低pyspark的程序性能,因此单独创建一个虚拟环境仅安装某一应用需要的库之后再打包为zip文件可以提升程序性能。

还是以本文的案例进行展开,我们创建了一个名为tensorflow的虚拟环境,其对应的文件目录在~/anaconda/envs/tensorflow,我们将其打包:

cd ~/anaconda/envs/tensorflow
zip -r tensorflow.zip tensorflow

之后用spark-submit命令提交PredBTC.py代码文件时,用archives参数指定我们方才打包好的虚拟环境的zip文件即可。

cd  ~/jupyter/BitCoin/
spark-submit --master local[8] \
--archives file:///home/ZSX/anaconda3/envs/tensorflow.zip \
PredBTC.py

之后耐心等待pyspark程序运行,其运行结果与在jupyter页面中运行PredictBTC.ipynb的结果一致,只不过会输出很多日志信息;此外,使用matplotlib绘制的图像会以xmanager的d窗出现,会有一丝丝卡顿,所以不推荐在spark-submit提交的代码文件中绘图,运行结果如下图所示:


4 知识附录

考虑到读者在按教程进行配置的过程中会产生一些疑问,且仅仅复现案例程序会对程序内的代码一知半解,因此笔者会在本部分对一些重要的知识点进行补充,包括:

conda install与pip install的区别

PySpark架构与Spark架构的异同

pandas dataframe与spark DataFrame的区别与连续

pandas_udf的用法


4.1 conda install与pip install conda install xxx:这种方式安装的库都会放在anaconda3/pkgs目录下,这样的好处就是,当在某个环境下已经下载好了某个库,再在另一个环境中还需要这个库时,就可以直接从pkgs目录下将该库复制至新环境而不用重复下载。pip install xxx:该方式分两种情况,一种情况就是当前conda环境的python是conda安装的,和系统的不一样;另一种情况是当前conda环境使用的是系统的python。 前者会将xxx安装到anaconda3/envs/current_env/lib/python3.x/site-packages文件夹中,current_env即为当前激活的虚拟环境。后者通常会将xxx被安装到~/.local/lib/python3.x/site-packages文件夹中

两者大部分情况下没什么区别,笔者更喜欢用pip,因为即便用了国内源的情况下,国内用pip网速也比conda快,所以本文安装TensorFlow时用的也是pip。

conda默认自带一个名为(base)的虚拟环境,但在该环境下安装的第三方库包会直接安装到anaconda/lib/python3.x/site-packages路径下,而非anaconda3/envs/base/目录下。

在之前,我们将~/anaconda/lib/添加到了环境变量LD_LIBRARY_PATH中,因此base环境下安装的第三方库可用,而~/anaconda3/envs/tensorflow/lib/未添加到环境变量,所以在我们自己创建的tensorflow虚拟环境下,无法使用pandas_udf调用tensorflow。


4.2 PySpark架构

在介绍PySpark集群架构之前,需要先复习回顾一下Spark集群的架构:Spark集群由Driver, Cluster Manager(Master,Yarn 或 Mesos),以及Worker组成。

其中Driver是每个Spark应用程序的程序入口,提供了SparkContext,其一般于提交程序的Client节点启动,用于向Cluster Manager注册和申请资源,向Executor分发任务。值得一提到是,当集群的部署模式为standalone时,Cluster Manager为Master。

其中Worker节点是负责具体计算任务的从节点,其存在一个或多个Executor进程,而每个Executor进程中又包括多个Task线程,负责基于RDD的分区执行具体的计算任务。(Spark底层数据 *** 作对象是d性分布式数据集RDD,Spark会将一个大的数据集划分成多个分区,每个分区存储一部分数据,通过让分区与Task对应进行计算任务的调度可以实现并行计算来加快任务的执行效率)

PySpark集群的架构与Spark集群的架构非常类似,因为为了不破坏Spark已有的运行架构,PySpark在实现时仅在外围包装一层Python API。在Driver端,其借助Py4j实现Python和Java的交互,实现通过Python编写Spark应用程序;而在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的序列化后的字节码。


4.3 pandas DataFrame与spark DataFrame

两者的对比,读者可以从下表有个大概的了解:

pandasspark​​​​​​​
工作方式单机single machine tool,没有并行机制parallelism,不支持Hadoop,处理大量数据有瓶颈分布式并行计算框架,内建并行机制parallelism,所有的数据和 *** 作自动并行分布在各个集群结点上。以处理in-memory数据的方式处理distributed数据。支持Hadoop,能处理大量数据。
延迟机制not lazy-evaluatedlazy-evaluated
内存缓存单机缓存persist() or cache()将转换的RDDs保存在内存
DataFrame可变性pandas中DataFrame是可变的Spark中RDDs是不可变的,因此DataFrame也是不可变的
index索引自动创建没有index索引,若需要需要额外创建该列
行结构Series结构,属于pandas DataFrame结构Row结构,属于Spark DataFrame结构
列结构Series结构,属于pandas DataFrame结构Column结构,属于Spark DataFrame结构,如:DataFrame[name: string]
列名称不允许重名允许重名,修改列名采用alias方法
列值排序支持支持
索引排序支持不支持

当然,上表的对比虽然细致,但是不够直观,粗略地总结一下,两者最核心的区别在于:

pandas DataFrame仅支持单机处理,无分区,也不支持大数据场景,但也因此其无论在行方向还是列方向上都是有顺序的,且行和列都是一等公民,不会区分对待,所以我们可以很方便地使用数据切片,这显示了其矩阵方面的意义;此外,其在此基础上增加了行索引和列索引,可以将数据抽象为一张二维表,这显示了其关系表的语义。

Spark DataFrame的底层数据 *** 作对象是RDD,有分区且不可变,因此适合大数据环境下的分布式存储和并行计算,但因为分区的存在,其不便实现行索引index,因此默认无index,这也造成了在索引数据时,Column是一等公民,Row是二等公民,且无法进行数据切片。由此不难看出,其只包含了关系表的语义,并不包含矩阵方面的意义,也不保证数据顺序。


4.4 pandas_udf的用法

pandas_udf的用法与spark udf的用法非常类似,而且可实现的功能比spark udf还要强大,而且实现语法也非常多样,所以需要花很多篇幅才能讲清楚,因此本文仅提供几个不错的学习资料:

Spark官网文档–PySpark 3.2.1 documentation

基于Pyspark的Pandas_udf使用方法_BeKnown的博客-CSDN博客_


5 参考资料

Spark2.1.0+入门:Spark的安装和使用(Python版)_厦大数据库实验室博客

使用Jupyter Notebook调试PySpark程序_厦大数据库实验室博客

和鲸社区 - 10天吃掉那只pyspark

Spark DataFrame 不是真正的 DataFrame - 秦续业的文章 - 知乎

pandas-dataframe与spark-dataframe *** 作的区别_leeshutao的博客-CSDN博客

请问大神们,pip install 和conda install有什么区别吗? - 月踏的回答 - 知乎

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/1498367.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-06-25
下一篇 2022-06-25

发表评论

登录后才能评论

评论列表(0条)

保存