Spark核心知识点

Spark核心知识点,第1张

1,RDD的原理:

1,RDD是Spark中最基本的运算模式,它只负责处理运算逻辑,不存储数据本身,通过转化换算子链式的去处理数据,转换算子在执行程序过程中是不加载数据的(算子:scala中的map,flatmap等)在Spark中称为算子,可以处理通过它处理数据。


2,在RDD中最终的数据打印时通过调用,行动算子从前一个hashNext进行调用,最终第一个RDD1去hashNext数据,所以RDD的就是一个封装的迭代器,就是它的真面目,一次次迭代到每个RDD中,当有多次计算时,计算的逻辑会封装在每个计算步骤所在的RDD中,这样就成为了一个个RDD封装包,里面的逻辑就是通过调用上一个HashNext来进行一层层向上寻找逻辑,而每个RDD中都封装了上一个运算的逻辑,这样当一个RDD运算逻辑挂掉后,可以通过下一个运算重新计数,保证了容错性,提高了安全性能。


2,RDD的分区原理:

1,RDD可以根据分片大小,和设置的local的核数大小进行分区并行计算,如local[1],为本地计算机的一个核数,也就是一个分区可以说,local[*]为本地计算机的所有核数,运行速度有所提高。


    

 可以明显的看到时间有所提高。


3,RDD分区底层原理:

1,RDD在执行转换算子的过程中,根据文件大小,和本地设置核数进行分区,同样也可以自己指定分区个数,groupbykey(3),可以指定,同样的在RDD同样可以避免在转换算子运行过程中避免产生shuffle阶段,有利于Spark的性能提升,所以尽量不要改变分区个数,若个增加分区,会产生格外的网络传输,shuffle阶段,性能有所下滑。



 

4,RDD单节点模式:

1,在linux环境里运行,bin/spark-shell 进入。


2,用spark默认的对象进行调用命令,sc.textFile("hdfs://hadoop102:8020/data/wc.txt").flatMap(_.split("\\s+")).map((_,1)).reduceByKey(_+_).foreach(println)

3,在idea打包上传到linux上用命令调用(打包之前不要指定local),用spark自带的命令进行调用jar包,bin/spark-submit --master local[*] --class 指定类  ./wc.jar

5.spark单节点启动:

1,指定启动master节点,bin/spark-master 可以先启动master,然后再启动worder工作者,bin/spark-worder spark://hadoop102:7077(当增加了worker节点时要单节点启动)。


6,spark-shell命令:

1,spark-shell:启动和spark交互式命令,可以再启动的时候设置每个worker所需的内存大小,和所需核的大小。


2,spark-shell --master spark://linux01:7077,  默认使用集群中所有work的所有的核

3,spark-shell --master spark://linux01:7077  --total-executor-cores 8 ,指定程序所有的可用的核数。


4,spark-shell --master spark://linux01:7077  --total-executor-cores 8  --executor-memory 512M 

 默认情况下一个executor的内存是1G 可以指定executor的内存大小。


7,spark-submit命令:

1,必需知道的选项:

--master 需要指定任务运行的模式

--class 指定任务运行的main方法所在的类

2, spark-submit --master spark://linux01:7077 --total-executor-cores 8 --executor-memory 512M   --class  spark.Demo1  /wc.jar  

8,配置历史服务器:

 1,如果Spark-shell 没有退出, 是可以看到正在运行的任务日志的情况:http://linux01:4040. 但是退出 Spark-shell 之后, 执行的所有任务记录全部丢失.所以我们需要配置历史服务器, 来查看以前的shell日志。


2,修改配置文件:

mv spark-defaults.conf.template spark-defaults.conf

vi spark-defaults.conf
spark.eventLog.enabled           true
spark.eventLog.dir               hdfs://虚拟机名字:8020/logs

vi spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=30 -Dspark.history.fs.logDirectory=hdfs://虚拟机名子:8020/logs"

3,分发配置文件到各个虚拟机上。


4,再hdfs上创建日志文件:hdfs dfs -mkdir /logs

5,启动历史服务:sbin/start-history-server.sh

6,访问WEB页面:http://hadoop102:18080

9,Spark连接数据库:

1,第一步封装字段:创建样例类,把结构化数据字段写全。


2,第二部连接代码:

val sc: SparkContext = SparkUtils.getSc

    val value: RDD[String] = sc.textFile("datas/Jdbc.csv")

    //把数据分割后返回一个元组
    val arr = value.map(line=>{

      val sp: Array[String] = line.split(",")

      Students1(sp(0).toInt,sp(1),sp(2).toInt)

    })

    arr.foreach(line=>{
      val conn=DriverManager.getConnection("jdbc:mysql://localhost:3306/do", "root", "root")

      val ps=conn.prepareStatement("insert  into tb_zu values (?,?,?)")

      ps.setInt(1,line.id)
      ps.setString(2,line.name)
      ps.setInt(3,line.age)
      ps.executeUpdate()

    })

    sc.stop()

 3,第三步优化代码:


    val sc: SparkContext = SparkUtils.getSc

    val rdd: RDD[String] = sc.textFile("datas/Jdbc.csv")

    //用样例类封装数据
   val arr= rdd.map(stu=>{

      val spt: Array[String] = stu.split(",")

      Students1(spt(0).toInt,spt(1),spt(2).toInt)

    })


    //第二部:从本地加载数据到数据库中
    arr.foreachPartition(arr1=>{

      arr1.foreach(stu=>{
        //第一步:连接数据库
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/do", "root", "root")

        val sat: PreparedStatement = conn.prepareStatement("insert into tb_zu values (?,?,?)")

        sat.setInt(1,stu.id)
        sat.setString(2,stu.name)
        sat.setInt(3,stu.age)
        sat.executeUpdate()

      })

    })
    
    sc.stop()
    
  }
10,Spark连接redis:

1,连接redis第一步创建样例类封装结构化数据字段:

2,第二部写连接代码:

val sc: SparkContext = SparkUtils.getSc

    val value: RDD[String] = sc.textFile("datas/Jdbc.csv")

    val arr = value.map(line=>{

      val sp: Array[String] = line.split(",")

      Students1(sp(0).toInt,sp(1),sp(2).toInt)

    })

    arr.foreach(line=>{

      val jedis = new Jedis("hadoop102", 6379, 2000)

      jedis.set(line.name,line.toString)


    })


    sc.stop()
 11,mapPartitionsWithIndex:

1,将每个分区的元素和 分区号组装:

 val res1 = value.map(line=>{

      val sp: Array[String] = line.split(",")

      Students1(sp(0).toInt,sp(1),sp(2).toInt)

    })
    // 每个分区执行一次
    val res2: RDD[(Students1, Int)] = res1.mapPartitionsWithIndex((p, iters) => {
      //将每个分区的元素和 分区号组装
      iters.map((_, p))
    })

    res2.foreach(println)
  }

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/563531.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-02
下一篇 2022-04-02

发表评论

登录后才能评论

评论列表(0条)

保存