GitHub - RedisLabs/spark-redis: A connector for Spark that allows reading and writing to/from Redis cluster
Give the project a star if you find it useful.
Maven dependency (pom) download: https://mvnrepository.com/artifact/com.redislabs/spark-redis
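For reference, a minimal sbt declaration might look like the sketch below. The version is a placeholder, not a recommendation: pick the release that matches your Spark and Scala versions from the mvnrepository page above (note that older releases publish the plain spark-redis artifact without a Scala suffix, i.e. "com.redislabs" % "spark-redis").

// build.sbt (sketch) -- replace <version> with a release from mvnrepository
libraryDependencies += "com.redislabs" %% "spark-redis" % "<version>"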
import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      // fixes the "set the port for SparkUI to an available port or increasing spark.port.maxRetries" error
      .config("spark.ui.port", "30101")
      .getOrCreate()

    val personSeq = Seq(Person("John", 30), Person("Peter", 45))
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .save()

    spark.stop()
  }
}

Result in Redis:
In each key, "person" is followed by a random value similar to a UUID, mainly so that rows do not overwrite each other.
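As a quick sanity check, the connector's RDD API (documented in the README) can list the keys it wrote. The sketch below assumes it runs inside the same main method as above, so the SparkSession and the com.redislabs.provider.redis._ import are already in scope; the person:* pattern is an assumption based on the key naming described here.

// Sketch: list the keys created for the "person" table
val keysRDD = spark.sparkContext.fromRedisKeyPattern("person:*")
keysRDD.collect().foreach(println) // e.g. person:87ed5f22... (random part will differ)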
Reading the data back:

import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      // fixes the "set the port for SparkUI to an available port or increasing spark.port.maxRetries" error
      .config("spark.ui.port", "30101")
      .getOrCreate()

    val df = spark.read
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .load()
    df.show()

    spark.stop()
  }
}
Returns:
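The original screenshot is not reproduced here, but given the two rows written above, df.show() should print something close to this (row order may vary):

+-----+---+
| name|age|
+-----+---+
| John| 30|
|Peter| 45|
+-----+---+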
The sink supports Append, Overwrite, and other save modes. First, test Append mode:
import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      // fixes the "set the port for SparkUI to an available port or increasing spark.port.maxRetries" error
      .config("spark.ui.port", "30101")
      .getOrCreate()

    val personSeq = Seq(Person("Thomas", 2), Person("Peter", 100))
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .mode(SaveMode.Append) // the various Spark sink save modes are supported here
      .save()

    spark.stop()
  }
}
Note: Peter has now effectively been written twice. So how does Redis store this, and what does a query return?
Keys stored in Redis:
As you can see, because the keys use a random suffix, rows with the same name end up under different keys instead of overwriting each other.
Query result via the DataFrame API:
Both Peter rows are present.
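Assuming the first write and the append both went to the same Redis instance, reading the person table back should now show four rows, including both Peter entries (order may vary):

+------+---+
|  name|age|
+------+---+
|  John| 30|
| Peter| 45|
|Thomas|  2|
| Peter|100|
+------+---+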
Now test Overwrite mode:
import org.apache.spark.sql._
import com.redislabs.provider.redis._

object scala_redis {

  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("redis-df")
      .master("local[1]")
      .config("spark.redis.host", "192.168.9.88")
      .config("spark.redis.port", 6379)
      // fixes the "set the port for SparkUI to an available port or increasing spark.port.maxRetries" error
      .config("spark.ui.port", "30101")
      .getOrCreate()

    val personSeq = Seq(Person("Thomas", 11), Person("Peter", 22))
    val df = spark.createDataFrame(personSeq)

    df.write
      .format("org.apache.spark.sql.redis")
      .option("table", "person")
      .mode(SaveMode.Overwrite) // the various Spark sink save modes are supported here
      .save()

    spark.stop()
  }
}
Result:
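With SaveMode.Overwrite, the connector replaces the whole person table, so a subsequent read should return only the two rows from the last write (order may vary):

+------+---+
|  name|age|
+------+---+
|Thomas| 11|
| Peter| 22|
+------+---+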
It seems an update mode is not supported.
You can also tell from the storage structure that supporting updates would be difficult...
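That said, the project README describes a key.column option that uses a DataFrame column as the Redis key instead of a random suffix; with deterministic keys, re-writing a row with the same key replaces it, which gets you closer to upsert-like behavior. A minimal sketch, assuming name is unique and the behavior should be verified against your connector version:

// Sketch: use the "name" column as the key suffix (person:John, person:Peter, ...)
df.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "name")
  .mode(SaveMode.Append)
  .save()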
schema
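The README notes that, without a schema, fields are read back as strings; to get the original types you can either pass an explicit schema to the reader (standard Spark API, sketched below) or set the connector's infer.schema option.

import org.apache.spark.sql.types._

// Sketch: read the person table back with explicit column types
val typedDf = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .schema(StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)
  )))
  .load()
typedDf.printSchema()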
All of the code above comes from the GitHub project's README.
Troubleshooting: if running the code throws errors, check that the spark-redis release matches your Spark version, that the Scala version matches, and that there is no conflicting Jedis dependency on the classpath.
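If you do hit a Jedis version conflict, one option in sbt is to exclude the transitive Jedis pulled in by spark-redis and pin one explicitly; the versions below are placeholders, not recommendations:

// Sketch: exclude transitive Jedis and pin a specific version
libraryDependencies += ("com.redislabs" %% "spark-redis" % "<version>")
  .exclude("redis.clients", "jedis")
libraryDependencies += "redis.clients" % "jedis" % "<jedis-version>"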