bass 找拐点

bass 找拐点,第1张

bass 找拐点

1. 数据源

# Load the `data_pre.card_member_weeks` table through Spark and convert it to a
# pandas DataFrame. Assumes `spark` and `pd` already exist in the session.
data = spark.sql('''select * from data_pre.card_member_weeks''')
data_df = data.toPandas()
# Convert the 'amt' values to numeric, one element at a time via Series.apply.
data_df['amt'] = data_df['amt'].apply(pd.to_numeric)

# Keep only the rows whose 'type' is 'week1', order them by 'weeks', renumber
# the rows from 0 (the previous index is kept as a column because reset_index()
# is called without drop=True), then discard the first row of the sorted result.
# NOTE(review): the reason for discarding the first row is not stated — confirm.
week1_data = data_df[data_df['type']=='week1'].sort_values(by = "weeks").reset_index().drop(index=[0])

2. 函数定义(Bass 曲线拟合与拐点查找)

from scipy.optimize import curve_fit


def func(x, m, p, q):
    """Evaluate the Bass-model curve used as the fitting target.

    Computes ``m * (p + q)**2 / p * E / (1 + (q / p) * E)**2`` with
    ``E = e ** (-(p + q) * x)``.

    Args:
        x: Point(s) at which to evaluate the curve; a scalar or a NumPy array.
        m: Overall scale factor of the curve.
        p: First rate parameter; also the divisor of ``q`` and of the scale,
            so it must be non-zero.
        q: Second rate parameter.

    Returns:
        The curve value at ``x`` (same shape as ``x`` for array input).
    """
    rate = -p - q
    decay = np.e ** (rate * x)
    scale = m * (p + q) ** 2 / p
    damping = (1 + (q / p) * decay) ** 2
    return scale * (decay / damping)


def find_turning_points(x, y):
    """Return the indices where the bend of the curve through (x, y) flips sign.

    For every interior index ``i`` a signed value is computed from the three
    neighbouring samples ``i - 1``, ``i`` and ``i + 1``::

        (x[i] - x[i-1]) * (y[i+1] - y[i-1]) + (y[i-1] - y[i]) * (x[i+1] - x[i-1])

    For evenly spaced ``x`` this is proportional to the second difference of
    ``y``. Index ``i`` is reported when its value and the value at ``i - 1``
    have strictly opposite signs.

    Args:
        x: Indexable sequence of x coordinates.
        y: Indexable sequence of y coordinates, at least as long as ``x``.

    Returns:
        A list of 0-based indices into ``x`` (not x values). The list is empty
        when fewer than three points are supplied, because no bend can be
        computed from them.

    Note:
        A value of exactly zero never counts as a sign flip (the product is
        not negative) and it becomes the reference for the next comparison,
        so a flip that passes through an exact zero is not reported.
    """
    # Three points are the minimum needed to evaluate the first bend value.
    if len(x) < 3:
        return []

    def bend(i):
        # Signed bend of the polyline at sample i, using its two neighbours.
        return (x[i] - x[i - 1]) * (y[i + 1] - y[i - 1]) + (y[i - 1] - y[i]) * (x[i + 1] - x[i - 1])

    turning_points_x = []
    previous = bend(1)
    for i in range(2, len(x) - 1):
        current = bend(i)
        if previous * current < 0:
            turning_points_x.append(i)
        previous = current
    return turning_points_x

def bass_smooth_and_cal_turning_points(week_sl_df):
    """Smooth the ``amt`` series with a fitted curve and locate its turning points.

    The values of ``week_sl_df['amt']`` are paired with x = 1, 2, ..., n and
    ``func`` is fitted to them with ``curve_fit``. The fitted curve, clipped
    below at 0, is then passed to ``find_turning_points``. If the fit (or the
    evaluation of the fitted curve) fails, the raw values are used instead.

    Args:
        week_sl_df: Object supporting ``week_sl_df['amt'].values`` (for
            example a pandas DataFrame with an ``amt`` column).

    Returns:
        A list of 1-based positions (matching x = 1..n) at which a turning
        point was found. The list is empty when fewer than 4 points are
        available.

    NOTE(review): call sites in this file filter the result on the columns
    '成长期-成熟期' and '成熟期-衰退期', and an earlier commented-out variant
    stored the two turning points in such a frame when exactly two were found.
    This function returns a plain list; confirm which shape is intended.
    """
    week_sl = list(week_sl_df['amt'].values)
    x = np.arange(1, len(week_sl) + 1)
    y = np.array(week_sl)
    try:
        popt, _ = curve_fit(func, x, y)
        m, p, q = popt
        fitted = func(x, m, p, q)
        # The fitted curve is clipped so that no smoothed value is negative.
        yvals = np.array([max(0, v) for v in fitted])
    except Exception:
        # The fit is best-effort: on any failure fall back to the raw series.
        # (Unlike a bare ``except``, this lets KeyboardInterrupt/SystemExit through.)
        yvals = y

    # Default to "no turning points" so short inputs return a defined value.
    x_turning_points = []
    if len(yvals) >= 4:
        # find_turning_points returns 0-based indices; shift to the 1-based x scale.
        x_turning_points = [i + 1 for i in find_turning_points(x, yvals)]
    return x_turning_points


