The case class used by the test methods below, together with the imports the examples rely on:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class Player(name: String, age: Int, gender: String)

1. Build an RDD by hand and convert between RDD, DataFrame, and Dataset
def method1(): Unit = {
  // master is local[*] for development; on a cluster it should be yarn.
  val spark: SparkSession = SparkSession.builder().appName("test1").master("local[*]").getOrCreate()
  val sparkContext: SparkContext = spark.sparkContext
  // Build an RDD by hand: the List separates the rows, the tuples separate the
  // columns. makeRDD delegates to parallelize under the hood.
  val rdd1: RDD[(String, Int, String)] = sparkContext.makeRDD(List(("jack", 11, "male"), ("lisa", 12, "female")))
  rdd1.foreach(println)
  // Implicit conversions (toDF, toDS, as[...]).
  import spark.implicits._
  // RDD to DataFrame; toDF can name the columns.
  val df1: DataFrame = rdd1.toDF("name", "age", "gender")
  df1.show()
  // Map the RDD's tuples onto the case class, then convert to a Dataset.
  val ds1: Dataset[Player] = rdd1.map(x => Player(x._1, x._2, x._3)).toDS()
  ds1.show()
  // Convert the DataFrame and the Dataset back to RDDs and print row by row.
  df1.rdd.foreach(println)
  ds1.rdd.foreach(println)
  // DataFrame to Dataset via as[Player].
  val ds2: Dataset[Player] = df1.as[Player]
  ds2.show()
  // Dataset to DataFrame, assigning fresh column names with toDF.
  val df2: DataFrame = ds1.toDF("nm", "ag", "sex")
  df2.show()
}
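If the column types should be declared explicitly rather than inferred from the tuple, the same DataFrame can also be built from a schema. A minimal sketch, assuming the spark session and rdd1 from method1 are in scope:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Spell out the schema instead of letting toDF infer it.
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false),
  StructField("gender", StringType, nullable = false)
))
// Wrap each tuple in a generic Row, then pair the rows with the schema.
val rowRdd: RDD[Row] = rdd1.map { case (name, age, gender) => Row(name, age, gender) }
val df3: DataFrame = spark.createDataFrame(rowRdd, schema)
df3.show()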
2. Read a local file into an RDD and implement WordCount

def method2(): Unit = {
  val sparkConf: SparkConf = new SparkConf().setAppName("test2").setMaster("local[*]")
  val sparkContext: SparkContext = new SparkContext(sparkConf)
  // textFile reads the file; the RDD treats each line as one record.
  val rddFile: RDD[String] = sparkContext.textFile("src/main/resources/myfile/goodjob.txt")
  // split breaks each line on spaces, flatMap flattens the arrays into a single
  // RDD of words, map builds (word, 1) tuples, and reduceByKey groups on _1 and
  // sums _2 to produce the final counts.
  val rddResult: RDD[(String, Int)] = rddFile.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  rddResult.foreach(println)
  // saveAsTextFile writes the result to disk.
  rddResult.saveAsTextFile("src/main/resources/myfile/result.txt")
}
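Note that saveAsTextFile treats its argument as an output directory (one part file per partition) and fails if the path already exists. For ordered output, a small variation on the same rddResult sorts by count first:

// Sort the (word, count) pairs by descending count before inspecting them.
val sorted: RDD[(String, Int)] = rddResult.sortBy(_._2, ascending = false)
sorted.take(10).foreach(println)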
3. Operating on Hive from Spark

def method3(): Unit = {
  // enableHiveSupport is required when Spark works against Hive.
  val sparkSession: SparkSession = SparkSession.builder().appName("test3").master("local[*]").enableHiveSupport().getOrCreate()
  // First create the Hive tables player and player2.
  sparkSession.sql("create table meta.player(name string, age int, gender string)")
  sparkSession.sql("create table meta.player2(name string, age int, gender string)")
  // Insert some rows into player.
  sparkSession.sql("insert into table meta.player values('wangming', 11, 'male')")
  sparkSession.sql("insert into table meta.player values('yuki', 12, 'female')")
  sparkSession.sql("insert into table meta.player values('lili', 13, 'female')")
  // Deliberately rename the columns to exercise the mapping.
  val df: DataFrame = sparkSession.sql("select * from meta.player where age < 12").toDF("a", "b", "c")
  // Register a temporary view scoped to this session.
  df.createOrReplaceTempView("tv1")
  sparkSession.sql("insert into meta.player2(name, age, gender) select a, b, c from tv1")
}
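The INSERT ... SELECT through the temporary view can also be written with the DataFrameWriter API. A minimal sketch, assuming the df from method3; insertInto resolves columns by position, so the renamed a/b/c columns still land in name/age/gender:

// Position-based append into the existing Hive table meta.player2.
df.write.mode("append").insertInto("meta.player2")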
4. Operating on MySQL from Spark

def method4(): Unit = {
  val sparkSession: SparkSession = SparkSession.builder().appName("test4").master("local[*]").getOrCreate()
  // Read the table ui over JDBC into a DataFrame.
  val df1: DataFrame = sparkSession.read.format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/my_test")
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "ui")
    .load()
  df1.createTempView("tv1")
  val df2: DataFrame = sparkSession.sql("select * from tv1 where age < 20")
  // Append the filtered rows to the table ui2.
  df2.write.mode("append").format("jdbc")
    .option("url", "jdbc:mysql://localhost:3306/my_test?useUnicode=true&characterEncoding=utf8&useSSL=false&zeroDateTimeBehavior=convertToNull&jdbcCompliantTruncation=false")
    .option("user", "root")
    .option("password", "123456")
    .option("dbtable", "ui2")
    .save()
}
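Both JDBC calls need the MySQL driver on the classpath; with sbt that is the mysql-connector-java artifact (the version below is only an illustrative assumption). The read side also has a Properties-based shorthand equivalent to the option chain above:

// build.sbt (version is an assumption; match it to your MySQL server):
// libraryDependencies += "mysql" % "mysql-connector-java" % "8.0.33"

import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
// read.jdbc(url, table, props) is shorthand for format("jdbc") plus options plus load().
val dfProps: DataFrame = sparkSession.read.jdbc("jdbc:mysql://localhost:3306/my_test", "ui", props)
dfProps.show()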