1. 数据源
import pandas as pd

# Data: load the weekly card-member table and keep the 'week1' series.
# NOTE(review): `spark` is not defined in this file; presumably a SparkSession
# injected by the notebook environment — confirm.
data = spark.sql('''select * from data_pre.card_member_weeks''')
data_df = data.toPandas()

# Convert the whole column in one vectorised call instead of element by element.
data_df['amt'] = pd.to_numeric(data_df['amt'])

# Rows of type 'week1', ordered by week.  reset_index() renumbers the rows
# 0..n-1 (keeping the old index as a column), so drop(index=[0]) removes the
# first row of the sorted result.
week1_data = (
    data_df[data_df['type'] == 'week1']
    .sort_values(by="weeks")
    .reset_index()
    .drop(index=[0])
)
2. func
import numpy as np
from kneed import KneeLocator
from scipy.optimize import curve_fit


def func(x, m, p, q):
    """Evaluate the three-parameter curve that ``curve_fit`` fits to the data.

    Computes ``m * (p + q)**2 / p * e / (1 + (q / p) * e)**2`` where
    ``e = exp(-(p + q) * x)``.
    NOTE(review): this looks like the Bass diffusion density (the caller is
    named ``bass_smooth_...``) — confirm the intended model.

    Args:
        x: Scalar or NumPy array of positions at which to evaluate the curve.
        m, p, q: Curve parameters; ``p`` is used as a divisor.

    Returns:
        The curve value(s) at ``x``.
    """
    decay = np.exp(-(p + q) * x)
    return (m * (p + q) ** 2 / p) * (decay / (1 + (q / p) * decay) ** 2)


def find_turning_points(x, y):
    """Return the positions where the polyline ``(x, y)`` changes bend direction.

    For each interior point ``i`` the 2-D cross product of the vectors
    ``P[i] - P[i-1]`` and ``P[i+1] - P[i-1]`` is computed; its sign tells which
    way the polyline bends at ``i``.  A position is reported when that sign
    differs from the sign at the previous point.

    Args:
        x: Indexable sequence of x coordinates.
        y: Indexable sequence of y coordinates, same length as ``x``.

    Returns:
        List of 0-based positions ``i`` (not x values).  Empty when fewer
        than three points are given.
    """

    def cross(i):
        return ((x[i] - x[i - 1]) * (y[i + 1] - y[i - 1])
                + (y[i - 1] - y[i]) * (x[i + 1] - x[i - 1]))

    # At least one full triple of points is needed to measure a bend.
    if len(x) < 3:
        return []

    turning_points_x = []
    s1 = cross(1)
    for i in range(2, len(x) - 1):
        s2 = cross(i)
        if s1 * s2 < 0:
            turning_points_x.append(i)
        s1 = s2
    return turning_points_x


def bass_smooth_and_cal_turning_points(week_sl_df):
    """Smooth the ``'amt'`` series with a fitted curve and locate its turning points.

    The values of ``week_sl_df['amt']`` are taken in row order and paired with
    x = 1, 2, ..., n.  ``func`` is fitted with ``curve_fit``; the fitted values
    (negative values replaced by 0) are used for the search.  If anything in
    the fitting step fails, the raw values are used instead.

    Args:
        week_sl_df: Object supporting ``week_sl_df['amt'].values`` (e.g. a
            pandas DataFrame with an ``'amt'`` column).

    Returns:
        List of x values (1-based positions) at which the curve changes bend
        direction.  Empty when the series has fewer than four points.
    """
    week_sl = list(week_sl_df['amt'].values)
    x = np.arange(1, len(week_sl) + 1)
    y = np.array(week_sl)

    try:
        popt, _ = curve_fit(func, x, y)
        fitted = func(x, *popt)
        # Same as max(0, v) per element: anything not strictly positive becomes 0.
        yvals = np.where(fitted > 0, fitted, 0)
    except Exception:
        # Best effort: fall back to the unsmoothed series when the fit fails.
        yvals = y

    if len(yvals) < 4:
        return []

    # find_turning_points reports 0-based positions; x starts at 1, so shift by one.
    # NOTE(review): an earlier version apparently stored the first two turning
    # points under '成长期-成熟期' / '成熟期-衰退期' in a one-row DataFrame; that
    # code was already disabled, so the plain list is returned.
    return [i + 1 for i in find_turning_points(x, yvals)]


