NumPy版本的“指数加权移动平均线”,等效于pandas.ewm()。mean()

NumPy版本的“指数加权移动平均线”,等效于pandas.ewm()。mean(),第1张

NumPy版本的“指数加权移动平均线”,等效于pandas.ewm()。mean()

更新于08/06/2019

大型输入的纯,快速和保护的解决方案

out
用于就地计算的
dtype
参数,参数,索引
order
参数

此功能等效于pandas’

ewm(adjust=False).mean()
,但速度更快。
ewm(adjust=True).mean()
(熊猫的默认设置)可以在结果开始时产生不同的值。我正在努力
adjust
为该解决方案添加功能。

输入太大时,@Divakar的答案会导致浮点精度问题。这是因为,

(1-alpha)**(n+1) -> 0
n ->inf
和时
alpha -> 1
,会导致被零除,并且
NaN
在计算中会d出值。

我想我终于破解了!

这是

numpy_ewma
功能的向量化版本,据称它可以从
@RaduS's post
-产生正确的结果

def numpy_ewma_vectorized(data, window):    alpha = 2 /(window + 1.0)    alpha_rev = 1-alpha    scale = 1/alpha_rev    n = data.shape[0]    r = np.arange(n)    scale_arr = scale**r    offset = data[0]*alpha_rev**(r+1)    pw0 = alpha*alpha_rev**(n-1)    mult = data*pw0*scale_arr    cumsums = mult.cumsum()    out = offset + cumsums*scale_arr[::-1]    return out

进一步提升

我们可以通过重复使用一些代码来进一步增强它,例如-

def numpy_ewma_vectorized_v2(data, window):    alpha = 2 /(window + 1.0)    alpha_rev = 1-alpha    n = data.shape[0]    pows = alpha_rev**(np.arange(n+1))    scale_arr = 1/pows[:-1]    offset = data[0]*pows[1:]    pw0 = alpha*alpha_rev**(n-1)    mult = data*pw0*scale_arr    cumsums = mult.cumsum()    out = offset + cumsums*scale_arr[::-1]    return out

运行时测试

让我们将这两个时间与大型数据集的相同循环函数进行比较。

In [97]: data = np.random.randint(2,9,(5000))    ...: window = 20    ...:In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))Out[98]: TrueIn [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))Out[99]: TrueIn [100]: %timeit numpy_ewma(data, window)100 loops, best of 3: 6.03 ms per loopIn [101]: %timeit numpy_ewma_vectorized(data, window)1000 loops, best of 3: 665 µs per loopIn [102]: %timeit numpy_ewma_vectorized_v2(data, window)1000 loops, best of 3: 357 µs per loopIn [103]: 6030/357.0Out[103]: 16.89075630252101

加速约17倍!

这是我最快的解决方案,几乎没有向量问题,几乎没有向量化。它有点复杂,但是性能却很好,尤其是对于非常庞大的输入。不使用就地计算(可以使用

out
参数来节省内存分配时间):在相当老的PC上,对于100M元素输入向量为3.62秒,对于100K元素输入向量为3.2ms,对于5000个元素输入向量为293µs
(结果会因
alpha
/
row_size
值而异)。

