>>> from sklearn.ensemble import ExtraTreesClassifier
>>> from sklearn.datasets import load_iris
>>> from sklearn.feature_selection import SelectFromModel
>>> iris = load_iris()
>>> X, y = iris.data, iris.target
>>> X.shape
(150, 4)
>>> clf = ExtraTreesClassifier()
>>> clf = clf.fit(X, y)
>>> clf.feature_importances_
array([ 0.04..., 0.05..., 0.4..., 0.4...])
>>> model = SelectFromModel(clf, prefit=True)
>>> X_new = model.transform(X)
>>> X_new.shape
(150, 2)
在spark 2.0之后,mllib的决策树算法都引入了计算特征重要性的方法featureImportances,而随机森林算法(RandomForestRegressionModel和RandomForestClassificationModel类)和gbdt算法(GBTClassificationModel和GBTRegressionModel类)均利用决策树算法中计算特征不纯度和特征重要性的方法来得到所使用模型的特征重要性。
而这些集成方法的实现类都继承了TreeEnsembleModel[M <: DecisionTreeModel]这个特质(trait),即featureImportances是在该特质(及其伴生对象)中实现的。其计算过程分为以下三步:
- 针对每一棵决策树而言,特征j的重要性指标为所有通过特征j进行划分的树结点的增益之和,其中每个结点的增益要先乘以到达该结点的训练样本数
- 将一棵树的特征重要性归一化到1
- 将集成模型的特征重要性向量归一化到1
/**
 * Computes the feature importance vector of a tree ensemble.
 *
 * For every tree, the importance of feature j is the sum of the instance-count-weighted
 * gains of all internal nodes that split on j (see computeFeatureImportance). Each tree's
 * importances are normalized to sum to 1, the per-tree values are summed over all trees,
 * and the total is normalized to sum to 1 again.
 *
 * @param trees       the trees of the ensemble
 * @param numFeatures number of features of the model, or -1 to infer it as
 *                    (largest split feature index found in the trees) + 1
 * @return sparse vector of length d holding the normalized importance of every feature
 *         used in at least one split
 */
def featureImportances[M <: DecisionTreeModel](trees: Array[M], numFeatures: Int): Vector = {
  val totalImportances = new OpenHashMap[Int, Double]()
  trees.foreach { tree =>
    // Aggregate feature importance vector for this tree, starting from the root node.
    val importances = new OpenHashMap[Int, Double]()
    computeFeatureImportance(tree.rootNode, importances)
    // Normalize this tree's importances to sum to 1 and add them to the total, so every
    // tree contributes with the same overall weight.
    // TODO: In the future, also support normalizing by tree.rootNode.impurityStats.count?
    val treeNorm = importances.map(_._2).sum
    if (treeNorm != 0) {
      importances.foreach { case (idx, impt) =>
        val normImpt = impt / treeNorm
        totalImportances.changeValue(idx, normImpt, _ + normImpt)
      }
    }
  }

  // Normalize the ensemble importances to sum to 1. The keys are materialized first so the
  // map is not modified while it is being iterated.
  val totalSum = totalImportances.map(_._2).sum
  if (totalSum != 0) {
    val keys = totalImportances.iterator.map(_._1).toArray
    keys.foreach { key => totalImportances.changeValue(key, 0.0, _ / totalSum) }
  }

  // Length of the output vector.
  val d = if (numFeatures != -1) {
    numFeatures
  } else {
    // Find max feature index used in trees
    val maxFeatureIndex = trees.map(_.maxSplitFeatureIndex()).max
    maxFeatureIndex + 1
  }
  if (d == 0) {
    assert(totalImportances.size == 0, s"Unknown error in computing feature" +
      s" importance: No splits found, but some non-zero importances.")
  }
  val (indices, values) = totalImportances.iterator.toSeq.sortBy(_._1).unzip
  Vectors.sparse(d, indices.toArray, values.toArray)
}
/**
 * Recursively accumulates the feature importances of the subtree rooted at `node`.
 *
 * Every internal node adds its gain, multiplied by the number of training instances that
 * reached it (impurityStats.count), to the entry of the feature it splits on. Leaf nodes
 * add nothing.
 *
 * @param node        root of the subtree to visit
 * @param importances feature index -> accumulated weighted gain; updated in place
 */
