集群客户端的安装过程以华为的FusionInsight为列:
1.在juypter上安装集群的客户端,这一步可以看之前的文章,里面有
华为fusionInsight集群客户端安装_sansan_ge的博客-CSDN博客华为fusionInsight集群客户端安装https://blog.csdn.net/sansan_ge/article/details/122195514?spm=1001.2014.3001.55012.打开juypter的终端,source 集群的环境变量,我安装在/opt/client下,执行source /opt/client/bigdata_env
3.修改/opt/client/Spark2x/spark/conf/spark-defaults.conf,新增如下参数:
spark.driver.host=本机ip
spark.driver.bindAddress=docker容器ip
spark.driver.port=容器映射端口(非22或8080映射端口)
spark.blockManager.port= 按实际填写
4.source环境变量后,终端执行printenv > env.py
5.将生成的env.py进行如下修改:
主要引入集群的spark环境,同时将本身的变量,进行os.environ[]进行处理
非kerberos认证集群:
引用集群spark客户端环境,这里不细讲
具体代码如下:
1.访问本地文件
import os,sys from env import * import pyspark from pyspark import SparkConf from pyspark.sql import SparkSession # 修改为实际路径 logFile = "file:///data/env.py" print("1") sparksession = SparkSession.Builder().appName("app").master("local").getOrCreate() print("2") logData = sparksession.read.text(logFile) print("3") logData.show() print("4")
2.连接集群,访问hive表
from env import * import pyspark from pyspark import SparkConf from pyspark.sql import SparkSession print("1") sparksession = SparkSession.Builder().appName("app").master("yarn").config("spark.submit.deployMode", "client").config('spark.yarn.queue', 'dev').enableHiveSupport().getOrCreate() #上面master("local") 也是可以的,只是会使用到本地的客户端,写yarn的会使用spark-submit提交到到集群上运行 #show databases sparksession.sql("show databases ").show() print("2") # 替换为实际库 sparksession.sql("use demo" ) sparksession.sql("show tables").show() a.show # 替换为实际输入表 print("read.table") logData = sparksession.read.table("test_01") logData.show() # 替换为待生成的目的表 print("saveAsTable") logData = sparksession.write.saveAsTable("test_02") logData.show() print("3")
kerberos认证集群:
以华为FusionInsight为例
import os,sys from env import * import pyspark from pyspark import SparkConf from pyspark.sql import SparkSession #使用kinit生成认证,这步 *** 作之前,一定要先source到集群相关的环境变量 os.system('kinit -kt xxx.keytab xxx') print("1") sparksession = SparkSession.Builder().appName("app").master("yarn").config("spark.submit.deployMode", "client").config('spark.cleaner.ttl','2000').config('spark.yarn.queue', 'dev').enableHiveSupport().getOrCreate() #show databases sparksession.sql("show databases").show() #后面基本就跟上面是一样的,只是多了认证的过程
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)