# tested with python3 & numpy 1.15.2import numpy as npdef ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None):    """    Reshapes data before calculating EWMA, then iterates once over the rows    to calculate the offset without precision issues    :param data: Input data, will be flattened.    :param alpha: scalar float in range (0,1)        The alpha parameter for the moving average.    :param row_size: int, optional        The row size to use in the computation. High row sizes need higher precision,        low values will impact performance. The optimal value depends on the        platform and the alpha being used. Higher alpha values require lower        row size. Default depends on dtype.    :param dtype: optional        Data type used for calculations. Defaults to float64 unless        data.dtype is float32, then it will use float32.    :param order: {'C', 'F', 'A'}, optional        Order to use when flattening the data. Defaults to 'C'.    :param out: ndarray, or None, optional        A location into which the result is stored. If provided, it must have        the same shape as the desired output. If not provided or `None`,        a freshly-allocated array is returned.    :return: The flattened result.    """    data = np.array(data, copy=False)    if dtype is None:        if data.dtype == np.float32: dtype = np.float32        else: dtype = np.float    else:        dtype = np.dtype(dtype)    row_size = int(row_size) if row_size is not None     else get_max_row_size(alpha, dtype)    if data.size <= row_size:        # The normal function can handle this input, use that        return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)    if data.ndim > 1:        # flatten input        data = np.reshape(data, -1, order=order)    if out is None:        out = np.empty_like(data, dtype=dtype)    else:        assert out.shape == data.shape        assert out.dtype == dtype    row_n = int(data.size // row_size)  # the number of rows to use    trailing_n = int(data.size % row_size)  # the amount of data leftover    first_offset = data[0]    if trailing_n > 0:        # set temporary results to slice view of out parameter        out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))        data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))    else:        out_main_view = out        data_main_view = data    # get all the scaled cumulative sums with 0 offset    ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype, order='C', out=out_main_view)    scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1)    last_scaling_factor = scaling_factors[-1]    # create offset array    offsets = np.empty(out_main_view.shape[0], dtype=dtype)    offsets[0] = first_offset    # iteratively calculate offset for each row    for i in range(1, out_main_view.shape[0]):        offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]    # add the offsets to the result    out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]    if trailing_n > 0:        # process trailing data in the 2nd slice of the out parameter        ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],  dtype=dtype, order='C', out=out[-trailing_n:])    return outdef get_max_row_size(alpha, dtype=float):    assert 0. <= alpha < 1.    # This will return the maximum row size possible on     # your platform for the given dtype. I can find no impact on accuracy    # at this value on my machine.    # Might not be the optimal value for speed, which is hard to predict    # due to numpy's optimizations    # Use np.finfo(dtype).eps if you  are worried about accuracy    # and want to be extra safe.    epsilon = np.finfo(dtype).tiny    # If this produces an OverflowError, make epsilon larger    return int(np.log(epsilon)/np.log(1-alpha)) + 1

一维ewma函数:

def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None):    """    Calculates the exponential moving average over a vector.    Will fail for large inputs.    :param data: Input data:param alpha: scalar float in range (0,1)        The alpha parameter for the moving average.    :param offset: optional        The offset for the moving average, scalar. Defaults to data[0].    :param dtype: optional        Data type used for calculations. Defaults to float64 unless        data.dtype is float32, then it will use float32.    :param order: {'C', 'F', 'A'}, optional        Order to use when flattening the data. Defaults to 'C'.    :param out: ndarray, or None, optional        A location into which the result is stored. If provided, it must have        the same shape as the input. If not provided or `None`,        a freshly-allocated array is returned.    """    data = np.array(data, copy=False)    if dtype is None:        if data.dtype == np.float32: dtype = np.float32        else: dtype = np.float64    else:        dtype = np.dtype(dtype)    if data.ndim > 1:        # flatten input        data = data.reshape(-1, order)    if out is None:        out = np.empty_like(data, dtype=dtype)    else:        assert out.shape == data.shape        assert out.dtype == dtype    if data.size < 1:        # empty input, return empty array        return out    if offset is None:        offset = data[0]    alpha = np.array(alpha, copy=False).astype(dtype, copy=False)    # scaling_factors -> 0 as len(data) gets large    # this leads to divide-by-zeros below    scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype),         dtype=dtype)    # create cumulative sum array    np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],     dtype=dtype, out=out)    np.cumsum(out, dtype=dtype, out=out)    # cumsums / scaling    out /= scaling_factors[-2::-1]    if offset != 0:        offset = np.array(offset, copy=False).astype(dtype, copy=False)        # add offsets        out += offset * scaling_factors[1:]    return out

2D ewma函数:

