1,RDD是Spark中最基本的运算模式,它只负责处理运算逻辑,不存储数据本身,通过转化换算子链式的去处理数据,转换算子在执行程序过程中是不加载数据的(算子:scala中的map,flatmap等)在Spark中称为算子,可以处理通过它处理数据。
2,在RDD中最终的数据打印时通过调用,行动算子从前一个hashNext进行调用,最终第一个RDD1去hashNext数据,所以RDD的就是一个封装的迭代器,就是它的真面目,一次次迭代到每个RDD中,当有多次计算时,计算的逻辑会封装在每个计算步骤所在的RDD中,这样就成为了一个个RDD封装包,里面的逻辑就是通过调用上一个HashNext来进行一层层向上寻找逻辑,而每个RDD中都封装了上一个运算的逻辑,这样当一个RDD运算逻辑挂掉后,可以通过下一个运算重新计数,保证了容错性,提高了安全性能。
1,RDD可以根据分片大小,和设置的local的核数大小进行分区并行计算,如local[1],为本地计算机的一个核数,也就是一个分区可以说,local[*]为本地计算机的所有核数,运行速度有所提高。
可以明显的看到时间有所提高。
1,RDD在执行转换算子的过程中,根据文件大小,和本地设置核数进行分区,同样也可以自己指定分区个数,groupbykey(3),可以指定,同样的在RDD同样可以避免在转换算子运行过程中避免产生shuffle阶段,有利于Spark的性能提升,所以尽量不要改变分区个数,若个增加分区,会产生格外的网络传输,shuffle阶段,性能有所下滑。
1,在linux环境里运行,bin/spark-shell 进入。
2,用spark默认的对象进行调用命令,sc.textFile("hdfs://hadoop102:8020/data/wc.txt").flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).foreach(println)
3,在idea打包上传到linux上用命令调用(打包之前不要指定local),用spark自带的命令进行调用jar包,bin/spark-submit --master local[*] --class 指定类 ./wc.jar
5.spark单节点启动:1,指定启动master节点,bin/spark-master 可以先启动master,然后再启动worder工作者,bin/spark-worder spark://hadoop102:7077(当增加了worker节点时要单节点启动)。
1,spark-shell:启动和spark交互式命令,可以再启动的时候设置每个worker所需的内存大小,和所需核的大小。
2,spark-shell --master spark://linux01:7077, 默认使用集群中所有work的所有的核
3,spark-shell --master spark://linux01:7077 --total-executor-cores 8 ,指定程序所有的可用的核数。
4,spark-shell --master spark://linux01:7077 --total-executor-cores 8 --executor-memory 512M
默认情况下一个executor的内存是1G 可以指定executor的内存大小。
1,必需知道的选项:
--master 需要指定任务运行的模式
--class 指定任务运行的main方法所在的类
2, spark-submit --master spark://linux01:7077 --total-executor-cores 8 --executor-memory 512M --class spark.Demo1 /wc.jar
8,配置历史服务器: 1,如果Spark-shell 没有退出, 是可以看到正在运行的任务日志的情况:http://linux01:4040. 但是退出 Spark-shell 之后, 执行的所有任务记录全部丢失.所以我们需要配置历史服务器, 来查看以前的shell日志。
2,修改配置文件:
mv spark-defaults.conf.template spark-defaults.conf
vi spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir hdfs://虚拟机名字:8020/logs
vi spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://虚拟机名子:8020/logs"
3,分发配置文件到各个虚拟机上。
4,再hdfs上创建日志文件:hdfs dfs -mkdir /logs
5,启动历史服务:sbin/start-history-server.sh
6,访问WEB页面:http://hadoop102:18080
9,Spark连接数据库:1,第一步封装字段:创建样例类,把结构化数据字段写全。
2,第二部连接代码:
val sc: SparkContext = SparkUtils.getSc
val value: RDD[String] = sc.textFile("datas/Jdbc.csv")
//把数据分割后返回一个元组
val arr = value.map(line=>{
val sp: Array[String] = line.split(",")
Students1(sp(0).toInt,sp(1),sp(2).toInt)
})
arr.foreach(line=>{
val conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/do", "root", "root")
val ps=conn.prepareStatement("insert into tb_zu values (?,?,?)")
ps.setInt(1,line.id)
ps.setString(2,line.name)
ps.setInt(3,line.age)
ps.executeUpdate()
})
sc.stop()
3,第三步优化代码:
val sc: SparkContext = SparkUtils.getSc
val rdd: RDD[String] = sc.textFile("datas/Jdbc.csv")
//用样例类封装数据
val arr= rdd.map(stu=>{
val spt: Array[String] = stu.split(",")
Students1(spt(0).toInt,spt(1),spt(2).toInt)
})
//第二部:从本地加载数据到数据库中
arr.foreachPartition(arr1=>{
arr1.foreach(stu=>{
//第一步:连接数据库
val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/do", "root", "root")
val sat: PreparedStatement = conn.prepareStatement("insert into tb_zu values (?,?,?)")
sat.setInt(1,stu.id)
sat.setString(2,stu.name)
sat.setInt(3,stu.age)
sat.executeUpdate()
})
})
sc.stop()
}
10,Spark连接redis:
1,连接redis第一步创建样例类封装结构化数据字段:
2,第二部写连接代码:
val sc: SparkContext = SparkUtils.getSc
val value: RDD[String] = sc.textFile("datas/Jdbc.csv")
val arr = value.map(line=>{
val sp: Array[String] = line.split(",")
Students1(sp(0).toInt,sp(1),sp(2).toInt)
})
arr.foreach(line=>{
val jedis = new Jedis("hadoop102", 6379, 2000)
jedis.set(line.name,line.toString)
})
sc.stop()
11,mapPartitionsWithIndex:
1,将每个分区的元素和 分区号组装:
val res1 = value.map(line=>{
val sp: Array[String] = line.split(",")
Students1(sp(0).toInt,sp(1),sp(2).toInt)
})
// 每个分区执行一次
val res2: RDD[(Students1, Int)] = res1.mapPartitionsWithIndex((p, iters) => {
//将每个分区的元素和 分区号组装
iters.map((_, p))
})
res2.foreach(println)
}
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)