Contents: ID3 (code walkthrough, results) · C4.5 (code walkthrough, results) · coding tips summary
This article works through the classic watermelon example. For the underlying theory of decision trees, see the companion article ML学习笔记——决策树 (ML study notes: decision trees).
ID3 code walkthrough

Code source: https://blog.csdn.net/leaf_zizi/article/details/82848682
Dataset (saved as data.txt; one sample per line, columns: 色泽 根蒂 敲击 纹理 脐部 触感, then the class label 是/否):
青绿 蜷缩 浊响 清晰 凹陷 硬滑 是
乌黑 蜷缩 沉闷 清晰 凹陷 硬滑 是
乌黑 蜷缩 浊响 清晰 凹陷 硬滑 是
青绿 蜷缩 沉闷 清晰 凹陷 硬滑 是
浅白 蜷缩 浊响 清晰 凹陷 硬滑 是
青绿 稍蜷 浊响 清晰 稍凹 软粘 是
乌黑 稍蜷 浊响 稍糊 稍凹 软粘 是
乌黑 稍蜷 浊响 清晰 稍凹 硬滑 是
乌黑 稍蜷 沉闷 稍糊 稍凹 硬滑 否
青绿 硬挺 清脆 清晰 平坦 软粘 否
浅白 硬挺 清脆 模糊 平坦 硬滑 否
浅白 蜷缩 浊响 模糊 平坦 软粘 否
青绿 稍蜷 浊响 稍糊 凹陷 硬滑 否
浅白 稍蜷 沉闷 稍糊 凹陷 硬滑 否
乌黑 稍蜷 浊响 清晰 稍凹 软粘 否
浅白 蜷缩 浊响 模糊 平坦 硬滑 否
青绿 蜷缩 沉闷 稍糊 稍凹 硬滑 否
ID3 main code
The original code targets Python 2 and raises errors under Python 3, so the version below is slightly modified and carries detailed comments.
import math
import matplotlib
import matplotlib.pyplot as plt

# The comments below refer to the watermelon dataset.

# Compute the information entropy of a dataset.
# Parameter: dataSet - the dataset (2-D list)
# Returns: the entropy of the dataset (float)
def calcEntropy(dataSet):
    mD = len(dataSet)                        # number of samples (rows)
    dataLabelList = [x[-1] for x in dataSet] # class labels (是/否)
    dataLabelSet = set(dataLabelList)        # distinct labels (only 是 and 否)
    ent = 0
    for label in dataLabelSet:  # one pass per class; the loop accumulates the entropy
        mDv = dataLabelList.count(label)
        prop = float(mDv) / mD
        ent = ent - prop * math.log(prop, 2)  # math.log; np.math.log is gone from recent NumPy
    return ent

# Split the dataset on one feature value and drop that column.
# Parameters:
#   index - column index of the feature to split on (int)
#   feature - the feature value to keep (string)
# Returns: the rows whose value at `index` equals `feature`, with that column removed (2-D list)
def splitDataSet(dataSet, index, feature):
    splitedDataSet = []        # the split-off dataset
    for data in dataSet:       # iterate over every row
        if data[index] == feature:              # row matches the chosen value
            sliceTmp = data[:index]             # everything before the feature column
            sliceTmp.extend(data[index + 1:])   # everything after it
            splitedDataSet.append(sliceTmp)
    return splitedDataSet

# Choose the best splitting attribute by information gain.
# Parameter: dataSet (2-D list)
# Returns: column index of the best feature (int)
def chooseBestFeature(dataSet):
    entD = calcEntropy(dataSet)          # entropy of the parent node
    mD = len(dataSet)                    # number of samples
    featureNumber = len(dataSet[0]) - 1  # minus 1: the last column is the label, not a feature
    maxGain = -100                       # best information gain so far
    maxIndex = -1                        # column index of the best feature so far
    for i in range(featureNumber):       # iterate over all feature columns
        entDCopy = entD
        featureI = [x[i] for x in dataSet]  # values of column i
        featureSet = set(featureI)          # distinct values of column i
        for feature in featureSet:          # subtract the weighted child entropies;
                                            # entDCopy ends up being the gain of feature i
            splitedDataSet = splitDataSet(dataSet, i, feature)  # rows with this value, column i removed
            mDv = len(splitedDataSet)       # number of rows with this value
            entDCopy = entDCopy - float(mDv) / mD * calcEntropy(splitedDataSet)
            # Calling calcEntropy on splitedDataSet is safe because entropy only
            # looks at the last (label) column; removing column i does not affect it.
        if maxIndex == -1:
            maxGain = entDCopy
            maxIndex = i
        elif maxGain < entDCopy:  # a larger information gain, i.e. a better split
            maxGain = entDCopy
            maxIndex = i
    return maxIndex

# Return the majority class.
# Parameter: labelList - class labels (1-D list)
# Returns: the most frequent label (string)
def mainLabel(labelList):
    labelRec = labelList[0]
    maxLabelCount = -1         # count of the most frequent label so far
    labelSet = set(labelList)  # distinct labels
    for label in labelSet:
        if labelList.count(label) > maxLabelCount:  # a more frequent label: update
            maxLabelCount = labelList.count(label)
            labelRec = label
    return labelRec

# Recursively build the decision tree.
def createFullDecisionTree(dataSet, featureNames, featureNamesSet, labelListParent):
    labelList = [x[-1] for x in dataSet]  # last column = class labels
    if len(dataSet) == 0:  # empty dataset: fall back to the parent's majority class
        return mainLabel(labelListParent)
    if len(dataSet[0]) == 1:  # no features left to split on
        return mainLabel(labelList)  # use the majority class of this node
    if labelList.count(labelList[0]) == len(labelList):  # all samples share one class
        return labelList[0]

    bestFeatureIndex = chooseBestFeature(dataSet)          # index of the best feature
    bestFeatureName = featureNames.pop(bestFeatureIndex)   # remove and return its name
    myTree = {bestFeatureName: {}}                         # the tree is a nested dict
    featureList = featureNamesSet.pop(bestFeatureIndex)    # remove and return its value list
    featureSet = set(featureList)
    for feature in featureSet:
        featureNamesNext = featureNames[:]        # shallow copy; best feature already removed
        featureNamesSetNext = featureNamesSet[:]  # likewise for the value lists
        splitedDataSet = splitDataSet(dataSet, bestFeatureIndex, feature)
        # recurse into each branch
        myTree[bestFeatureName][feature] = createFullDecisionTree(
            splitedDataSet, featureNamesNext, featureNamesSetNext, labelList)
    return myTree

# Read the watermelon dataset.
# Returns:
#   dataSet - the dataset (2-D list)
#   featureNames - feature names (1-D list)
#   featureNamesSet - a 2-D list; element i lists the distinct values of feature i
def readWatermelonDataSet():
    fr = open(r'C:\Users\静如止水\Desktop\ID3\data.txt', encoding='utf-8')
    dataSet = [inst.strip().split(' ') for inst in fr.readlines()]
    featureNames = ['色泽', '根蒂', '敲击', '纹理', '脐部', '触感']
    featureNamesSet = []
    for i in range(len(dataSet[0]) - 1):
        col = [x[i] for x in dataSet]
        colSet = set(col)
        featureNamesSet.append(list(colSet))
    return dataSet, featureNames, featureNamesSet
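Before moving on to plotting, here is a minimal usage sketch (my addition, not in the original post) exercising the pieces above in isolation. The expected values follow from the dataset: 8 positive and 9 negative samples give a root entropy of about 0.998, and 纹理 (texture) wins the first split, consistent with both the terminal output below and the watermelon book.

dataSet, featureNames, featureNamesSet = readWatermelonDataSet()

print(calcEntropy(dataSet))   # ≈ 0.998 for 8 是 / 9 否
best = chooseBestFeature(dataSet)
print(featureNames[best])     # 纹理, column index 3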
Visualization code (matplotlib)
The code below follows the original source essentially unmodified (one incorrect coordinate comment is corrected).
# enable Chinese glyphs in matplotlib
matplotlib.rcParams['font.sans-serif'] = ['SimHei']
matplotlib.rcParams['font.serif'] = ['SimHei']

# decision (internal) node style
decisionNode = dict(boxstyle="sawtooth", fc="0.8")
# leaf node style
leafNode = dict(boxstyle="round4", fc="0.8")
# arrow style
arrow_args = dict(arrowstyle="<-")


def plotNode(nodeTxt, centerPt, parentPt, nodeType):
    """
    Draw a single node.
    :param nodeTxt: text describing the node
    :param centerPt: coordinates of the text
    :param parentPt: coordinates of the parent node
    :param nodeType: node type, decision node or leaf node
    """
    createPlot.ax1.annotate(nodeTxt, xy=parentPt, xycoords='axes fraction',
                            xytext=centerPt, textcoords='axes fraction',
                            va="center", ha="center", bbox=nodeType,
                            arrowprops=arrow_args)


def getNumLeafs(myTree):
    """Count the leaves of the tree."""
    numLeafs = 0
    firstStr = list(myTree.keys())[0]  # root key
    secondDict = myTree[firstStr]      # subtree under the root
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            numLeafs += getNumLeafs(secondDict[key])  # recurse into subtrees
        else:
            numLeafs += 1  # a leaf
    return numLeafs


def getTreeDepth(myTree):
    """Compute the depth of the tree."""
    maxDepth = 0
    firstStr = list(myTree.keys())[0]
    secondDic = myTree[firstStr]
    for key in secondDic.keys():
        if type(secondDic[key]).__name__ == 'dict':
            thisDepth = 1 + getTreeDepth(secondDic[key])  # child depth + 1
        else:
            thisDepth = 1  # a leaf
        if thisDepth > maxDepth:
            maxDepth = thisDepth
    return maxDepth


def plotMidText(cntrPt, parentPt, txtString):
    """
    Write the branch label halfway between parent and child.
    :param cntrPt: child coordinates
    :param parentPt: parent coordinates
    :param txtString: text to draw
    """
    xMid = (parentPt[0] - cntrPt[0]) / 2.0 + cntrPt[0]
    yMid = (parentPt[1] - cntrPt[1]) / 2.0 + cntrPt[1]
    createPlot.ax1.text(xMid, yMid, txtString)


def plotTree(myTree, parentPt, nodeTxt):
    """
    Recursively draw all nodes of the tree.
    :param myTree: the tree
    :param parentPt: parent coordinates
    :param nodeTxt: node text
    """
    numLeafs = getNumLeafs(myTree=myTree)
    depth = getTreeDepth(myTree=myTree)
    firstStr = list(myTree.keys())[0]  # root label
    # x of the root = current x offset plus half the width of its leaves; e.g. on the
    # first call the offset is -1/2W and the root centre is (1+W)/2W, which sums to 1/2
    cntrPt = (plotTree.xOff + (1.0 + float(numLeafs)) / 2.0 / plotTree.totalW,
              plotTree.yOff)
    plotMidText(cntrPt, parentPt, nodeTxt)              # branch label to the parent
    plotNode(firstStr, cntrPt, parentPt, decisionNode)  # the node itself
    secondDict = myTree[firstStr]
    plotTree.yOff = plotTree.yOff - 1.0 / plotTree.totalD  # move one level down
    for key in secondDict.keys():
        if isinstance(secondDict[key], dict):  # a subtree: recurse
            plotTree(secondDict[key], cntrPt, str(key))
        else:  # a leaf: shift right by 1/W and draw it
            plotTree.xOff = plotTree.xOff + 1.0 / plotTree.totalW
            # uncomment to trace leaf coordinates
            # print((plotTree.xOff, plotTree.yOff), secondDict[key])
            plotNode(secondDict[key], (plotTree.xOff, plotTree.yOff), cntrPt, leafNode)
            plotMidText((plotTree.xOff, plotTree.yOff), cntrPt, str(key))
    plotTree.yOff = plotTree.yOff + 1.0 / plotTree.totalD  # back up one level on return


def createPlot(inTree):
    """
    Draw the whole decision tree.
    :param inTree: the decision tree as a nested dict
    """
    fig = plt.figure(1, facecolor='white')
    fig.clf()
    axprops = dict(xticks=[], yticks=[])
    createPlot.ax1 = plt.subplot(111, frameon=False, **axprops)
    plotTree.totalW = float(getNumLeafs(inTree))   # total width of the tree
    plotTree.totalD = float(getTreeDepth(inTree))  # total depth of the tree
    # initial x offset is -1/2W; each leaf moves right by 1/W, so the leaves land at
    # 1/2W, 3/2W, 5/2W, ..., (2W-1)/2W
    plotTree.xOff = -0.5 / plotTree.totalW
    plotTree.yOff = 1.0  # initial y offset; moves up/down by 1/D per level
    plotTree(inTree, (0.5, 1.0), '')
    plt.show()
Driver code
dataSet, featureNames, featureNamesSet = readWatermelonDataSet()
# the 4th argument is only consulted when a child dataset is empty
testTree = createFullDecisionTree(dataSet, featureNames, featureNamesSet, featureNames)
print(testTree)       # print the tree as a nested dict
createPlot(testTree)  # visualize the tree

Results
Terminal output:
{'纹理': {'清晰': {'根蒂': {'稍蜷': {'色泽': {'青绿': '是', '乌黑': {'触感': {'硬滑': '是', '软粘': '否'}}, '浅白': '是'}}, '蜷缩': '是', '硬挺': '否'}}, '模糊': '否', '稍糊': {'触感': {'硬滑': '否', '软粘': '是'}}}}
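The nested dict is hard to read at a glance. Here is a small helper of mine (not part of the original post) that prints it as an indented outline; it assumes testTree from the driver code above.

def printTree(tree, indent=0):
    """Print the nested-dict decision tree as an indented outline."""
    for feature, branches in tree.items():
        print(' ' * indent + feature + '?')
        for value, subtree in branches.items():
            if isinstance(subtree, dict):      # internal node: recurse one level deeper
                print(' ' * (indent + 2) + value + ':')
                printTree(subtree, indent + 4)
            else:                              # leaf: value leads straight to a class
                print(' ' * (indent + 2) + value + ' -> ' + subtree)

printTree(testTree)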
matplotlib: (figure: the decision tree rendered by createPlot; not reproduced here)

Compared against the watermelon book, the plotted tree matches the book's unpruned information-gain tree.
C4.5

Code source: https://blog.csdn.net/leaf_zizi/article/details/84866918
Building on that source, this part swaps in a different dataset and adds handling of continuous values plus post-pruning.
Dataset (one sample per line; columns: the six discrete features 色泽 根蒂 敲击 纹理 脐部 触感, then 密度 (density) and 含糖率 (sugar content) as continuous values, a sample weight, and the class label 是/否):
青绿 蜷缩 浊响 清晰 凹陷 硬滑 0.697 0.460 1 是
乌黑 蜷缩 沉闷 清晰 凹陷 硬滑 0.774 0.376 1 是
乌黑 蜷缩 浊响 清晰 凹陷 硬滑 0.634 0.264 1 是
青绿 蜷缩 沉闷 清晰 凹陷 硬滑 0.608 0.318 1 是
浅白 蜷缩 浊响 清晰 凹陷 硬滑 0.556 0.215 1 是
青绿 稍蜷 浊响 清晰 稍凹 软粘 0.403 0.237 1 是
乌黑 稍蜷 浊响 稍糊 稍凹 软粘 0.481 0.149 1 是
乌黑 稍蜷 浊响 清晰 稍凹 硬滑 0.437 0.211 1 是
乌黑 稍蜷 沉闷 稍糊 稍凹 硬滑 0.666 0.091 1 否
青绿 硬挺 清脆 清晰 平坦 软粘 0.243 0.267 1 否
浅白 硬挺 清脆 模糊 平坦 硬滑 0.245 0.057 1 否
浅白 蜷缩 浊响 模糊 平坦 软粘 0.343 0.099 1 否
青绿 稍蜷 浊响 稍糊 凹陷 硬滑 0.639 0.161 1 否
浅白 稍蜷 沉闷 稍糊 凹陷 硬滑 0.657 0.198 1 否
乌黑 稍蜷 浊响 清晰 稍凹 软粘 0.360 0.370 1 否
浅白 蜷缩 浊响 模糊 平坦 硬滑 0.593 0.042 1 否
青绿 蜷缩 沉闷 稍糊 稍凹 硬滑 0.719 0.103 1 否
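For the two continuous columns, calcGainRatio in the main code below evaluates candidate split points at the midpoints between adjacent sorted values, as in the watermelon book. A standalone sketch of just that step (my addition; it assumes lDataSet has been loaded as in the driver code further down, and the 0.381 figure is the book's reported best density threshold):

# column 6 holds 密度 (density); build the midpoint candidates by hand
densities = sorted(float(row[6]) for row in lDataSet)
candidates = [(a + b) / 2 for a, b in zip(densities, densities[1:])]
print(len(candidates))  # 16 candidate thresholds for 17 distinct values
# per the watermelon book, the gain-maximising density threshold is 0.381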
Main code (C45.py)
The code below follows the original source; a few Python 3 fixes and bug fixes are marked inline with "fixed:" comments.
# -*- coding: utf-8 -*-
# (the source declared cp936 for Python 2; utf-8 assumed here for Python 3)
from math import log
import operator
import os
import re
from numpy import inf
import copy


# Compute the (weighted) information entropy of a dataset.
def calcShannonEnt(dataSet, labelIndex):
    # type: (list, int) -> float
    numEntries = 0  # number of samples, counted by weight
    labelCounts = {}
    for featVec in dataSet:  # iterate over every sample
        if featVec[labelIndex] != 'N':  # skip samples whose value is missing
            weight = float(featVec[-2])  # the second-to-last column is the sample weight
            numEntries += weight
            currentLabel = featVec[-1]  # class of the current sample
            if currentLabel not in labelCounts.keys():  # build the class dict
                labelCounts[currentLabel] = 0
            labelCounts[currentLabel] += weight
    shannonEnt = 0.0
    for key in labelCounts:  # accumulate the entropy
        prob = float(labelCounts[key]) / numEntries
        shannonEnt = shannonEnt - prob * log(prob, 2)
    return shannonEnt


def splitDataSet(dataSet, axis, value, LorR='N'):
    """
    type: (list, int, string or float, string) -> list
    Split the dataset.
    axis: index of the feature to split on
    value: value of the splitting feature
    LorR: 'N' discrete attribute; 'L' values less than value; 'R' values greater than value
    """
    retDataSet = []
    if LorR == 'N':  # discrete attribute: keep matching rows, drop the column
        for featVec in dataSet:
            if featVec[axis] == value:
                reducedFeatVec = featVec[:axis]
                reducedFeatVec.extend(featVec[axis + 1:])
                retDataSet.append(reducedFeatVec)
    elif LorR == 'L':
        for featVec in dataSet:
            if featVec[axis] != 'N':
                if float(featVec[axis]) < value:
                    retDataSet.append(featVec)
    elif LorR == 'R':
        for featVec in dataSet:
            if featVec[axis] != 'N':
                if float(featVec[axis]) > value:
                    retDataSet.append(featVec)
    return retDataSet


def splitDataSetWithNull(dataSet, axis, value, LorR='N'):
    """
    type: (list, int, string or float, string) -> list
    Split the dataset, routing missing-value ('N') samples into the branch
    with their weights scaled by the branch's share of the non-missing weight.
    axis: index of the feature to split on
    value: value of the splitting feature
    LorR: 'N' discrete attribute; 'L' values less than value; 'R' values greater than value
    """
    retDataSet = []
    nullDataSet = []
    totalWeightV = calcTotalWeight(dataSet, axis, False)  # weight of non-missing samples
    if LorR == 'N':  # discrete attribute
        for featVec in dataSet:
            if featVec[axis] == value:
                reducedFeatVec = featVec[:axis]
                reducedFeatVec.extend(featVec[axis + 1:])
                retDataSet.append(reducedFeatVec)
            elif featVec[axis] == 'N':
                reducedNullVec = featVec[:axis]
                reducedNullVec.extend(featVec[axis + 1:])
                nullDataSet.append(reducedNullVec)
    elif LorR == 'L':
        for featVec in dataSet:
            if featVec[axis] != 'N':
                if float(featVec[axis]) < value:
                    retDataSet.append(featVec)
            elif featVec[axis] == 'N':
                nullDataSet.append(featVec)
    elif LorR == 'R':
        for featVec in dataSet:
            if featVec[axis] != 'N':
                if float(featVec[axis]) > value:
                    retDataSet.append(featVec)
            elif featVec[axis] == 'N':
                nullDataSet.append(featVec)

    totalWeightSub = calcTotalWeight(retDataSet, -1, True)  # weight of this branch
    for nullVec in nullDataSet:  # missing-value samples enter the branch with scaled weight
        nullVec[-2] = float(nullVec[-2]) * totalWeightSub / totalWeightV
        retDataSet.append(nullVec)
    return retDataSet


def calcTotalWeight(dataSet, labelIndex, isContainNull):
    """
    type: (list, int, bool) -> float
    Compute the total sample weight for a feature.
    :param dataSet: the dataset
    :param labelIndex: feature index
    :param isContainNull: whether to include samples whose value is missing
    :return: total weight
    """
    totalWeight = 0.0
    for featVec in dataSet:  # iterate over every sample
        weight = float(featVec[-2])
        if isContainNull is False and featVec[labelIndex] != 'N':
            totalWeight += weight  # non-missing samples only
        if isContainNull is True:
            totalWeight += weight  # all samples
    return totalWeight


def calcGain(dataSet, labelIndex, labelPropertyi):
    """
    type: (list, int, int) -> float
    Compute the information gain, corrected for missing values.
    dataSet: the dataset
    labelIndex: feature index
    labelPropertyi: feature type, 0 discrete, 1 continuous
    """
    baseEntropy = calcShannonEnt(dataSet, labelIndex)  # entropy of the parent node
    featList = [example[labelIndex] for example in dataSet]  # values of this feature
    uniquevals = set(featList)  # distinct values
    newEntropy = 0.0
    totalWeight = calcTotalWeight(dataSet, labelIndex, True)    # total weight
    totalWeightV = calcTotalWeight(dataSet, labelIndex, False)  # non-missing weight
    if labelPropertyi == 0:  # discrete feature
        for value in uniquevals:  # split per value and accumulate the child entropies
            if value != 'N':
                subDataSet = splitDataSet(dataSet, labelIndex, value)
                totalWeightSub = calcTotalWeight(subDataSet, labelIndex, True)
                prob = totalWeightSub / totalWeightV
                newEntropy += prob * calcShannonEnt(subDataSet, labelIndex)
    else:  # continuous feature
        uniquevalsList = list(uniquevals)
        if 'N' in uniquevalsList:
            uniquevalsList.remove('N')
        sortedUniquevals = sorted(uniquevalsList)  # sort the values
        minEntropy = inf
        if len(sortedUniquevals) == 1:  # a single value: only a left subset exists
            totalWeightLeft = calcTotalWeight(dataSet, labelIndex, True)
            probLeft = totalWeightLeft / totalWeightV
            minEntropy = probLeft * calcShannonEnt(dataSet, labelIndex)
        else:
            for j in range(len(sortedUniquevals) - 1):  # candidate splits: midpoints
                partValue = (float(sortedUniquevals[j]) +
                             float(sortedUniquevals[j + 1])) / 2
                # entropy for this split point
                dataSetLeft = splitDataSet(dataSet, labelIndex, partValue, 'L')
                dataSetRight = splitDataSet(dataSet, labelIndex, partValue, 'R')
                totalWeightLeft = calcTotalWeight(dataSetLeft, labelIndex, True)
                totalWeightRight = calcTotalWeight(dataSetRight, labelIndex, True)
                probLeft = totalWeightLeft / totalWeightV
                probRight = totalWeightRight / totalWeightV
                Entropy = probLeft * calcShannonEnt(dataSetLeft, labelIndex) + \
                          probRight * calcShannonEnt(dataSetRight, labelIndex)
                if Entropy < minEntropy:  # keep the smallest entropy
                    minEntropy = Entropy
        newEntropy = minEntropy
    gain = totalWeightV / totalWeight * (baseEntropy - newEntropy)
    return gain


def calcGainRatio(dataSet, labelIndex, labelPropertyi):
    """
    type: (list, int, int) -> float, float
    Compute the gain ratio; returns the gain ratio and, for a continuous
    feature, the best split point.
    dataSet: the dataset
    labelIndex: feature index
    labelPropertyi: feature type, 0 discrete, 1 continuous
    """
    baseEntropy = calcShannonEnt(dataSet, labelIndex)  # entropy of the parent node
    featList = [example[labelIndex] for example in dataSet]
    uniquevals = set(featList)  # distinct values of this feature
    newEntropy = 0.0
    bestPartValuei = None  # best split point of a continuous feature
    IV = 0.0
    totalWeight = calcTotalWeight(dataSet, labelIndex, True)    # total weight
    totalWeightV = calcTotalWeight(dataSet, labelIndex, False)  # non-missing weight
    if labelPropertyi == 0:  # discrete feature
        for value in uniquevals:  # split per value, accumulate child entropies and IV
            subDataSet = splitDataSet(dataSet, labelIndex, value)
            totalWeightSub = calcTotalWeight(subDataSet, labelIndex, True)
            if value != 'N':
                prob = totalWeightSub / totalWeightV
                newEntropy += prob * calcShannonEnt(subDataSet, labelIndex)
            prob1 = totalWeightSub / totalWeight
            IV -= prob1 * log(prob1, 2)
    else:  # continuous feature
        uniquevalsList = list(uniquevals)
        if 'N' in uniquevalsList:
            uniquevalsList.remove('N')
        # total weight of the missing-value samples, needed for IV
        dataSetNull = splitDataSet(dataSet, labelIndex, 'N')
        totalWeightN = calcTotalWeight(dataSetNull, labelIndex, True)
        probNull = totalWeightN / totalWeight
        if probNull > 0.0:
            IV += -1 * probNull * log(probNull, 2)
        sortedUniquevals = sorted(uniquevalsList)  # sort the values
        minEntropy = inf
        if len(sortedUniquevals) == 1:  # a single value: only a left subset exists
            totalWeightLeft = calcTotalWeight(dataSet, labelIndex, True)
            probLeft = totalWeightLeft / totalWeightV
            minEntropy = probLeft * calcShannonEnt(dataSet, labelIndex)
            IV = -1 * probLeft * log(probLeft, 2)
        else:
            for j in range(len(sortedUniquevals) - 1):  # candidate splits: midpoints
                partValue = (float(sortedUniquevals[j]) +
                             float(sortedUniquevals[j + 1])) / 2
                # entropy for this split point
                dataSetLeft = splitDataSet(dataSet, labelIndex, partValue, 'L')
                dataSetRight = splitDataSet(dataSet, labelIndex, partValue, 'R')
                totalWeightLeft = calcTotalWeight(dataSetLeft, labelIndex, True)
                totalWeightRight = calcTotalWeight(dataSetRight, labelIndex, True)
                probLeft = totalWeightLeft / totalWeightV
                probRight = totalWeightRight / totalWeightV
                Entropy = probLeft * calcShannonEnt(dataSetLeft, labelIndex) + \
                          probRight * calcShannonEnt(dataSetRight, labelIndex)
                if Entropy < minEntropy:  # keep the split with the smallest entropy
                    minEntropy = Entropy
                    bestPartValuei = partValue
                    probLeft1 = totalWeightLeft / totalWeight
                    probRight1 = totalWeightRight / totalWeight
                    IV += -1 * (probLeft1 * log(probLeft1, 2) +
                                probRight1 * log(probRight1, 2))
        newEntropy = minEntropy
    gain = totalWeightV / totalWeight * (baseEntropy - newEntropy)
    if IV == 0.0:  # a single-valued attribute gives IV = 0; avoid division by zero
        IV = 0.0000000001
    gainRatio = gain / IV
    return gainRatio, bestPartValuei


# Choose the best attribute to split on (C4.5 heuristic).
def chooseBestFeatureToSplit(dataSet, labelProperty):
    """
    type: (list, list) -> int, float
    :param dataSet: the dataset
    :param labelProperty: feature types, 1 continuous, 0 discrete
    :return: index of the best attribute and, if continuous, its split value
    """
    numFeatures = len(labelProperty)  # number of features
    bestInfoGainRatio = 0.0
    bestFeature = -1
    bestPartValue = None  # best split value of a continuous feature
    gainSum = 0.0
    for i in range(numFeatures):  # average information gain over all features
        infoGain = calcGain(dataSet, i, labelProperty[i])
        gainSum += infoGain
    gainAvg = gainSum / numFeatures
    for i in range(numFeatures):
        infoGainRatio, bestPartValuei = calcGainRatio(dataSet, i, labelProperty[i])
        infoGain = calcGain(dataSet, i, labelProperty[i])
        # C4.5: among attributes whose gain is above average, pick the highest gain ratio
        if infoGainRatio > bestInfoGainRatio and infoGain > gainAvg:
            bestInfoGainRatio = infoGainRatio
            bestFeature = i
            bestPartValue = bestPartValuei
    return bestFeature, bestPartValue


# Return the class with the most (weighted) occurrences.
def majorityCnt(classList, weightList):
    classCount = {}
    for i in range(len(classList)):
        if classList[i] not in classCount.keys():
            classCount[classList[i]] = 0.0
        classCount[classList[i]] += round(float(weightList[i]), 1)
    # Python 2.7 used classCount.iteritems()
    sortedClassCount = sorted(classCount.items(),
                              key=operator.itemgetter(1), reverse=True)
    if len(sortedClassCount) == 1:
        return (sortedClassCount[0][0], sortedClassCount[0][1], 0.0)
    return (sortedClassCount[0][0], sortedClassCount[0][1], sortedClassCount[1][1])


# Build the tree. Arguments: dataset, feature names, feature types (0 discrete, 1 continuous).
def createTree(dataSet, labels, labelProperty):
    classList = [example[-1] for example in dataSet]   # class column
    weightList = [example[-2] for example in dataSet]  # weight column
    if classList.count(classList[0]) == len(classList):  # a single class: return a leaf
        totalWeight = calcTotalWeight(dataSet, 0, True)
        return (classList[0], round(totalWeight, 1), 0.0)
    if len(dataSet[0]) == 1:  # all features used up: return the majority class
        return majorityCnt(classList, weightList)  # fixed: the original passed only classList
    bestFeat, bestPartValue = chooseBestFeatureToSplit(dataSet, labelProperty)
    if bestFeat == -1:  # no usable attribute: return the majority class
        return majorityCnt(classList, weightList)
    if labelProperty[bestFeat] == 0:  # discrete attribute
        bestFeatLabel = labels[bestFeat]
        myTree = {bestFeatLabel: {}}
        labelsNew = copy.copy(labels)
        labelPropertyNew = copy.copy(labelProperty)
        del (labelsNew[bestFeat])  # a chosen discrete attribute is not reused below
        del (labelPropertyNew[bestFeat])
        featValues = [example[bestFeat] for example in dataSet]
        uniquevalue = set(featValues)  # all values of this attribute
        uniquevalue.discard('N')
        for value in uniquevalue:  # recurse into each value's branch
            subLabels = labelsNew[:]
            subLabelProperty = labelPropertyNew[:]
            myTree[bestFeatLabel][value] = createTree(
                splitDataSetWithNull(dataSet, bestFeat, value),
                subLabels, subLabelProperty)
    else:  # continuous attribute: keep it and build left/right subtrees
        bestFeatLabel = labels[bestFeat] + '<' + str(bestPartValue)
        myTree = {bestFeatLabel: {}}
        subLabels = labels[:]
        subLabelProperty = labelProperty[:]
        # left subtree (value below the split point)
        valueLeft = 'Y'
        myTree[bestFeatLabel][valueLeft] = createTree(
            splitDataSetWithNull(dataSet, bestFeat, bestPartValue, 'L'),
            subLabels, subLabelProperty)
        # right subtree (value above the split point)
        valueRight = 'N'
        myTree[bestFeatLabel][valueRight] = createTree(
            splitDataSetWithNull(dataSet, bestFeat, bestPartValue, 'R'),
            subLabels, subLabelProperty)
    return myTree


# Classify a test sample; returns per-class weight totals.
def classify(inputTree, classList, featLabels, featLabelProperties, testVec):
    firstStr = list(inputTree.keys())[0]  # root of the (sub)tree
    firstLabel = firstStr
    lessIndex = str(firstStr).find('<')
    if lessIndex > -1:  # a continuous feature: strip the '<value' suffix
        firstLabel = str(firstStr)[:lessIndex]
    secondDict = inputTree[firstStr]
    featIndex = featLabels.index(firstLabel)  # feature tested at the root
    classLabel = {}
    for classI in classList:
        classLabel[classI] = 0.0
    for key in secondDict.keys():  # iterate over the branches
        if featLabelProperties[featIndex] == 0:  # discrete feature
            if testVec[featIndex] == key:  # the sample follows this branch
                if type(secondDict[key]).__name__ == 'dict':  # internal node: recurse
                    classLabelSub = classify(secondDict[key], classList, featLabels,
                                             featLabelProperties, testVec)
                    for classKey in classLabel.keys():
                        classLabel[classKey] += classLabelSub[classKey]
                else:  # leaf: add the stored class weights
                    for classKey in classLabel.keys():
                        if classKey == secondDict[key][0]:
                            classLabel[classKey] += secondDict[key][1]
                        else:
                            classLabel[classKey] += secondDict[key][2]
            elif testVec[featIndex] == 'N':  # missing value: follow every branch
                if type(secondDict[key]).__name__ == 'dict':
                    classLabelSub = classify(secondDict[key], classList, featLabels,
                                             featLabelProperties, testVec)
                    for classKey in classLabel.keys():
                        # fixed: the original indexed classLabelSub[key]
                        classLabel[classKey] += classLabelSub[classKey]
                else:
                    for classKey in classLabel.keys():
                        if classKey == secondDict[key][0]:
                            classLabel[classKey] += secondDict[key][1]
                        else:
                            classLabel[classKey] += secondDict[key][2]
        else:  # continuous feature
            partValue = float(str(firstStr)[lessIndex + 1:])
            if testVec[featIndex] == 'N':  # missing value: sum over both branches
                if type(secondDict[key]).__name__ == 'dict':
                    classLabelSub = classify(secondDict[key], classList, featLabels,
                                             featLabelProperties, testVec)
                    for classKey in classLabel.keys():
                        classLabel[classKey] += classLabelSub[classKey]
                else:
                    for classKey in classLabel.keys():
                        if classKey == secondDict[key][0]:
                            classLabel[classKey] += secondDict[key][1]
                        else:
                            classLabel[classKey] += secondDict[key][2]
            elif float(testVec[featIndex]) <= partValue and key == 'Y':  # left subtree
                if type(secondDict['Y']).__name__ == 'dict':
                    classLabelSub = classify(secondDict['Y'], classList, featLabels,
                                             featLabelProperties, testVec)
                    for classKey in classLabel.keys():
                        classLabel[classKey] += classLabelSub[classKey]
                else:
                    for classKey in classLabel.keys():
                        if classKey == secondDict[key][0]:
                            classLabel[classKey] += secondDict['Y'][1]
                        else:
                            classLabel[classKey] += secondDict['Y'][2]
            elif float(testVec[featIndex]) > partValue and key == 'N':  # right subtree
                if type(secondDict['N']).__name__ == 'dict':
                    classLabelSub = classify(secondDict['N'], classList, featLabels,
                                             featLabelProperties, testVec)
                    for classKey in classLabel.keys():
                        classLabel[classKey] += classLabelSub[classKey]
                else:
                    for classKey in classLabel.keys():
                        if classKey == secondDict[key][0]:
                            classLabel[classKey] += secondDict['N'][1]
                        else:
                            classLabel[classKey] += secondDict['N'][2]
    return classLabel


# Persist the tree to disk.
def storeTree(inputTree, filename):
    import pickle
    fw = open(filename, 'wb')  # fixed: binary mode for pickle under Python 3
    pickle.dump(inputTree, fw)
    fw.close()


# Load a tree from disk; returns None if the file does not exist.
def grabTree(filename):
    import pickle
    if os.path.isfile(filename):
        fr = open(filename, 'rb')  # fixed: binary mode for pickle under Python 3
        return pickle.load(fr)
    else:
        return None


# Error count of the tree on a test set.
def testing(myTree, classList, data_test, labels, labelProperties):
    error = 0.0
    for i in range(len(data_test)):
        classLabelSet = classify(myTree, classList, labels, labelProperties,
                                 data_test[i])
        maxWeight = 0.0
        classLabel = ''
        for item in classLabelSet.items():  # pick the class with the largest weight
            if item[1] > maxWeight:
                maxWeight = item[1]  # fixed: the original never updated maxWeight
                classLabel = item[0]
        if classLabel != data_test[i][-1]:
            error += 1
    return float(error)


# Error count when the node is replaced by a majority vote.
def testingMajor(major, data_test):
    error = 0.0
    for i in range(len(data_test)):
        if major[0] != data_test[i][-1]:
            error += 1
    return float(error)


# Post-pruning with a validation set.
def postPruningTree(inputTree, classSet, dataSet, data_test, labels,
                    labelProperties):
    firstStr = list(inputTree.keys())[0]
    secondDict = inputTree[firstStr]
    classList = [example[-1] for example in dataSet]
    weightList = [example[-2] for example in dataSet]
    featkey = copy.deepcopy(firstStr)
    if '<' in firstStr:  # continuous feature: regex out the label and the split value
        featkey = re.compile("(.+<)").search(firstStr).group()[:-1]
        featvalue = float(re.compile("(<.+)").search(firstStr).group()[1:])
    labelIndex = labels.index(featkey)
    temp_labels = copy.deepcopy(labels)
    temp_labelProperties = copy.deepcopy(labelProperties)
    if labelProperties[labelIndex] == 0:  # discrete feature
        del (labels[labelIndex])
        del (labelProperties[labelIndex])
    for key in secondDict.keys():  # walk every branch
        if type(secondDict[key]).__name__ == 'dict':  # not a leaf
            if temp_labelProperties[labelIndex] == 0:  # discrete
                subDataSet = splitDataSet(dataSet, labelIndex, key)
                subDataTest = splitDataSet(data_test, labelIndex, key)
            else:
                if key == 'Y':
                    subDataSet = splitDataSet(dataSet, labelIndex, featvalue, 'L')
                    subDataTest = splitDataSet(data_test, labelIndex, featvalue, 'L')
                else:
                    subDataSet = splitDataSet(dataSet, labelIndex, featvalue, 'R')
                    subDataTest = splitDataSet(data_test, labelIndex, featvalue, 'R')
            if len(subDataTest) > 0:
                inputTree[firstStr][key] = postPruningTree(
                    secondDict[key], classSet, subDataSet, subDataTest,
                    copy.deepcopy(labels), copy.deepcopy(labelProperties))
    # keep the subtree only if it beats a plain majority vote on the validation set
    if testing(inputTree, classSet, data_test, temp_labels,
               temp_labelProperties) <= testingMajor(
                   majorityCnt(classList, weightList), data_test):
        return inputTree
    return majorityCnt(classList, weightList)
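The weight column (second-to-last) is what makes the missing-value handling work: when a sample's value on the splitting attribute is 'N', splitDataSetWithNull copies it into each branch with its weight scaled by that branch's share of the non-missing weight, and calcShannonEnt/calcTotalWeight count samples by weight thereafter. A toy illustration of the scaling rule (my addition, with made-up numbers, not drawn from the dataset above):

# a branch holding 6.0 of the 15.0 non-missing weight receives each
# missing-value sample at 6/15 = 0.4 of its original weight
branch_weight, nonmissing_weight = 6.0, 15.0
sample_weight = 1.0
scaled = sample_weight * branch_weight / nonmissing_weight  # mirrors nullVec[-2] update
print(scaled)  # 0.4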
Visualization code (treePlotter.py)
The code below follows the original source; a lost backslash is restored (marked "fixed:"), and the dead commented-out Python 2 annotation experiments are dropped.
# -*- coding: utf-8 -*-
# (the source declared cp936 for Python 2; utf-8 assumed here for Python 3)
import matplotlib.pyplot as plt

# enable Chinese glyphs
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['font.serif'] = ['SimHei']

# box shapes, padding and fill for decision and leaf nodes, plus the arrow style
decisionNode = dict(boxstyle="square,pad=0.5", fc="0.9")
leafNode = dict(boxstyle="round4, pad=0.5", fc="0.9")
arrow_args = dict(arrowstyle="<-", connectionstyle="arc3", shrinkA=0, shrinkB=16)


def plotNode(nodeTxt, centerPt, parentPt, nodeType):
    newTxt = nodeTxt
    if type(nodeTxt).__name__ == 'tuple':  # leaves are (class, weight1, weight2) tuples
        newTxt = nodeTxt[0] + '\n'  # fixed: the backslash was lost in the original listing
        for strI in nodeTxt[1:-1]:
            newTxt += str(strI) + ','
        newTxt += str(nodeTxt[-1])
    createPlot.ax1.annotate(newTxt, xy=parentPt, xycoords='axes fraction',
                            xytext=centerPt, textcoords='axes fraction',
                            va="top", ha="center", bbox=nodeType,
                            arrowprops=arrow_args)


def getNumLeafs(myTree):
    numLeafs = 0
    firstStr = list(myTree.keys())[0]
    secondDict = myTree[firstStr]
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            numLeafs += getNumLeafs(secondDict[key])
        else:
            numLeafs += 1
    return numLeafs


def getTreeDepth(myTree):
    maxDepth = 0
    firstStr = list(myTree.keys())[0]
    secondDict = myTree[firstStr]
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            thisDepth = 1 + getTreeDepth(secondDict[key])
        else:
            thisDepth = 1
        if thisDepth > maxDepth:
            maxDepth = thisDepth
    return maxDepth


def retrieveTree(i):
    listOfTrees = [
        {'no surfacing': {0: 'no', 1: {'flippers': {0: 'no', 1: 'yes'}}}},
        {'no surfacing': {0: 'no', 1: {
            'flippers': {0: {'head': {0: 'no', 1: 'yes'}}, 1: 'no'}}}}
    ]
    return listOfTrees[i]


def plotMidText(cntrPt, parentPt, txtString):
    xMid = (parentPt[0] - cntrPt[0]) / 2.0 + cntrPt[0]
    yMid = (parentPt[1] - cntrPt[1]) / 2.0 + cntrPt[1]
    createPlot.ax1.text(xMid, yMid, txtString)


def plotTree(myTree, parentPt, nodeTxt):
    numLeafs = getNumLeafs(myTree)
    depth = getTreeDepth(myTree)
    firstStr = list(myTree.keys())[0]
    cntrPt = (plotTree.xOff + (1 + float(numLeafs)) / 2.0 / plotTree.totalW,
              plotTree.yOff)
    plotMidText(cntrPt, parentPt, nodeTxt)
    plotNode(firstStr, cntrPt, parentPt, decisionNode)
    secondDict = myTree[firstStr]
    plotTree.yOff = plotTree.yOff - 1.0 / plotTree.totalD
    for key in secondDict.keys():
        if type(secondDict[key]).__name__ == 'dict':
            plotTree(secondDict[key], cntrPt, str(key))
        else:
            plotTree.xOff = plotTree.xOff + 1.0 / plotTree.totalW
            plotNode(secondDict[key], (plotTree.xOff, plotTree.yOff), cntrPt, leafNode)
            plotMidText((plotTree.xOff, plotTree.yOff), cntrPt, str(key))
    plotTree.yOff = plotTree.yOff + 1.0 / plotTree.totalD


def createPlot(inTree):
    fig = plt.figure(1, facecolor='white')
    fig.clf()
    axprops = dict(xticks=[], yticks=[])
    createPlot.ax1 = plt.subplot(111, frameon=False, **axprops)
    plotTree.totalW = float(getNumLeafs(inTree))
    plotTree.totalD = float(getTreeDepth(inTree)) + 0.5
    plotTree.xOff = -0.5 / plotTree.totalW
    plotTree.yOff = 1.0
    plotTree(inTree, (0.5, 1.0), '')
    plt.show()


def createPlot0():
    # standalone annotation demo kept from the source
    fig = plt.figure(1, facecolor='white')
    fig.clf()
    createPlot.ax1 = plt.subplot(111, frameon=False)
    an1 = createPlot.ax1.annotate("Test 1", xy=(0.5, 0.5), xycoords="data",
                                  va="center", ha="center",
                                  bbox=dict(boxstyle="round", fc="w"))
    an2 = createPlot.ax1.annotate("Test 2", xy=(0, 0.5),
                                  xycoords=an1,  # relative to an1's bbox
                                  xytext=(-50, -50), textcoords="offset points",
                                  va="center", ha="center",
                                  bbox=dict(boxstyle="round", fc="w"),
                                  arrowprops=dict(arrowstyle="<-"))
    plt.show()


if __name__ == '__main__':
    createPlot0()
Driver code
import C45
import treePlotter

# read the data file
fr = open(r'C:\Users\静如止水\Desktop\C4.5决策树\data.txt', encoding='utf-8')
# build the dataset
lDataSet = [inst.strip().split(' ') for inst in fr.readlines()]
# feature names
labels = ['色泽', '根蒂', '敲击', '纹理', '脐部', '触感', '密度', '含糖率']
# feature types, 0 discrete, 1 continuous
labelProperties = [0, 0, 0, 0, 0, 0, 1, 1]
# class values
classList = ['是', '否']
# validation set
dataSet_test = [['青绿', '蜷缩', '浊响', '清晰', '凹陷', '硬滑', '0.697', '0.460', '是'],
                ['乌黑', '蜷缩', '沉闷', '清晰', '凹陷', '硬滑', '0.774', '0.376', '是']]
# build the tree
trees = C45.createTree(lDataSet, labels, labelProperties)
# draw it
treePlotter.createPlot(trees)
# prune with the validation set and keep the returned tree
# (fixed: the original discarded the return value)
trees = C45.postPruningTree(trees, classList, lDataSet, dataSet_test,
                            labels, labelProperties)
# draw the pruned tree
treePlotter.createPlot(trees)

Results
(figures: the C4.5 tree before and after pruning, as drawn by treePlotter; not reproduced here)

Compared against the watermelon book, the plotted trees match the book's results.
Coding tips summary

- How to extract one column from a 2-D list dataSet:
dataLabelList = [x[index] for x in dataSet]  # index is the column to extract
- How to extract the distinct elements of a list: convert it to a set. Note that a set is unordered, so converting back to a list does not preserve the original order.
dataLabelSet = set(dataLabelList)
- How to delete one column from a 2-D list: for each row, take the slice before that column, extend it with the slice after, and append the result to a new list.
# adapted from splitDataSet above
splitedDataSet = []                    # the new dataset
for data in dataSet:                   # iterate over every row
    sliceTmp = data[:index]            # everything before column `index`
    sliceTmp.extend(data[index + 1:])  # everything after column `index`
    splitedDataSet.append(sliceTmp)
- How to display Chinese in matplotlib: add these two lines near the top (see also the note right after this list):
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['font.serif'] = ['SimHei']
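A companion setting worth knowing (my addition, not in the original post): with SimHei active, minus signs on axis ticks can render as empty boxes, because CJK fonts lack the Unicode minus glyph; disabling axes.unicode_minus is the usual fix.

import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['font.serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False  # render '-' correctly alongside SimHei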
Reference: Zhou Zhihua, 机器学习 (Machine Learning), the "watermelon book".