def computeFeatureImportance(
    node: Node,
    importances: OpenHashMap[Int, Double]): Unit = {
  node match {
    case n: InternalNode =>
      val feature = n.split.featureIndex
      // Weight the gain by the node's instance count so that splits affecting more
      // training data count for more.
      val scaledGain = n.gain * n.impurityStats.count
      importances.changeValue(feature, scaledGain, _ + scaledGain)
      // Pre-order traversal: this node, then the left subtree, then the right subtree.
      computeFeatureImportance(n.leftChild, importances)
      computeFeatureImportance(n.rightChild, importances)
    case _: LeafNode =>
      // Leaves perform no split, so they contribute no importance.
  }
}
/**
 * Creates the impurity calculator matching an impurity name.
 *
 * @param impurity impurity name: "gini", "entropy" or "variance"
 * @param stats    statistics handed to the calculator's constructor
 * @return a GiniCalculator, EntropyCalculator or VarianceCalculator
 * @throws IllegalArgumentException if `impurity` is not one of the supported names
 */
def getCalculator(impurity: String, stats: Array[Double]): ImpurityCalculator = {
  impurity match {
    case "gini" => new GiniCalculator(stats)
    case "entropy" => new EntropyCalculator(stats)
    case "variance" => new VarianceCalculator(stats)
    case _ =>
      throw new IllegalArgumentException(
        s"ImpurityCalculator builder did not recognize impurity type: $impurity")
  }
}
/**
 * Gini impurity of a class-count distribution: 1 - sum_k (counts(k) / totalCount)^2.
 *
 * @param counts     count of instances per class
 * @param totalCount total count over all classes
 * @return the Gini impurity, or 0.0 when totalCount is 0
 */
override def calculate(counts: Array[Double], totalCount: Double): Double = {
  if (totalCount == 0) {
    // No instances: define the impurity as 0 instead of dividing by zero.
    0.0
  } else {
    val numClasses = counts.length
    var impurity = 1.0
    var classIndex = 0
    // Plain while loop: this is evaluated for every candidate split.
    while (classIndex < numClasses) {
      val freq = counts(classIndex) / totalCount
      impurity -= freq * freq
      classIndex += 1
    }
    impurity
  }
}
互信息可以看成是一个随机变量中包含的关于另一个随机变量的信息量,或者说是一个随机变量由于已知另一个随机变量而减少的不确定性。互信息本来是信息论中的一个概念,用于表示信息之间的关系,是两个随机变量统计相关性的测度。对离散随机变量X、Y,其定义为 $I(X;Y)=\sum_{x,y} p(x,y)\log_2\frac{p(x,y)}{p(x)\,p(y)}$。

作为一个特例,变量之间的相关性(correlation)可以用统计学的依赖关系(dependency)来替代,而互信息(mutual information)是一种评价该依赖关系的度量方法。


- 将数据进行处理转换的过程(注:为了计算两个特征的联合分布和边缘分布,需要事先将数据离散化为[0,255]之间的整数,并且将每一维特征使用合理的数据结构进行存储)
- 计算特征之间、特征与响应变量之间的分布及互信息
- 对特征进行mrmr得分,并进行排序

/**
 * Runs mRMR feature selection on discretized data.
 *
 * Every feature value and every label must lie in [0, 255]; values are stored as Short.
 * The data is converted to a columnar layout where column c of instance r gets the key
 * r * nAllFeatures + c (the label is the last column). The features are then ranked by
 * selectFeatures, and the ranking is printed.
 *
 * @param data          labeled input data (only dense feature vectors are implemented)
 * @param nToSelect     number of features to select; must be smaller than the number of
 *                      features plus one (the label column)
 * @param numPartitions partitions used for the columnar data; 0 means
 *                      "spark.default.parallelism" (500 when unset)
 * @return a SelectorModel with the selected feature indices in ascending order
 * @throws SparkException if a feature value or label lies outside [0, 255]
 */
