juypter中使用pyspark *** 作集群与本地数据

juypter中使用pyspark *** 作集群与本地数据,第1张

juypter中使用pyspark *** 作集群与本地数据

集群客户端的安装过程以华为的FusionInsight为列:

1.在juypter上安装集群的客户端,这一步可以看之前的文章,里面有

华为fusionInsight集群客户端安装_sansan_ge的博客-CSDN博客华为fusionInsight集群客户端安装https://blog.csdn.net/sansan_ge/article/details/122195514?spm=1001.2014.3001.55012.打开juypter的终端,source 集群的环境变量,我安装在/opt/client下,执行source /opt/client/bigdata_env

3.修改/opt/client/Spark2x/spark/conf/spark-defaults.conf,新增如下参数:

  spark.driver.host=本机ip
  spark.driver.bindAddress=docker容器ip
  spark.driver.port=容器映射端口(非22或8080映射端口)

  spark.blockManager.port= 按实际填写

4.source环境变量后,终端执行printenv > env.py

5.将生成的env.py进行如下修改:

主要引入集群的spark环境,同时将本身的变量,进行os.environ[]进行处理 

非kerberos认证集群:

引用集群spark客户端环境,这里不细讲

具体代码如下:

1.访问本地文件

import os,sys
from env import *
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
# 修改为实际路径
logFile = "file:///data/env.py"
print("1")
sparksession = SparkSession.Builder().appName("app").master("local").getOrCreate()
print("2")
logData = sparksession.read.text(logFile)
print("3")
logData.show()
print("4")

2.连接集群,访问hive表

from env import *
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

print("1")
sparksession = SparkSession.Builder().appName("app").master("yarn").config("spark.submit.deployMode", "client").config('spark.yarn.queue', 'dev').enableHiveSupport().getOrCreate()
#上面master("local") 也是可以的,只是会使用到本地的客户端,写yarn的会使用spark-submit提交到到集群上运行
#show databases
sparksession.sql("show databases ").show()
print("2")
# 替换为实际库
sparksession.sql("use demo" )
sparksession.sql("show tables").show()
a.show
# 替换为实际输入表
print("read.table")
logData = sparksession.read.table("test_01")
logData.show()

# 替换为待生成的目的表
print("saveAsTable")
logData = sparksession.write.saveAsTable("test_02")
logData.show()
print("3")

kerberos认证集群:
以华为FusionInsight为例

import os,sys
from env import *
import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession
#使用kinit生成认证,这步 *** 作之前,一定要先source到集群相关的环境变量
os.system('kinit -kt xxx.keytab xxx')

print("1")
sparksession = SparkSession.Builder().appName("app").master("yarn").config("spark.submit.deployMode", "client").config('spark.cleaner.ttl','2000').config('spark.yarn.queue', 'dev').enableHiveSupport().getOrCreate()
#show databases
sparksession.sql("show databases").show()

#后面基本就跟上面是一样的,只是多了认证的过程

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5709271.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存