更新于08/06/2019
大型输入的纯,快速和保护的解决方案
out
用于就地计算的 dtype
参数,参数,索引order
参数
此功能等效于pandas’
ewm(adjust=False).mean(),但速度更快。
ewm(adjust=True).mean()(熊猫的默认设置)可以在结果开始时产生不同的值。我正在努力
adjust为该解决方案添加功能。
当输入太大时,@Divakar的答案会导致浮点精度问题。这是因为,
(1-alpha)**(n+1) -> 0当
n ->inf和时
alpha -> 1,会导致被零除,并且
NaN在计算中会d出值。
我想我终于破解了!
这是
numpy_ewma功能的向量化版本,据称它可以从
@RaduS's post-产生正确的结果
def numpy_ewma_vectorized(data, window): alpha = 2 /(window + 1.0) alpha_rev = 1-alpha scale = 1/alpha_rev n = data.shape[0] r = np.arange(n) scale_arr = scale**r offset = data[0]*alpha_rev**(r+1) pw0 = alpha*alpha_rev**(n-1) mult = data*pw0*scale_arr cumsums = mult.cumsum() out = offset + cumsums*scale_arr[::-1] return out
进一步提升
我们可以通过重复使用一些代码来进一步增强它,例如-
def numpy_ewma_vectorized_v2(data, window): alpha = 2 /(window + 1.0) alpha_rev = 1-alpha n = data.shape[0] pows = alpha_rev**(np.arange(n+1)) scale_arr = 1/pows[:-1] offset = data[0]*pows[1:] pw0 = alpha*alpha_rev**(n-1) mult = data*pw0*scale_arr cumsums = mult.cumsum() out = offset + cumsums*scale_arr[::-1] return out
运行时测试
让我们将这两个时间与大型数据集的相同循环函数进行比较。
In [97]: data = np.random.randint(2,9,(5000)) ...: window = 20 ...:In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))Out[98]: TrueIn [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))Out[99]: TrueIn [100]: %timeit numpy_ewma(data, window)100 loops, best of 3: 6.03 ms per loopIn [101]: %timeit numpy_ewma_vectorized(data, window)1000 loops, best of 3: 665 µs per loopIn [102]: %timeit numpy_ewma_vectorized_v2(data, window)1000 loops, best of 3: 357 µs per loopIn [103]: 6030/357.0Out[103]: 16.89075630252101
加速约17倍!
这是我最快的解决方案,几乎没有向量问题,几乎没有向量化。它有点复杂,但是性能却很好,尤其是对于非常庞大的输入。不使用就地计算(可以使用
out参数来节省内存分配时间):在相当老的PC上,对于100M元素输入向量为3.62秒,对于100K元素输入向量为3.2ms,对于5000个元素输入向量为293µs
(结果会因
alpha/
row_size值而异)。
# tested with python3 & numpy 1.15.2import numpy as npdef ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None): """ Reshapes data before calculating EWMA, then iterates once over the rows to calculate the offset without precision issues :param data: Input data, will be flattened. :param alpha: scalar float in range (0,1) The alpha parameter for the moving average. :param row_size: int, optional The row size to use in the computation. High row sizes need higher precision, low values will impact performance. The optimal value depends on the platform and the alpha being used. Higher alpha values require lower row size. Default depends on dtype. :param dtype: optional Data type used for calculations. Defaults to float64 unless data.dtype is float32, then it will use float32. :param order: {'C', 'F', 'A'}, optional Order to use when flattening the data. Defaults to 'C'. :param out: ndarray, or None, optional A location into which the result is stored. If provided, it must have the same shape as the desired output. If not provided or `None`, a freshly-allocated array is returned. :return: The flattened result. """ data = np.array(data, copy=False) if dtype is None: if data.dtype == np.float32: dtype = np.float32 else: dtype = np.float else: dtype = np.dtype(dtype) row_size = int(row_size) if row_size is not None else get_max_row_size(alpha, dtype) if data.size <= row_size: # The normal function can handle this input, use that return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out) if data.ndim > 1: # flatten input data = np.reshape(data, -1, order=order) if out is None: out = np.empty_like(data, dtype=dtype) else: assert out.shape == data.shape assert out.dtype == dtype row_n = int(data.size // row_size) # the number of rows to use trailing_n = int(data.size % row_size) # the amount of data leftover first_offset = data[0] if trailing_n > 0: # set temporary results to slice view of out parameter out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size)) data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size)) else: out_main_view = out data_main_view = data # get all the scaled cumulative sums with 0 offset ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype, order='C', out=out_main_view) scaling_factors = (1 - alpha) ** np.arange(1, row_size + 1) last_scaling_factor = scaling_factors[-1] # create offset array offsets = np.empty(out_main_view.shape[0], dtype=dtype) offsets[0] = first_offset # iteratively calculate offset for each row for i in range(1, out_main_view.shape[0]): offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1] # add the offsets to the result out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :] if trailing_n > 0: # process trailing data in the 2nd slice of the out parameter ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1], dtype=dtype, order='C', out=out[-trailing_n:]) return outdef get_max_row_size(alpha, dtype=float): assert 0. <= alpha < 1. # This will return the maximum row size possible on # your platform for the given dtype. I can find no impact on accuracy # at this value on my machine. # Might not be the optimal value for speed, which is hard to predict # due to numpy's optimizations # Use np.finfo(dtype).eps if you are worried about accuracy # and want to be extra safe. epsilon = np.finfo(dtype).tiny # If this produces an OverflowError, make epsilon larger return int(np.log(epsilon)/np.log(1-alpha)) + 1
一维ewma函数:
def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None): """ Calculates the exponential moving average over a vector. Will fail for large inputs. :param data: Input data:param alpha: scalar float in range (0,1) The alpha parameter for the moving average. :param offset: optional The offset for the moving average, scalar. Defaults to data[0]. :param dtype: optional Data type used for calculations. Defaults to float64 unless data.dtype is float32, then it will use float32. :param order: {'C', 'F', 'A'}, optional Order to use when flattening the data. Defaults to 'C'. :param out: ndarray, or None, optional A location into which the result is stored. If provided, it must have the same shape as the input. If not provided or `None`, a freshly-allocated array is returned. """ data = np.array(data, copy=False) if dtype is None: if data.dtype == np.float32: dtype = np.float32 else: dtype = np.float64 else: dtype = np.dtype(dtype) if data.ndim > 1: # flatten input data = data.reshape(-1, order) if out is None: out = np.empty_like(data, dtype=dtype) else: assert out.shape == data.shape assert out.dtype == dtype if data.size < 1: # empty input, return empty array return out if offset is None: offset = data[0] alpha = np.array(alpha, copy=False).astype(dtype, copy=False) # scaling_factors -> 0 as len(data) gets large # this leads to divide-by-zeros below scaling_factors = np.power(1. - alpha, np.arange(data.size + 1, dtype=dtype), dtype=dtype) # create cumulative sum array np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1], dtype=dtype, out=out) np.cumsum(out, dtype=dtype, out=out) # cumsums / scaling out /= scaling_factors[-2::-1] if offset != 0: offset = np.array(offset, copy=False).astype(dtype, copy=False) # add offsets out += offset * scaling_factors[1:] return out
2D ewma函数:
def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None): """ Calculates the exponential moving average over a given axis. :param data: Input data, must be 1D or 2D array. :param alpha: scalar float in range (0,1) The alpha parameter for the moving average. :param axis: The axis to apply the moving average on. If axis==None, the data is flattened. :param offset: optional The offset for the moving average. Must be scalar or a vector with one element for each row of data. If set to None, defaults to the first value of each row. :param dtype: optional Data type used for calculations. Defaults to float64 unless data.dtype is float32, then it will use float32. :param order: {'C', 'F', 'A'}, optional Order to use when flattening the data. Ignored if axis is not None. :param out: ndarray, or None, optional A location into which the result is stored. If provided, it must have the same shape as the desired output. If not provided or `None`, a freshly-allocated array is returned. """ data = np.array(data, copy=False) assert data.ndim <= 2 if dtype is None: if data.dtype == np.float32: dtype = np.float32 else: dtype = np.float64 else: dtype = np.dtype(dtype) if out is None: out = np.empty_like(data, dtype=dtype) else: assert out.shape == data.shape assert out.dtype == dtype if data.size < 1: # empty input, return empty array return out if axis is None or data.ndim < 2: # use 1D version if isinstance(offset, np.ndarray): offset = offset[0] return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order, out=out) assert -data.ndim <= axis < data.ndim # create reshaped data views out_view = out if axis < 0: axis = data.ndim - int(axis) if axis == 0: # transpose data views so columns are treated as rows data = data.T out_view = out_view.T if offset is None: # use the first element of each row as the offset offset = np.copy(data[:, 0]) elif np.size(offset) == 1: offset = np.reshape(offset, (1,)) alpha = np.array(alpha, copy=False).astype(dtype, copy=False) # calculate the moving average row_size = data.shape[1] row_n = data.shape[0] scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype), dtype=dtype) # create a scaled cumulative sum array np.multiply( data, np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype), dtype=dtype) / scaling_factors[np.newaxis, :-1], dtype=dtype, out=out_view ) np.cumsum(out_view, axis=1, dtype=dtype, out=out_view) out_view /= scaling_factors[np.newaxis, -2::-1] if not (np.size(offset) == 1 and offset == 0): offset = offset.astype(dtype, copy=False) # add the offsets to the scaled cumulative sums out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:] return out
用法:
data_n = 100000000data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100span = 5000 # span >= 1alpha = 2/(span+1) # for pandas` span parameter# com = 1000 # com >= 0# alpha = 1/(1+com) # for pandas` center-of-mass parameter# halflife = 100 # halflife > 0# alpha = 1 - np.exp(np.log(0.5)/halflife) # for pandas` half-life parameterresult = ewma_vectorized_safe(data, alpha)
只是一个提示
根据给定
alpha窗口中数据对平均值的贡献,很容易为给定的计算“窗口大小”(技术指数平均值具有无限的“窗口”)。例如,这对于选择由于边界效应而将结果的起始部分视为不可靠的情况很有用。
def window_size(alpha, sum_proportion): # Increases with increased sum_proportion and decreased alpha # solve (1-alpha)**window_size = (1-sum_proportion) for window_size return int(np.log(1-sum_proportion) / np.log(1-alpha))alpha = 0.02sum_proportion = .99 # window covers 99% of contribution to the moving averagewindow = window_size(alpha, sum_proportion) # = 227sum_proportion = .75 # window covers 75% of contribution to the moving averagewindow = window_size(alpha, sum_proportion) # = 68
alpha = 2 / (window_size +1.0)此线程中使用的关系(pandas的’span’选项)与上述函数(带有
sum_proportion~=0.87)的逆函数非常近似。
alpha = 1 -np.exp(np.log(1-sum_proportion)/window_size)更准确(pandas的“半衰期”选项等于
sum_proportion=0.5)。
在下面的示例中,
data代表一个连续的噪声信号。
cutoff_idx是第一个位置,
result其中至少99%的值取决于其中的单独值
data(即小于1%的值取决于data
[0])。直到
cutoff_idx最终的数据都被排除在数据之外,因为它太依赖于中的第一个值
data,因此可能会扭曲平均值。
result = ewma_vectorized_safe(data, alpha, chunk_size)sum_proportion = .99cutoff_idx = window_size(alpha, sum_proportion)result = result[cutoff_idx:]
为了说明上面解决的问题,您可以运行几次,请注意,红线经常出现的错误的开始,在以下位置被跳过
cutoff_idx:
data_n = 100000data = np.random.rand(data_n) * 100window = 1000sum_proportion = .99alpha = 1 - np.exp(np.log(1-sum_proportion)/window)result = ewma_vectorized_safe(data, alpha)cutoff_idx = window_size(alpha, sum_proportion)x = np.arange(start=0, stop=result.size)import matplotlib.pyplot as pltplt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r', x[cutoff_idx:], result[cutoff_idx:], '-b')plt.show()
请注意,
cutoff_idx==window因为alpha是用
window_size()函数的反函数设置的,所以具有
sum_proportion。这类似于大熊猫的用法
ewm(span=window,min_periods=window)。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)