private[feature] def run(
    data: RDD[LabeledPoint],
    nToSelect: Int,
    numPartitions: Int) = {
  val nPart = if (numPartitions == 0) data.context.getConf.getInt(
    "spark.default.parallelism", 500) else numPartitions

  // Checks that the label and every stored feature value fit in the range [0, 255].
  val requireByteValues = (l: Double, v: Vector) => {
    val values = v match {
      case SparseVector(size, indices, values) => values
      case DenseVector(values) => values
    }
    val condition = (value: Double) => value <= 255 &&
      value >= 0
    if (!values.forall(condition(_)) || !condition(l)) {
      throw new SparkException(s"Info-Theoretic Framework requires positive values in range [0, 255]")
    }
  }

  // Number of columns: all features plus the label.
  val nAllFeatures = data.first.features.size + 1
  // Fail fast, before the expensive columnar conversion and sort below.
  require(nToSelect < nAllFeatures)

  // Columnar layout: one (instance * nAllFeatures + column, value) entry per cell.
  val columnarData: RDD[(Long, Short)] = data.zipWithIndex().flatMap ({
    case (LabeledPoint(label, values: SparseVector), r) =>
      requireByteValues(label, values)
      // Not implemented yet!
      throw new NotImplementedError()
    case (LabeledPoint(label, values: DenseVector), r) =>
      requireByteValues(label, values)
      val rindex = r * nAllFeatures
      val inputs = for (i <- 0 until values.size) yield (rindex + i, values(i).toShort)
      val output = Array((rindex + values.size, label.toShort))
      inputs ++ output
  }).sortByKey(numPartitions = nPart) // put numPartitions parameter

  // Compute the mRMR ranking of the features.
  val selected = selectFeatures(columnarData, nToSelect, nAllFeatures)

  // Print best features according to the mRMR measure
  val out = selected.map { case F(feat, rel) => (feat + 1) + "\t" + "%.4f".format(rel) }
    .mkString("\n")
  println("\n*** mRMR features ***\nFeature\tScore\n" + out)
  // Features must be sorted
  new SelectorModel(selected.map { case F(feat, rel) => feat }.sorted.toArray)
}
/**
 * Perform an info-theory (mRMR) selection process.
 *
 * The most relevant feature (highest MI with the class) is selected first. After every
 * selection, the MI between each remaining candidate and the newly selected feature is
 * fed into the candidate's MrmrCriterion. The candidate with the best score is then
 * selected next.
 *
 * @param data Columnar data (last element is the class attribute).
 * @param nToSelect Number of features to select.
 * @param nFeatures Number of total columns in the dataset, including the class attribute.
 * @return A list with the selected features and their scores, in selection order.
 */
