DataFrame到RDD [(String,String)]转换

DataFrame到RDD [(String,String)]转换,第1张

DataFrame到RDD [(String,String)]转换

如果要将行映射到其他RDD元素,可以使用df.map(row => …)将数据帧转换为RDD。

例如:

val df = Seq(("table1",432),      ("table2",567),      ("table3",987),      ("table1",789)).      toDF("tablename", "Code").toDF()    df.show()    +---------+----+|tablename|Code|+---------+----+|   table1| 432||   table2| 567||   table3| 987||   table1| 789|+---------+----+    val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]    OR    val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]

有关 AnalysisException,
参阅https://community.hortonworks.com/questions/106500/error-in-spark-
streaming-kafka-integration-structu.html
:必须使用writeStream.start()执行带有流源的查询

您需要等待使用查询终止查询。 awaitTermination() 防止查询活动时退出进程。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5010154.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-11-14
下一篇 2022-11-14

发表评论

登录后才能评论

评论列表(0条)

保存