# Apply the turning-point search to every `sphh` group, then keep only the rows
# whose two stage columns are both different from -1.
# (These statements appeared twice verbatim; the redundant second copy is removed.)
# NOTE(review): bass_smooth_and_cal_turning_points reads an 'amt' column and
# returns a list, while this code passes 'sphh'/'week_sl' and expects the two
# stage columns plus a 'level_1' index level — confirm the intended contract.
sku_new_label = whole_train_sp_week_sl.groupby(['sphh']).apply(lambda x: bass_smooth_and_cal_turning_points(x[['sphh', 'week_sl']])).reset_index().drop(['level_1'], axis=1)
sku_new_label = sku_new_label[(sku_new_label['成长期-成熟期']!=-1) & (sku_new_label['成熟期-衰退期']!=-1)].reset_index(drop=True)

# NOTE(review): `week3_data` is not defined in the visible part of this file,
# and DataFrame.apply hands the lambda one column at a time by default —
# confirm that selecting ['type', 'amt'] from it is what was meant.
bass_label = week3_data.apply(lambda x: bass_smooth_and_cal_turning_points(x[['type', 'amt']]))


from kneed import KneeLocator
def knee_point_search(x, y):
    """Search the curve (x, y) for knee points under every curve/direction setting.

    A ``KneeLocator`` is run for each combination of ``curve`` in
    ('convex', 'concave') and ``direction`` in ('increasing', 'decreasing').
    A detected knee is kept unless it coincides with the first or last x value.

    Args:
        x: x values; must provide ``tolist()`` (e.g. a pandas Index or ndarray).
        y: y values; must provide ``tolist()`` (e.g. a pandas Series or ndarray).

    Returns:
        A list of ``(knee_x, knee_y, direction)`` tuples. The list is empty
        when no knee is found, so the result can always be iterated.
    """
    # Convert to plain lists so that x[-1] is positional (negative) indexing.
    x, y = x.tolist(), y.tolist()
    output_knees = []
    for curve in ['convex', 'concave']:
        for direction in ['increasing', 'decreasing']:
            model = KneeLocator(x=x, y=y, curve=curve, direction=direction, online=True)
            knee = model.knee
            # KneeLocator reports "nothing detected" as None; without this check
            # None would pass the endpoint comparison below and be recorded.
            if knee is None:
                continue
            if knee != x[0] and knee != x[-1]:
                output_knees.append((knee, model.knee_y, direction))

    if output_knees:
        print('发现拐点!')
    else:
        print('未发现拐点!')
    return output_knees


# Run the knee search on the 'peo' column, using the frame's index as x.
# NOTE(review): `week3_data` is not defined in the visible part of this file
# (only `week1_data` is) — confirm where it comes from.
x=week3_data.index
y=week3_data['peo']
knee_point_search(x,y)

3. 画图

import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib import style
# Select the SimHei font for sans-serif text and turn off the Unicode minus
# glyph so that negative tick labels use a plain hyphen.
plt.rcParams['font.sans-serif'] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False

# knee_point_search may return None when nothing is found; fall back to an
# empty list so the marker loop below simply does nothing in that case.
knee_info = knee_point_search(x=week3_data.index,
                              y=week3_data['peo']) or []

# Dashed black line of 'peo' over the frame's index, with one tick per row
# labelled by that row's 'weeks' value.
fig, axe = plt.subplots(figsize=[20, 15])
axe.plot(week3_data.index, week3_data['peo'], 'k--')
axe.set_title('week3-peo', fontsize=20)
axe.set_xticks(week3_data.index)
axe.set_xticklabels([f"{week3_data.loc[i, 'weeks']}"
                     for i in week3_data.index], rotation=90)

for knee_x, knee_y, direction in knee_info:
    # Skip entries that carry no knee position; they cannot be plotted.
    if knee_x is None:
        continue
    axe.scatter(x=knee_x, y=knee_y, c='b', s=200, marker='^')
    # The label text is passed positionally: the keyword was `s` in older
    # Matplotlib and `text` in newer releases, so positional works on both.
    # The label is anchored one x unit to the right of the marker.
    axe.annotate(f'{knee_x+1}, {knee_y}, {direction}', xy=(knee_x+1, knee_y), fontsize=12)

 

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5495969.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-13
下一篇 2022-12-13

发表评论

登录后才能评论

评论列表(0条)

保存