private[feature] def selectFeatures(
    data: RDD[(Long, Short)],
    nToSelect: Int,
    nFeatures: Int) = {
  // Index of the class attribute (last column).
  val label = nFeatures - 1
  // Every instance contributes nFeatures entries, so this is the number of instances.
  val nInstances = data.count() / nFeatures
  // Number of bins (max value + 1) of every column, used to size the histograms.
  val counterByKey = data.map({ case (k, v) => (k % nFeatures).toInt -> v })
    .distinct().groupByKey().mapValues(_.max + 1).collectAsMap().toMap

  // calculate relevance: MI of every feature with the class. Collected once and reused
  // for both the candidate pool and the printout, instead of recomputing the RDD.
  val MiAndCmi = IT.computeMI(
    data, 0 until label, label, nInstances, nFeatures, counterByKey).collect()
  // Candidate pool: feature index -> mRMR criterion.
  var pool = MiAndCmi.map { case (x, mi) => (x, new MrmrCriterion(mi)) }.toMap
  // Print most relevant features
  val strRels = MiAndCmi.sortBy(-_._2)
    .map({ case (f, mi) => (f + 1) + "\t" + "%.4f".format(mi) })
    .mkString("\n")
  println("\n*** MaxRel features ***\nFeature\tScore\n" + strRels)

  // get maximum and select it
  val firstMax = pool.maxBy(_._2.score)
  var selected = Seq(F(firstMax._1, firstMax._2.score))
  pool = pool - firstMax._1

  while (selected.size < nToSelect) {
    // MI between every remaining candidate and the most recently selected feature.
    val newMiAndCmi = IT.computeMI(data, pool.keys.toSeq,
      selected.head.feat, nInstances, nFeatures, counterByKey)
      .collectAsMap()
    // update pool. The value must be passed explicitly: `crit.update(_)` would only
    // create an unused function and never update the criterion.
    pool.foreach { case (k, crit) =>
      newMiAndCmi.get(k).foreach(mi => crit.update(mi))
    }
    // get maximum and save it
    val max = pool.maxBy(_._2.score)
    // select the best feature and remove from the whole set of features
    selected = F(max._1, max._2.score) +: selected
    pool = pool - max._1
  }
  // `selected` was built by prepending; return it in selection order.
  selected.reverse
}
/**
 * Method that calculates the mutual information (MI) between several variables and one
 * other variable. Indexes must be disjoint.
 *
 * When `varY` is the class attribute, the class column and the joint/marginal probability
 * tables are also stored in `classCol`, `jointProb` and `marginalProb` for the later
 * CMI computation.
 *
 * @param rawData    columnar RDD of (instance * nFeatures + column, value); the class
 *                   attribute is the last column
 * @param varX       Indexes of primary variables (must be disjoint with Y)
 * @param varY       Index of the secondary variable
 * @param nInstances Number of instances
 * @param nFeatures  Number of features (including output ones)
 * @param counter    number of bins (max value + 1) of every column
 * @return RDD of (variable index, MI(variable; varY)) for every selected column except
 *         the class attribute
 */
def computeMI(
    rawData: RDD[(Long, Short)],
    varX: Seq[Int],
    varY: Int,
    nInstances: Long,
    nFeatures: Int,
    counter: Map[Int, Int]) = {
  // Pre-requisites
  require(varX.size > 0)
  val sc = rawData.context
  val label = nFeatures - 1

  // Mark the columns involved in this computation and broadcast the mask.
  val fselected = Array.ofDim[Boolean](nFeatures)
  fselected(varY) = true // output feature
  varX.foreach(fselected(_) = true)
  val bFeatSelected = sc.broadcast(fselected)
  val getFeat = (k: Long) => (k % nFeatures).toInt
  // Filter data by these variables
  val data = rawData.filter({ case (k, _) => bFeatSelected.value(getFeat(k)) })

  // Collect the Y column to the driver. computeHistograms indexes it by instance number,
  // so this relies on rawData being sorted by key — TODO confirm for all callers.
  val yCol: Array[Short] = if (varY == label) {
    // classCol corresponds with output attribute, which is re-used in the iteration
    classCol = data.filter({ case (k, _) => getFeat(k) == varY }).values.collect()
    classCol
  } else {
    data.filter({ case (k, _) => getFeat(k) == varY }).values.collect()
  }

  // Joint (x, y) counts of every selected column, turned into joint probabilities.
  val histograms = computeHistograms(data, (varY, yCol), nFeatures, counter)
  val jointTable = histograms.mapValues(_.map(_.toFloat / nInstances))
  // Marginal p(x): the sum of every row of the joint table.
  val marginalTable = jointTable.mapValues(h => sum(h(*, ::)).toDenseVector)

  // If y corresponds with output feature, we save for CMI computation
  if (varY == label) {
    marginalProb = marginalTable.cache()
    jointProb = jointTable.cache()
  }

  // p(y): row marginal of the Y column's histogram against itself.
  val yProb = marginalTable.lookup(varY)(0)
  // Remove output feature from the computations
  val fdata = histograms.filter { case (k, _) => k != label }
  computeMutualInfo(fdata, yProb, nInstances)
}
/**
 * Builds, for every column present in `data`, the table of counts of (column value,
 * Y value) pairs.
 *
 * @param data      columnar entries (instance * nFeatures + column, value)
 * @param yCol      (index of the Y column, Y values indexed by instance number)
 * @param nFeatures number of columns, including the class attribute
 * @param counter   number of bins (max value + 1) per column; 256 when a column is absent
 * @return RDD of (column index, matrix of counts with x values as rows and y values as
 *         columns)
 */
