这不是一个很好的解决方案,但是您可以调整我对类似Scala问题的答案。让我们从一个示例数据开始:
import numpy as npnp.random.seed(323)keys = ["foo"] * 50 + ["bar"] * 50values = ( np.vstack([np.repeat(-10, 500), np.repeat(10, 500)]).reshape(100, -1) + np.random.rand(100, 10))rdd = sc.parallelize(zip(keys, values))
不幸的
MultivariateStatisticalSummary是,它只是围绕JVM模型的包装,并且它并不是真正的Python友好。幸运的是,有了NumPy数组,我们可以使用standard
StatCounter通过键来计算统计信息:
from pyspark.statcounter import StatCounterdef compute_stats(rdd): return rdd.aggregateByKey( StatCounter(), StatCounter.merge, StatCounter.mergeStats ).collectAsMap()
最后我们可以
map归一化:
def scale(rdd, stats): def scale_(kv): k, v = kv return (v - stats[k].mean()) / stats[k].stdev() return rdd.map(scale_)scaled = scale(rdd, compute_stats(rdd))scaled.first()## array([ 1.59879188, -1.66816084, 1.38546532, 1.76122047, 1.48132643,## 0.01512487, 1.49336769, 0.47765982, -1.04271866, 1.55288814])
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)