启动流查询(调用 start())时返回的 StreamingQuery 对象可用于监控该查询,它提供 recentProgress、lastProgress、status 等多个属性。
代码举例
#!/usr/bin/env python3
"""Structured Streaming word count that monitors its own StreamingQuery.

Reads text lines from a socket source on localhost:9999, splits each line
on single spaces, counts the occurrences of every word, and writes the
complete result table to the console on an 8-second processing-time
trigger. While the query runs, the driver polls the StreamingQuery handle
and pretty-prints its ``lastProgress`` (only when the last batch actually
received rows) and its ``status``.
"""
from pprint import pprint
import time

from pyspark.sql import SparkSession
from pyspark.sql.functions import split
from pyspark.sql.functions import explode

if __name__ == "__main__":
    # Chained builder calls are wrapped in parentheses so they can span
    # several lines without backslash continuations.
    spark = (
        SparkSession
        .builder
        .appName("StructuredNetworkWordCount")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel('WARN')

    # Streaming DataFrame of raw text lines read from the socket source.
    lines = (
        spark
        .readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
    )

    # One output row per space-separated token of each input line.
    words = lines.select(
        explode(
            split(lines.value, " ")
        ).alias("word")
    )

    wordCounts = words.groupBy("word").count()

    query = (
        wordCounts
        .writeStream
        .outputMode("complete")
        .format("console")
        .queryName('write_to_console')
        .trigger(processingTime="8 seconds")
        .start()
    )

    # Poll only while the query is alive; an unconditional loop would keep
    # printing the status of a stopped or failed query forever.
    while query.isActive:
        # Read lastProgress once so the value that is checked is the same
        # value that gets printed (it may change between two reads).
        progress = query.lastProgress
        if progress and progress['numInputRows'] > 0:
            pprint(progress)
        pprint(query.status)
        time.sleep(5)

    # The query has already terminated here, so this call does not block;
    # it is made so that a failure of the query is raised instead of being
    # silently dropped.
    query.awaitTermination()
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)