- 一、流水线Pipeline概念
- 二、流水线工作流程
- 2.1 训练过程
- 2.2 测试过程
- 三、Estimator, Transformer, Param实例
- 四、Pipeline实例
spark的流水线受 scikit-learn项目的启发,是对流水线式工作的一种高度抽象,通常可以包含多个机器学习流程,如:源数据ETL、数据预处理工作、指标提取、模型训练、模型验证、预测新数据等多个步骤。包含以下几个步骤:
1) Dataframe:spark.ml使用的数据格式是Dataframe,该格式可以容纳多种数据类型。Dataframe的列可以用来存储:文本,特征向量、真是标签、预测值等。(如果用过pandas,那么也可以将其类比于pandas的Dataframe)
2) Transformer: Transformer是一个抽象的概念,包含了 若干特征转换和学得模型。Transformer实现了transform()方法,该方法可以将一个Dataframe转换为另一个Dataframe。比如在一个Dataframe后面增加几个列,就属于Transformer;再比如通过一个模型为一个Dataframe增加标签,那么这个模型也属于Transformer。
3) Estimator:Estimator是一个学习算法或者算法拟合或者训练数据的一个抽象概念(有点绕,翻译的不好)。可以这么理解:由于Estimator实现了fit()方法,所以只要输入是Dataframe,然后产生一个模型,那么就可以将其理解为Estimator。比如说逻辑回归算法拟合某个Dataframe的数据,就会生成一个 LogisticRegressionModel,所以一个逻辑回归算法就是Estimator。Estimator同时也是Transformer。
4)Pipeline:将多个Transfomer和Estimator连接在一起构成一个机器学习工作流。
5) Parameter:所有Transformers和 Estimators共享用于指定参数的公共API。
一个Pipeline由若干个阶段( stage)组成,每个阶段要么是Transformer要么是Estimator。上一个阶段的输出是下一个阶段的输入,按顺序执行各给阶段,输入的Dataframe每经过一个阶段就会被转换一次:
1)Transformer阶段通过 transform()执行Dataframe的转换;
2)Estimator阶段先通过fit()方法转换为Transformer,转换后的这个Transformer再执行transform()方法将输入的Dataframe进行转换
上面这张图是一个处理文本的Pipeline训练示例:
1)图中上面那一行表示有三个阶段:Tokenizer(Transformer),HashingTF(Transformer),Logistic Regression(Estimator),两个Transformer,一个Estimator。
2)图中下面那一行表示Pipeline中数据的流向,其中每一个圆柱体表示一个Dataframe。当pipeline调用fit()方法时,从初始的Dataframe开始进行 *** 作:
a)Tokenizer.transform()将原始文本(raw text)转换为单词(words),在原始的Dataframe中新增单词列;
b)HashingTF.transform()将单词转为特征向量(feature vectors),将这个特征向量新增到上一步产生的Dataframe中
c)由于Logistic Regression 是一个Estimator,所以调用先LogisticRegression.fit() 生成一个 LogisticRegressionModel,然后这个 LogisticRegressionModel再调用transform()方法来对输入的Dataframe进行转换。
训练的过程做了简要介绍,接下来再对测试过程进行讲解。
在训练阶段,构建的流水线Pipeline调用fit()后生成一个 PipelineModel,这个 PipelineModel是一个Transformer。当要使用测试数据进行测试时,让训练阶段生成的PipelineModel调用transform()方法。上图是利用这个PipelineModel进行测试的过程,可以看到测试过程和预测过程是一样的流程,拥有相同的阶段数量,只是将测试阶段LogisticRegression这个Estimator变成了Transformer。
# 1.导包,利用逻辑回归来演示 from pyspark.ml.linalg import Vectors from pyspark.ml.classification import LogisticRegression # 2.构造训练数据,列表形式,其数据为元组:(label, features) training = spark.createDataframe( [(1.0, Vectors.dense([0.0, 1.1, 0.1])), (0.0, Vectors.dense([2.0, 1.0, -1.0])), (0.0, Vectors.dense([2.0, 1.3, 1.0])), (1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"]) # 3.创建LogisticRegression实例(迭代10次,正则化参数为0.01),该实例是一个Estimator lr = LogisticRegression(maxIter=10,regParam=0.01) # 查看该实例信息,会打印出具体信息,内容太多,就不在这里展示了 print("logisticRegression parameters:n" + lr.explainParams() +"n") # 4.训练模型:调用fit()方法,生成一个LogisticRegressionModel model1 = lr.fit(training) # 通过模型的extractParamMap()方法可以查看模型的详细信息 print("Model 1 was fit using parameters: ") print(model1.extractParamMap()) # 5.自定义设置参数 # 5.1 通过python字典可以指定模型参数,指定的参数会覆盖原有参数,下面是三种指定参数的方式: paramMap={lr.maxIter:20} paramMap[lr.maxIter]=30 paramMap.update({lr.regParam:0.1,lr.threshold:0.55}) # 5.2 修改参数名称,参数的名称也是可以修改的 paramMap2 = {lr.probabilityCol: "myProbability"} # 5.3 将2次修改进行合并 paramMapCombined = paramMap.copy() paramMapCombined.update(paramMap2) # 5.4 将自定义的参数作用于模型的生成 model2 = lr.fit(training,paramCombined) print("Model 2 was fit using parameters: ") print(model2.extractParamMap()) # 6.测试:训练阶段生成的模型调用transform() 方法来对测试数据进行预测 # 6.1 生成测试数据 test = spark.createDataframe([ (1.0, Vectors.dense([-1.0, 1.5, 1.3])), (0.0, Vectors.dense([3.0, 2.0, -0.1])), (1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"]) # 6.2测试 prediction = model2.transform(test) # 7. 查看测试结果,注意:在第5.2步中,将lr.probabilityCol重命名为myProbability, # 所以如果要查看该部分信息则需要将名称进行相应的更换 result = prediction.select("features", "label", "myProbability", "prediction").collect() for row in result: print("features=%s, label=%s -> prob=%s, prediction=%s" % (row.features, row.label, row.myProbability, row.prediction)) # 输出入下: features=[-1.0,1.5,1.3], label=1.0 -> prob=[0.0570730417103402,0.9429269582896599], prediction=1.0 features=[3.0,2.0,-0.1], label=0.0 -> prob=[0.9238522311704104,0.07614776882958958], prediction=0.0 features=[0.0,2.2,-1.5], label=1.0 -> prob=[0.10972776114779453,0.8902722388522054], prediction=1.0四、Pipeline实例
第三步演示了单独进行各项 *** 作的过程,在这里演示管道化处理。
# 1.导包 from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF,Tokenizer # 2.构造训练数据,列表形式,其数据为元组: (id, text, label) training = spark.createDataframe([ (0, "a b c d e spark", 1.0), (1, "b d", 0.0), (2, "spark f g h", 1.0), (3, "hadoop mapreduce", 0.0)],["id", "text", "label"]) # 3. 创建管道,包含三个阶段:tokenizer, hashingTF, and lr # 3.1 在初始Dataframe中新增words列,值为text列的数据转换为单词 tokenizer = Tokenizer(inputCol="text",outputCol="words") # 3.2 将上一步得到的Dataframe的输出列(words列)转为特征向量后作为新增的列,列名为features hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(),outputCol='features') # 3.3 创建逻辑回归实例 lr = LogisticRegression(maxIter=10,regParam=0.001) # 3.4 创建管道:将上述各个阶段依次结合 pipeline = Pipeline(stages=[tokenizer,hashingTF,lr]) # 4.训练模型:调用流水线的fit()方法,生成一个pipelinemodel,该模型是一个Transformer model = pipeline.fit(training) # 5.测试 # 5.1 生成测试数据 test = spark.createDataframe([ (4, "spark i j k"), (5, "l m n"), (6, "spark hadoop spark"), (7, "apache hadoop") ], ["id", "text"]) # 5.2 测试:用训练得到的模型调用transform()方法来预测 prediction = model.transform(test) # 6.查看预测情况 selected = prediction.select("id", "text", "probability", "prediction") for row in selected.collect(): rid, text, prob, prediction = row print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction)) # 结果如下: (4, spark i j k) --> prob=[0.15964077387874748,0.8403592261212525], prediction=1.000000 (5, l m n) --> prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000 (6, spark hadoop spark) --> prob=[0.06926633132976032,0.9307336686702398], prediction=1.000000 (7, apache hadoop) --> prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)