背景:每天批量同步800W左右数据到redis,需覆盖之前的数据
maven 引用:
com.redislabs spark-redis2.3.1-RC1 redis.clients jedis3.1.0
方式一: 大数据量导致redis集群cpu100%
case class Weather(adcode: Int, weather: String, temp:Int, humidity:Int) def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .appName("SparkReadRedis") .master("local[*]") .config("spark.redis.host", 连接地址) .config("spark.redis.port", 端口,默认6379) .config("spark.redis.auth", 密码) //指定redis密码 .config("spark.redis.db", 库名) //指定redis库,库名1 .getOrCreate() val weatherSeq = Seq(Weather(111100, "雨",99,80), Weather(111101, "晴",-5,80)) val data = spark.createDataframe(weatherSeq) data.write .format("org.apache.spark.sql.redis") .option("table", "weather") .option("key.column", "aa") .mode(SaveMode.Overwrite) .save() val loadedDf = spark.read .format("org.apache.spark.sql.redis") .option("table", "weather") .option("key.column", "aa") .load() loadedDf.show(false) }
方式二
可以使用 foreachPartition 设置 每千条数据塞到pipeline,然后提交一次,然后Thread.sleep(1000) 等待一秒钟,可以大幅度降低redis集群的cpu
val data = new util.HashMap[String, String]() val jedis = new Jedis(redis_host, redis_port.toInt) jedis.auth(redis_auth) //密码 jedis.select(1) //库 val pipeline = jedis.pipelined() data.put("grade", "aa") //map塞数据 pipeline.hmset(key, data) //管道塞数据 pipeline.expire(key, 3600 * 50) //设置过期时间 pipeline.sync() //同步 pipeline.close() jedis.close()
自此未再深入研究
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)