You can include the spark-avro package, for example with
--packages (adjust the version to match your Spark installation):
bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0
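The coordinate above has the form org.apache.spark:spark-avro_<Scala binary version>:<Spark version>. As a minimal sketch of "adjusting the version", the helper below (spark_avro_coordinate is a hypothetical name, not part of PySpark) builds the coordinate from the two version strings, which you would need to look up for your own installation, for example from the output of spark-submit --version:

```python
def spark_avro_coordinate(spark_version: str, scala_binary_version: str) -> str:
    """Build the Maven coordinate for spark-avro (hypothetical helper).

    spark_version:        e.g. "2.4.0", the version of the Spark installation
    scala_binary_version: e.g. "2.11", the Scala version Spark was built with
    """
    return f"org.apache.spark:spark-avro_{scala_binary_version}:{spark_version}"


# Reproduces the coordinate used in the command above.
coordinate = spark_avro_coordinate("2.4.0", "2.11")
print(f"bin/pyspark --packages {coordinate}")
```

The Scala suffix matters as much as the Spark version: an artifact built for a different Scala binary version than the one your Spark distribution uses will generally fail to load.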
Then provide your own wrappers:
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column


def from_avro(col, jsonFormatSchema):
    # Look up the Scala package object org.apache.spark.sql.avro through the JVM gateway
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    # Wrap the resulting JVM column in a Python Column
    return Column(f(_to_java_column(col), jsonFormatSchema))


def to_avro(col):
    sc = SparkContext._active_spark_context
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))
Usage example (adapted from the official test suite):
from pyspark.sql.functions import col, struct


avro_type_struct = """
{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""


df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))

avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)
+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows

avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
|                                          [0, 0]|
|                                          [1, 1]|
|                                          [2, 2]|
+------------------------------------------------+
only showing top 3 rows
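The hex bytes in the to_avro output follow from Avro's binary encoding: a long is written as a zig-zag variable-length integer, a string as its byte length (itself a long) followed by the UTF-8 data, and a record as its fields concatenated in schema order. The sketch below is plain Python with no Spark dependency; the helper names are hypothetical and not part of spark-avro. It only covers the two types used in this example and reproduces the three rows shown:

```python
def zigzag_varint(n: int) -> bytes:
    """Encode a signed 64-bit integer the way Avro encodes a long."""
    # Zig-zag maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ...
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        byte = z & 0x7F          # low 7 bits
        z >>= 7
        if z:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length as a long, then the UTF-8 bytes."""
    data = s.encode("utf-8")
    return zigzag_varint(len(data)) + data


def encode_row(i: int) -> bytes:
    """Record with col1 = i (long) and col2 = str(i) (string)."""
    return zigzag_varint(i) + encode_string(str(i))


for i in range(3):
    print(" ".join(f"{b:02x}" for b in encode_row(i)))
```

For the first row, the long 0 encodes to 00, the string length 1 encodes to 02, and the character "0" is the byte 30, giving [00 02 30].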