高效的字符串后缀检测

高效的字符串后缀检测,第1张

高效的字符串后缀检测

让我们扩展一下

domains
以获得更好的覆盖范围:

domains = spark.createDataframe([    "something.google.com",  # OK    "something.google.com.somethingelse.ac.uk", # NOT OK     "something.good.com.cy", # OK     "something.good.com.cy.mal.org",  # NOT OK    "something.bad.com.cy",  # NOT OK    "omgalsogood.com.cy", # NOT OK    "good.com.cy",   # OK     "sogood.example.com",  # OK Match for shorter redundant, mismatch on longer    "notsoreal.googleecom" # NOT OK], "string").toDF('domains')good_domains =  spark.createDataframe([    "google.com", "good.com.cy", "alsogood.com.cy",    "good.example.com", "example.com"  # Redundant case], "string").toDF('gooddomains')

现在… 仅使用Spark SQL原语的简单解决方案
是稍微简化当前的方法。既然您已经声明可以安全地假定它们是有效的公共域,那么我们可以定义如下函数:

from pyspark.sql.functions import col, regexp_extractdef suffix(c):     return regexp_extract(c, "([^.]+\.[^.]+$)", 1)

提取顶级域和第一级子域:

domains_with_suffix = (domains    .withColumn("suffix", suffix("domains"))    .alias("domains"))good_domains_with_suffix = (good_domains    .withColumn("suffix", suffix("gooddomains"))    .alias("good_domains"))domains_with_suffix.show()+--------------------+--------------------+|  domains|   suffix|+--------------------+--------------------+|something.google.com|          google.com||something.google....|    ac.uk||something.good.co...|   com.cy||something.good.co...|  mal.org||something.bad.com.cy|   com.cy||  omgalsogood.com.cy|   com.cy||         good.com.cy|   com.cy||  sogood.example.com|         example.com||notsoreal.googleecom|notsoreal.googleecom|+--------------------+--------------------+

现在我们可以外部联接:

from pyspark.sql.functions import (    col, concat, lit, monotonically_increasing_id, sum as sum_)candidates = (domains_with_suffix    .join(        good_domains_with_suffix,        col("domains.suffix") == col("good_domains.suffix"),         "left"))

并过滤结果:

is_good_expr = (    col("good_domains.suffix").isNotNull() &      # Match on suffix    (        # Exact match        (col("domains") == col("gooddomains")) |        # Subdomain match        col("domains").endswith(concat(lit("."), col("gooddomains")))    ))not_good_domains = (candidates    .groupBy("domains")  # .groupBy("suffix", "domains") - see the discussion    .agg((sum_(is_good_expr.cast("integer")) > 0).alias("any_good"))    .filter(~col("any_good"))    .drop("any_good"))not_good_domains.show(truncate=False)+----------------------------------------+|domains|+----------------------------------------+|omgalsogood.com.cy||notsoreal.googleecom         ||something.good.com.cy.mal.org||something.google.com.somethingelse.ac.uk||something.bad.com.cy         |+----------------------------------------+

这比直接与所需

LIKE
的笛卡尔积更好,但对蛮力而言并不令人满意,在最坏的情况下,需要两次混洗-
一种用于
join
(如果
good_domains
足够小,则可以跳过
broadcasted
),另一种用于
group_by
+
agg

不幸的是,Spark SQL不允许自定义分区程序仅对两者使用一个改组(但是,可以使用RDD
API中的复合键,并且优化器还不够聪明,无法进行优化

join(_,"key1")
.groupBy("key1", _)

如果您可以接受一些错误的否定,那么您就 可以提高概率
。首先,我们来建立概率计数器(此处在的

bounter
帮助下使用
toolz

from pyspark.sql.functions import concat_ws, reverse, splitfrom bounter import bounterfrom toolz.curried import identity, partition_all# This is only for testing on toy examples, in practice use more realistic valuesize_mb = 20      chunk_size = 100def reverse_domain(c):    return concat_ws(".", reverse(split(c, "\.")))def merge(acc, xs):    acc.update(xs)    return acccounter = sc.broadcast((good_domains    .select(reverse_domain("gooddomains"))    .rdd.flatMap(identity)    # Chunk data into groups so we reduce the number of update calls    .mapPartitions(partition_all(chunk_size))    # Use tree aggregate to reduce pressure on the driver,     # when number of partitions is large*    # You can use depth parameter for further tuning    .treeAggregate(bounter(need_iteration=False, size_mb=size_mb), merge, merge)))

接下来定义这样的用户定义函数功能

from pyspark.sql.functions import pandas_udf, PandasUDFTypefrom toolz import accumulatedef is_good_counter(counter):    def is_good_(x):        return any( x in counter.value  for x in accumulate(lambda x, y: "{}.{}".format(x, y), x.split("."))        )    @pandas_udf("boolean", PandasUDFType.SCALAR)    def _(xs):        return xs.apply(is_good_)    return _

并过滤

domains

domains.filter(    ~is_good_counter(counter)(reverse_domain("domains"))).show(truncate=False)+----------------------------------------+|domains|+----------------------------------------+|something.google.com.somethingelse.ac.uk||something.good.com.cy.mal.org||something.bad.com.cy         ||omgalsogood.com.cy||notsoreal.googleecom         |+----------------------------------------+

在Scala中, 可以使用

bloomFilter

import org.apache.spark.sql.Columnimport org.apache.spark.sql.functions._import org.apache.spark.util.sketch.BloomFilterdef reverseDomain(c: Column) = concat_ws(".", reverse(split(c, "\.")))val checker = good_domains.stat.bloomFilter(  // Adjust values depending on the data  reverseDomain($"gooddomains"), 1000, 0.001 )def isGood(checker: BloomFilter) = udf((s: String) =>   s.split('.').toStream.scanLeft("") {    case ("", x) => x    case (acc, x) => s"${acc}.${x}"}.tail.exists(checker mightContain _))domains.filter(!isGood(checker)(reverseDomain($"domains"))).show(false)+----------------------------------------+|domains|+----------------------------------------+|something.google.com.somethingelse.ac.uk||something.good.com.cy.mal.org||something.bad.com.cy         ||omgalsogood.com.cy||notsoreal.googleecom         |+----------------------------------------+

并且如果需要的话,应该不难从Python调用此类代码。

由于近似的性质,这可能仍不能完全令人满意。如果您需要精确的结果,则可以尝试 利用数据的冗余性质
,例如使用trie(此处使用

datrie
实现)。

如果

good_domains
相对较小,则可以按照与概率变体类似的方式创建单个模型:

import stringimport datriedef seq_op(acc, x):    acc[x] = True    return accdef comb_op(acc1, acc2):    acc1.update(acc2)    return acc1trie = sc.broadcast((good_domains    .select(reverse_domain("gooddomains"))    .rdd.flatMap(identity)    # string.printable is a bit excessive if you need standard domain    # and not enough if you allow internationalized domain names.    # In the latter case you'll have to adjust the `alphabet`    # or use different implementation of trie.    .treeAggregate(datrie.Trie(string.printable), seq_op, comb_op)))

定义用户定义的函数:

def is_good_trie(trie):    def is_good_(x):        if not x: return False        else: return any(     x == match or x[len(match)] == "."     for match in trie.value.iter_prefixes(x) )    @pandas_udf("boolean", PandasUDFType.SCALAR)    def _(xs):        return xs.apply(is_good_)    return _

并将其应用于数据:

domains.filter(    ~is_good_trie(trie)(reverse_domain("domains"))).show(truncate=False)+----------------------------------------+|domains|+----------------------------------------+|something.google.com.somethingelse.ac.uk||something.good.com.cy.mal.org||something.bad.com.cy         ||omgalsogood.com.cy||notsoreal.googleecom         |+----------------------------------------+

这种特定的方法在假设所有条件都

good_domains
可以压缩成单个特里的情况下工作,但可以轻松扩展以处理不满足此假设的情况。例如,您可以为每个顶级域或后缀(在天真的解决方案中定义)构建一个trie

(good_domains    .select(suffix("gooddomains"), reverse_domain("gooddomains"))    .rdd    .aggregateByKey(datrie.Trie(string.printable), seq_op, comb_op))

然后,可以从序列化版本中按需加载模型,也可以使用

RDD
*** 作。

可以根据数据,业务需求(例如近似解决方案中的假负容忍度)和可用资源(驱动程序内存,执行程序内存,基数

suffixes
,对分布式POSIX兼容分布式文件系统的访问)来进一步调整这两种非本机方法,
等等)。在将其应用于
Dataframes
和之间时
RDDs
(在内存使用,通信和序列化开销方面),在进行选择时还需要考虑一些权衡。



欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5646373.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存