这里的问题是,我不知道一种方法,可以在不首先将值收集到列表中的情况下,告诉spark将当前行与Dataframe中的其他行进行比较。
UDF是不是一种选择,在这里(你不能引用分布
Dataframe在
udf)你的逻辑的直接翻译是笛卡尔乘积和汇总:
from pyspark.sql.functions import levenshtein, colresult = (spark_df.alias("l") .crossJoin(spark_df.alias("r")) .where(levenshtein("l.word", "r.word") < 2) .where(col("l.word") != col("r.word")) .groupBy("l.id", "l.word") .count())
但实际上,您应该尝试做一些更有效的事情:ApacheSpark中的有效字符串匹配
根据问题,您应尝试查找其他近似值以避免完整的笛卡尔积。
如果要保留不匹配的数据,则可以跳过一个过滤器:
(spark_df.alias("l") .crossJoin(spark_df.alias("r")) .where(levenshtein("l.word", "r.word") < 2) .groupBy("l.id", "l.word") .count() .withColumn("count", col("count") - 1))
或(速度较慢,但通用性更高),请参考加入:
(spark_df .select("id", "word") .distinct() .join(result, ["id", "word"], "left") .na.fill(0))
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)