private def computeHistograms(
    data: RDD[(Long, Short)],
    yCol: (Int, Array[Short]),
    nFeatures: Long,
    counter: Map[Int, Int]) = {
  val maxSize = 256
  val byCol = data.context.broadcast(yCol._2)
  val bCounter = data.context.broadcast(counter)
  // Number of bins of the Y column.
  val ys = counter.getOrElse(yCol._1, maxSize)
  // Count into one matrix per column inside every partition, then add the partial
  // matrices of the same column together.
  data.mapPartitions({ it =>
    var result = Map.empty[Int, BDM[Long]]
    for ((k, x) <- it) {
      val feat = (k % nFeatures).toInt
      val inst = (k / nFeatures).toInt
      // Number of bins of this column.
      val xs = bCounter.value.getOrElse(feat, maxSize)
      val m = result.getOrElse(feat, BDM.zeros[Long](xs, ys)) // (xBins x yBins) counts
      m(x, byCol.value(inst)) += 1
      result += feat -> m
    }
    result.iterator
  }).reduceByKey(_ + _)
}
/**
 * Computes the mutual information, in bits, between every column and Y from their
 * count tables.
 *
 * @param data  RDD of (column index, counts with x values as rows and y values as columns)
 * @param yProb marginal probability of every y value
 * @param n     number of instances, used to turn counts into probabilities
 * @return RDD of (column index, I(X; Y))
 */
private def computeMutualInfo(
    data: RDD[(Int, BDM[Long])],
    yProb: BDV[Float],
    n: Long) = {
  val byProb = data.context.broadcast(yProb)
  // Converts natural logarithms to base 2; computed once instead of per cell.
  val ln2 = math.log(2)
  data.mapValues({ m =>
    var mi = 0.0d
    // Aggregate by row (x)
    val xProb = sum(m(*, ::)).map(_.toFloat / n)
    for (i <- 0 until m.rows) {
      for (j <- 0 until m.cols) {
        val pxy = m(i, j).toFloat / n
        val py = byProb.value(j)
        val px = xProb(i)
        // Zero cells contribute nothing and would produce NaN in the logarithm.
        if (pxy != 0 && px != 0 && py != 0) {
          // I(x,y) = sum[p(x,y)log(p(x,y)/(p(x)p(y)))]
          mi += pxy * (math.log(pxy / (px * py)) / ln2)
        }
      }
    }
    mi
  })
}
- 稀疏向量内积乘法运算速度快,计算结果方便存储,容易扩展。
- 离散化后的特征对异常数据有很强的鲁棒性:比如一个特征是年龄>30是1,否则0。如果特征没有离散化,一个异常数据“年龄300岁”会给模型造成很大的干扰。
- 逻辑回归属于广义线性模型,表达能力受限;单变量离散化为N个哑变量后,每个哑变量有单独的权重,相当于为模型引入了非线性,能够提升模型表达能力,增强拟合能力。
- 离散化后可以进行特征交叉,由M+N个变量变为M*N个变量,进一步引入非线性,提升表达能力。
- 特征离散化后,模型会更稳定,比如如果对用户年龄离散化,20-30作为一个区间,不会因为一个用户年龄长了一岁就变成一个完全不同的人。当然处于区间相邻处的样本会刚好相反,所以怎么划分区间是门学问。
李沐少帅指出,模型是使用离散特征还是连续特征,其实是一个“海量离散特征+简单模型” 同 “少量连续特征+复杂模型”的权衡。既可以离散化用线性模型,也可以用连续特征加深度学习。