# NOTE(review): these calls look out of sync with the function above — they
# pass a 'week_sl' column while the function reads 'amt', and the filter below
# expects '成长期-成熟期' / '成熟期-衰退期' columns that a list result does not
# provide.  `whole_train_sp_week_sl` and `week3_data` are not defined in this
# file.  Left as written (a verbatim duplicate of these three statements was
# removed); confirm the intended usage.
sku_new_label = (
    whole_train_sp_week_sl.groupby(['sphh'])
    .apply(lambda x: bass_smooth_and_cal_turning_points(x[['sphh', 'week_sl']]))
    .reset_index()
    .drop(['level_1'], axis=1)
)
sku_new_label = sku_new_label[
    (sku_new_label['成长期-成熟期'] != -1) & (sku_new_label['成熟期-衰退期'] != -1)
].reset_index(drop=True)
bass_label = week3_data.apply(lambda x: bass_smooth_and_cal_turning_points(x[['type', 'amt']]))


def knee_point_search(x, y):
    """Search for knee points of ``(x, y)`` with ``kneed.KneeLocator``.

    Every combination of curve shape (convex / concave) and direction
    (increasing / decreasing) is tried.  A result is kept only when a knee was
    found and it is not the first or last x value.

    Args:
        x: Object with ``tolist()`` giving the x values (e.g. a pandas Index).
        y: Object with ``tolist()`` giving the y values (e.g. a pandas Series).

    Returns:
        List of ``(knee_x, knee_y, direction)`` tuples; empty when no knee was
        found.  A status message is printed in either case.
    """
    # Convert to lists so that negative indexing (x[-1]) works.
    x, y = x.tolist(), y.tolist()

    output_knees = []
    for curve in ['convex', 'concave']:
        for direction in ['increasing', 'decreasing']:
            model = KneeLocator(x=x, y=y, curve=curve, direction=direction, online=True)
            knee = model.knee
            # Skip "no knee found" (None) and knees sitting on either end point.
            if knee is None or knee == x[0] or knee == x[-1]:
                continue
            output_knees.append((knee, model.knee_y, direction))

    if output_knees:
        print('发现拐点!')
    else:
        print('未发现拐点!')
    return output_knees


x = week3_data.index
y = week3_data['peo']
knee_point_search(x, y)
3. 画图
import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib import style

# Plot the week3 'peo' series and mark the detected knee points.
# NOTE(review): SimHei is presumably chosen so that Chinese labels render;
# it must be installed on the machine — confirm.
plt.rcParams['font.sans-serif'] = ["SimHei"]
plt.rcParams["axes.unicode_minus"] = False

# Treat "nothing found" (None or an empty list) as an empty sequence so the
# marker loop below is simply skipped.
knee_info = knee_point_search(x=week3_data.index, y=week3_data['peo']) or []

fig, axe = plt.subplots(figsize=[20, 15])
axe.plot(week3_data.index, week3_data['peo'], 'k--')
axe.set_title('week3-peo', fontsize=20)
axe.set_xticks(week3_data.index)
# Label each tick with the value of the 'weeks' column for that row.
axe.set_xticklabels([f"{week3_data.loc[i, 'weeks']}" for i in week3_data.index], rotation=90)

for point in knee_info:
    # Each entry is (knee_x, knee_y, direction); skip entries without a knee.
    if point[0] is None:
        continue
    axe.scatter(x=point[0], y=point[1], c='b', s=200, marker='^')
    # The label text is passed positionally: the keyword was `s` in older
    # Matplotlib releases and `text` in newer ones.  The label is placed one
    # unit to the right of the marker, as in the original.
    axe.annotate(f'{point[0]+1}, {point[1]}, {point[2]}', xy=(point[0]+1, point[1]), fontsize=12)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)