def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None):    """    Calculates the exponential moving average over a given axis.    :param data: Input data, must be 1D or 2D array.    :param alpha: scalar float in range (0,1)        The alpha parameter for the moving average.    :param axis: The axis to apply the moving average on.        If axis==None, the data is flattened.    :param offset: optional        The offset for the moving average. Must be scalar or a        vector with one element for each row of data. If set to None,        defaults to the first value of each row.    :param dtype: optional        Data type used for calculations. Defaults to float64 unless        data.dtype is float32, then it will use float32.    :param order: {'C', 'F', 'A'}, optional        Order to use when flattening the data. Ignored if axis is not None.    :param out: ndarray, or None, optional        A location into which the result is stored. If provided, it must have        the same shape as the desired output. If not provided or `None`,        a freshly-allocated array is returned.    """    data = np.array(data, copy=False)    assert data.ndim <= 2    if dtype is None:        if data.dtype == np.float32: dtype = np.float32        else: dtype = np.float64    else:        dtype = np.dtype(dtype)    if out is None:        out = np.empty_like(data, dtype=dtype)    else:        assert out.shape == data.shape        assert out.dtype == dtype    if data.size < 1:        # empty input, return empty array        return out    if axis is None or data.ndim < 2:        # use 1D version        if isinstance(offset, np.ndarray): offset = offset[0]        return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,         out=out)    assert -data.ndim <= axis < data.ndim    # create reshaped data views    out_view = out    if axis < 0:        axis = data.ndim - int(axis)    if axis == 0:        # transpose data views so columns are treated as rows        data = data.T        out_view = out_view.T    if offset is None:        # use the first element of each row as the offset        offset = np.copy(data[:, 0])    elif np.size(offset) == 1:        offset = np.reshape(offset, (1,))    alpha = np.array(alpha, copy=False).astype(dtype, copy=False)    # calculate the moving average    row_size = data.shape[1]    row_n = data.shape[0]    scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),         dtype=dtype)    # create a scaled cumulative sum array    np.multiply(        data,        np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),         dtype=dtype)        / scaling_factors[np.newaxis, :-1],        dtype=dtype, out=out_view    )    np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)    out_view /= scaling_factors[np.newaxis, -2::-1]    if not (np.size(offset) == 1 and offset == 0):        offset = offset.astype(dtype, copy=False)        # add the offsets to the scaled cumulative sums        out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]    return out

用法:

data_n = 100000000data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100span = 5000  # span >= 1alpha = 2/(span+1)  # for pandas` span parameter# com = 1000  # com >= 0# alpha = 1/(1+com)  # for pandas` center-of-mass parameter# halflife = 100  # halflife > 0# alpha = 1 - np.exp(np.log(0.5)/halflife)  # for pandas` half-life parameterresult = ewma_vectorized_safe(data, alpha)

只是一个提示

根据给定

alpha
窗口中数据对平均值的贡献,很容易为给定的计算“窗口大小”(技术指数平均值具有无限的“窗口”)。例如,这对于选择由于边界效应而将结果的起始部分视为不可靠的情况很有用。

def window_size(alpha, sum_proportion):    # Increases with increased sum_proportion and decreased alpha    # solve (1-alpha)**window_size = (1-sum_proportion) for window_size return int(np.log(1-sum_proportion) / np.log(1-alpha))alpha = 0.02sum_proportion = .99  # window covers 99% of contribution to the moving averagewindow = window_size(alpha, sum_proportion)  # = 227sum_proportion = .75  # window covers 75% of contribution to the moving averagewindow = window_size(alpha, sum_proportion)  # = 68

alpha = 2 / (window_size +1.0)
此线程中使用的关系(pandas的’span’选项)与上述函数(带有
sum_proportion~=0.87
)的逆函数非常近似。
alpha = 1 -np.exp(np.log(1-sum_proportion)/window_size)
更准确(pandas的“半衰期”选项等于
sum_proportion=0.5
)。

在下面的示例中,

data
代表一个连续的噪声信号。
cutoff_idx
是第一个位置,
result
其中至少99%的值取决于其中的单独值
data
(即小于1%的值取决于data
[0])。直到
cutoff_idx
最终的数据都被排除在数据之外,因为它太依赖于中的第一个值
data
,因此可能会扭曲平均值。

result = ewma_vectorized_safe(data, alpha, chunk_size)sum_proportion = .99cutoff_idx = window_size(alpha, sum_proportion)result = result[cutoff_idx:]

为了说明上面解决的问题,您可以运行几次,请注意,红线经常出现的错误的开始,在以下位置被跳过

cutoff_idx

data_n = 100000data = np.random.rand(data_n) * 100window = 1000sum_proportion = .99alpha = 1 - np.exp(np.log(1-sum_proportion)/window)result = ewma_vectorized_safe(data, alpha)cutoff_idx = window_size(alpha, sum_proportion)x = np.arange(start=0, stop=result.size)import matplotlib.pyplot as pltplt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r',         x[cutoff_idx:], result[cutoff_idx:], '-b')plt.show()

请注意,

cutoff_idx==window
因为alpha是用
window_size()
函数的反函数设置的,所以具有
sum_proportion
。这类似于大熊猫的用法
ewm(span=window,min_periods=window)



欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5643595.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-16
下一篇 2022-12-16

发表评论

登录后才能评论

评论列表(0条)

保存