1.掌握逻辑回归与决策树模型模型的原理及使用场景
2.掌握pyspark库使用
3.掌握模型预测结果保存的方法
下面以鸾尾花数据集(Iris)为例进行分析,Iris 以鸾尾花的特征作为数据来源,数据集包含 150 个数据,分为 3 类,每类 50 个数据,每个数据包含 4 个属性,是在数据挖掘、数据分类中非常常用的测试集、训练集。为了便于理解,这里主要用后两个属性(花瓣的长度和宽度)来进行分类。
首先来看一下逻辑回归分类器。逻辑斯蒂回归(Logistic Regression)是统计学习中的经典分 类方法,属于对数线性模型。逻辑回归的因变量可以是二分类的,也可以是多分类的。
其次看一下决策树(Decision Tree)分类器,决策树是一种基本的分类与回归方法,这里主要介绍用于分类的决策树。决策树模式呈树形结构,其中每个内部节 点表示一个属性上的测试,每个分支代表一个测试输出,每个叶节点代表一种类别。学习 时利用训练数据,根据损失函数最小化的原则建立决策树模型;预测时,对新的数据,利用 决策树模型进行分类。
实验原理1.逻辑回归模决策树模型原理型的原理
2.使用pyspark进行模型的创建、训练、评估、保存
ubuntu 16.04
Python 3.6
numpy 1.18.3
pandas 0.25.0
pyspark 2.4.4
wget http://i9000.net:8888/sgn/HUP/spark/iris.txt安装numpy库:
pip install numpy首先来看一下逻辑回归分类器
打开终端,输入指令打开pyspark
pyspark
(1)导入本地向量 Vector 和 Vectors,导入所需要的类
from pyspark.ml.linalg import Vector,Vectors from pyspark.sql import Row,functions from pyspark.ml.evaluation import MulticlassClassificationevaluator from pyspark.ml import Pipeline from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer,HashingTF,Tokenizer from pyspark.ml.classification import LogisticRegression,LogisticRegressionModel,BinaryLogisticRegressionSummary
(2)定制一个函数,来返回一个指定的数据,然后读取文本文件,第一个 map 把每行的数据用","隔开,如在数据集中,每行被分成了 5 部分,前 4 部分是鸾尾花的 4 个特征,最后一部分是鸾尾花的分类;这里把特征存储在 Vector 中,创建一个 Iris 模式的 RDD,然后转化成 dataframe;最后调用 show()方法来查看一下部分数据。
def f(x): rel = {} rel['features']=Vectors.dense(float(x[0]),float(x[1]),float(x[2]),float(x[3])) rel['label'] = str(x[4]) return rel data = spark.sparkContext.textFile("/home/ubuntu/iris.txt").map(lambda line:line.split(',')).map(lambda p:Row(**f(p))).toDF() data.show()
(3)分别获取标签列和特征列,进行索引和重命名
labelIndexer = StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(data) featureIndexer = VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").fit(data)
(4)设置Logistic Regression算法的参数。这里设置循环次数为100次,规范化项为0.3等。具体可以设置的参数,可以通过explainParams()来获取,还能看到程序已经设置的参数的结果。
lr = LogisticRegression().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(100).setRegParam(0.3).setElasticNetParam(0.8) print("LogisticRegression parameters:n"+ lr.explainParams())
(5)设置一个IndexToString的转换器,把预测的类别重新转化成字符型。构建一个机器学习流水线,设置各个阶段。上一个阶段的输出将是本阶段的输入。
labelConverter = IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels) lrPipeline = Pipeline().setStages([labelIndexer,featureIndexer,lr,labelConverter])
(6)把数据集随机分成训练集和测试集,其中训练集占70%。Pipeline本质上是一个预测器,当Pipeline调用fit()的时候就产生了一个PipelineModel,它是一个转换器。然后这个PipelineModel就可以调用transform来进行预测,生成一个新的Dataframe,即利用训练得到的模型对测试集进行验证
trainingData,testData = data.randomSplit([0.7,0.3]) lrPipelineModel = lrPipeline.fit(trainingData) lrPredictions = lrPipelineModel.transform(testData)
(7)输出预测结果,其中,select选择要输出的列,collect获取所有行数据,用foreach把每行打印出来
preRel = lrPredictions.select("predictedLabel","label","features","probability").collect() for item in preRel: print(str(item['label'])+','+ str(item['features'])+'- - > prob=' + str(item['probability'])+',predictedLabel'+ str(item['predictedLabel']))
(8)对训练的模型进行评估。创建一个MulticlassClassificationevaluator实例,使用setter方法把预测分类的列名和真实分类的列名进行设置,然后计算预测准确率
evaluator = MulticlassClassificationevaluator().setLabelCol("indexedLabel").setPredictionCol("prediction") lrAccuracy = evaluator.evaluate(lrPredictions) print(lrAccuracy)
(9)可以通过 model 来获取训练得到的逻辑斯蒂模型。lrPipelineModel 是一个 Pipeline-Model,因此,可以通过调用它的 stages 方法来获取模型,具体如下:
lrModel = lrPipelineModel.stages[2] print("Coefficients: n " + str(lrModel.coefficientMatrix) + "nintercept: " + str(lrModel.interceptVector) + "n numClasses: " + str(lrModel.numClasses) + "n numFeatures: " + str(lrModel.numFeatures))输出: 其次看一下决策树(Decision Tree)分类器
(1)导入所需要的包
from pyspark.ml.classification import DecisionTreeClassificationModel from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml import Pipeline,PipelineModel from pyspark. ml. evaluation import MulticlassClassificationevaluator from pyspark.ml.linalg import Vector,Vectors from pyspark.sql import Row from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer
(2)读取文本文件,第一个 map 把每行的数据用","隔开,如在数据集中,每行被分成了 5 部分,前 4 部分是鸾尾花的 4 个特征,最后一部分是鸾尾花的分类;这里把特征存储在Vector 中,创建一个 Iris 模式的 RDD,然后转化成 dataframe。
def f(x): rel = {} rel['features']= Vectors. dense(float(x[0]),float(x[1]),float(x[2]),float(x[3])) rel['label'] = str(x[4]) return rel data = spark.sparkContext. textFile("/home/ubuntu/iris.txt"). map(lambda line: line.split(',')). map(lambda p: Row(**f(p))). toDF()
(3)进一步处理特征和标签,把数据集随机分成训练集和测试集,其中训练集占 70% 。
labelIndexer = StringIndexer(). setInputCol("label"). setOutputCol("indexedLabel"). fit(data) featureIndexer = VectorIndexer(). setInputCol("features"). setOutputCol("indexedFeatures"). setMaxCategories(4). fit(data) labelConverter = IndexToString(). setInputCol("prediction"). setOutputCol("predictedLabel"). setLabels(labelIndexer.labels) trainingData, testData = data.randomSplit([0.7, 0.3])
(4)创建决策树模型 DecisionTreeClassifier,通过 setter 的方法来设置决策树的参数,也可以用 ParamMap 来设置。这里仅需要设置特征列(FeaturesCol)和待预测列(LabelCol)。具体可以设置的参数可通过 explainParams()来获取。
dtClassifier = DecisionTreeClassifier(). setLabelCol("indexedLabel"). setFeaturesCol("indexedFeatures")
(5)构建机器学习管道,在训练数据集上调用 fit()进行模型训练,并在测试数据集上调用 transform()方法进行预测。
dtPipeline = Pipeline(). setStages([labelIndexer,featureIndexer, dtClassifier, labelConverter]) dtPipelineModel = dtPipeline.fit(trainingData) dtPredictions = dtPipelineModel.transform(testData) dtPredictions.select("predictedLabel", "label", "features").show (20) evaluator = MulticlassClassificationevaluator(). setLabelCol("indexedLabel"). setPredictionCol("prediction") dtAccuracy = evaluator.evaluate(dtPredictions) print(dtAccuracy)
(6)可以通过调用 DecisionTreeClassificationModel 的 toDebugString 方法,查看训练的决策树模型结构。
treeModelClassifier = dtPipelineModel.stages[2] print("Learned classification tree model:n" + str(treeModelClassifier.toDebugString))实验总结
本次实验主要使用pyspark创建逻辑回归模型与决策树模型使用。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)