如果要将行映射到其他RDD元素,可以使用df.map(row => …)将数据帧转换为RDD。
例如:
val df = Seq(("table1",432), ("table2",567), ("table3",987), ("table1",789)). toDF("tablename", "Code").toDF() df.show() +---------+----+|tablename|Code|+---------+----+| table1| 432|| table2| 567|| table3| 987|| table1| 789|+---------+----+ val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)] OR val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd //Type: RDD[(String,String)]
有关 AnalysisException,
请参阅https://community.hortonworks.com/questions/106500/error-in-spark-
streaming-kafka-integration-structu.html
:必须使用writeStream.start()执行带有流源的查询
您需要等待使用查询终止查询。 awaitTermination() 防止查询活动时退出进程。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)