首先创建一个scala工程叫做myhctest
因为运行会产生很多日志信息 着你喔导入一个叫log4j的文件进行消除
然后创建以下几个类和特质
首先SaveTrait
trait SaveTrait { def dfSave(indf:Dataframe, ctx:SparkSession, tableName:String):Unit } //构建方法 indf是传入的dataframe也就是传入你hive中的表 //因为spark sql是用的sparkSession中的所以后续要书写一个sparksession类 // tablenname也就是你要重新保存的表名称
hivesaveimpl
trait HiveSaveImpl extends SaveTrait { override def dfSave(indf: Dataframe, ctx:SparkSession, tableName: String): Unit = { indf.createOrReplaceTempView("hctest")//创建一个临时视图 ctx.sql("insert overwrite table "+tableName+" select * from hctest") } }
mysqlsaveimpl
trait MySqlSaveImpl extends SaveTrait { override def dfSave(indf: Dataframe, ctx:SparkSession, tableName:String)= { val prop = new Properties() prop.setProperty("user","root") prop.setProperty("password","root") prop.setProperty("driver","com.mysql.jdbc.Driver") indf.write.mode(SaveMode.Overwrite) .jdbc("jdbc:mysql://192.168.80.181:3306/mydemo" ,tableName,prop) } } dataframe 连接mysql 如果只连接hive的话这个可以不写
datasave
class DataSave { st:SaveTrait=> def save(indf:Dataframe,ctx:SparkSession,tableName:String) ={ st.dfSave(indf,ctx,tableName) } } //构建方法 indf是传入的dataframe也就是传入你hive中的表 //因为spark sql是用的sparkSession中的所以后续要书写一个sparksession类 // tablenname也就是你要重新保存的表名称
test
object Test { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().master("local[*]") .appName("test") .config("hive.metastore.uris","thirft://192.168.80.181:9083") .enableHiveSupport().getOrCreate() val df = spark.createDataframe(Seq((1, "zs"), (2, "ls"))) .toDF("id", "name") // (new DataSave() with MySqlSaveImpl).dfSave(df,spark,"demo") (new DataSave() with HiveSaveImpl) .dfSave(df,spark,"mydemo.demo") } }
这里可以简单的测试一下
数据处理 (未完结)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)