PySpark 2.4.0: reading Avro from Kafka with readStream (Python)



You can include the spark-avro package when starting the shell, for example with `--packages` (adjust the versions to match your Spark installation):

```shell
bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0
```

and provide your own wrapper:

```python
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

def from_avro(col, jsonFormatSchema):
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))

def to_avro(col):
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))
```

Example usage (adapted from the official test suite):

```python
from pyspark.sql.functions import col, struct

avro_type_struct = """{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""

df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
```

```
+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows
```

```python
avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
```

```
+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
|                                          [0, 0]|
|                                          [1, 1]|
|                                          [2, 2]|
+------------------------------------------------+
only showing top 3 rows
```
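The raw bytes shown above follow directly from Avro's binary encoding: a record is just the concatenation of its field encodings, with longs (and string lengths) written as zig-zag varints. A small plain-Python sketch of that encoding (illustrative only, not Spark code) reproduces the first rows:

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed integer as an Avro zig-zag varint."""
    z = (n << 1) ^ (n >> 63)  # zig-zag: maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ...
    out = bytearray()
    while True:
        b = z & 0x7F
        z >>= 7
        if z:
            out.append(b | 0x80)  # more bytes follow
        else:
            out.append(b)
            return bytes(out)

def encode_record(col1: int, col2: str) -> bytes:
    # An Avro record is the concatenated field encodings: here a long,
    # then a string (length varint followed by UTF-8 bytes).
    data = col2.encode("utf-8")
    return zigzag_varint(col1) + zigzag_varint(len(data)) + data

print(encode_record(0, "0").hex(" "))  # 00 02 30 — matches the first row above
print(encode_record(1, "1").hex(" "))  # 02 02 31 — matches the second row
```

So `[00 02 30]` is the long `0`, a string length of 1, and the UTF-8 byte for `"0"`.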
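To connect this back to the question's actual goal — reading Avro from Kafka with `readStream` — the wrapper slots into a structured-streaming query on the binary `value` column. A minimal sketch, assuming a broker at `localhost:9092` and a topic named `events` (both hypothetical), and reusing the `from_avro` wrapper and `avro_type_struct` schema defined above:

```python
from pyspark.sql.functions import col

# Read the raw Kafka stream; each row carries the payload in the
# binary "value" column.
stream_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "events")
    .load())

# Decode the Avro payload with the wrapper and the writer's schema,
# then flatten the resulting struct into top-level columns.
decoded = (stream_df
    .select(from_avro(col("value"), avro_type_struct).alias("data"))
    .select("data.*"))

# Print decoded rows to the console as they arrive.
query = decoded.writeStream.format("console").start()
```

Note that the Kafka source additionally requires the `spark-sql-kafka-0-10` package on the classpath, e.g. `--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0`.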



Original source: http://outofmemory.cn/zaji/5653655.html
