记一次spark写入redis

记一次spark写入redis,第1张

记一次spark写入redis jedis api中文文档_CYY941027的博客-CSDN博客_jedis文档

背景:每天批量同步800W左右数据到redis,需覆盖之前的数据

 maven 引用:


    com.redislabs
    spark-redis
    2.3.1-RC1



    redis.clients
    jedis
    3.1.0

方式一: 大数据量导致redis集群cpu100%

   case class Weather(adcode: Int, weather: String, temp:Int, humidity:Int)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkReadRedis")
      .master("local[*]")
      .config("spark.redis.host", 连接地址)
      .config("spark.redis.port", 端口,默认6379)
      .config("spark.redis.auth", 密码) //指定redis密码
      .config("spark.redis.db", 库名) //指定redis库,库名1
      .getOrCreate()
 
    val weatherSeq =  Seq(Weather(111100, "雨",99,80), Weather(111101, "晴",-5,80))
    val data = spark.createDataframe(weatherSeq)
 
    data.write
      .format("org.apache.spark.sql.redis")
      .option("table", "weather")
      .option("key.column", "aa")
      .mode(SaveMode.Overwrite)
      .save()



    val loadedDf = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", "weather")
      .option("key.column", "aa")
      .load()
    loadedDf.show(false)


  }
 

方式二

可以使用 foreachPartition  设置 每千条数据塞到pipeline,然后提交一次,然后Thread.sleep(1000) 等待一秒钟,可以大幅度降低redis集群的cpu

 val data = new util.HashMap[String, String]()
 
val jedis = new Jedis(redis_host, redis_port.toInt)
    jedis.auth(redis_auth) //密码
    jedis.select(1) //库
    
val pipeline = jedis.pipelined()

    data.put("grade", "aa")  //map塞数据

   pipeline.hmset(key, data)  //管道塞数据
   pipeline.expire(key, 3600 * 50) //设置过期时间
   pipeline.sync()   //同步
   pipeline.close()
   jedis.close()
 

自此未再深入研究

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5681291.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存