Apache Spark机器学习初级教程

  本文探讨使用Apache Spark对大数据集进行机器学习训练的方式,当我们创建一个机器学习模型后,需要数据进行训练和实践,但是数据量非常大,当前机器内存加载不了这么多数据,那么我们只能使用分布式计算工具如Hadoop和Apache Spark以很多机器集群方式进行计算,Apache Spark能够在本地机器单独处理数据,而且在数据量很大超过单个机器内存时以分布式集群方式运行构建你的模型。这里以对34.6G的数据量使用Spark创建二分类模型( binary classification model )为案例。可以在你的笔记本上直接运行,配置只需要4-8g内存和50G以上磁盘空间。

1. 输入数据获得预期结果

  首先我们从https://archive.org/details/stackexchange(种子文件)下载stackoverflow网站的Postx.xml文件为例,大概有34G未解压数据,70%是网站的问题解答部分。其XML文件格式是:

  1. Title标题 – 帖子标题
  2. Body内容 – 帖子内容
  3. Tags标签 – 帖子的表情列表
  4. 10+更多其他我们不需要的属性

这里有一个127M的小stackoverflow.com Posts.xml文件:Posts.smal.xml

小文件对于模型训练不是很好,但是对于代码调试还是很方便的。

我们的目标是创建一个可预期的模型,基于帖子内容和标题产生帖子的标签tag,为了简化问题,我们集中在帖子标题和内容两个字段上,并作为一个文本列表项。这个目标的工作原理类似于stackoverflow.com 网站,当一个用户提出问题后,网站会自动给出标签分类建议。

 

2.二进制Binary 和多标识 multi-label分类

   标签分类属于多标识分类,因为模型会预期产生许多类别,彼此并不排斥,比如同样的文本会被分类为“Java”和“多线程”两个类别。

   为例简化问题降低代码量,取代训练一个多标识分类器,我们将使用训练一个简单的二进制分类器来得到一个标签Tag,比如"Java"标签就会产生于有关Java语言的帖子中。

 

3. 以单机方式设置运行Apache Spark

   使用1.5.1以上Spark版本,可从http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz下载。Windows用户从 http://nishutayaltech.blogspot.in/2015/04/how-to-run-apache-spark-on-windows7-in.html下载。

运行Spark Master:

sbin/start-master.sh

运行Spark slave:

sbin/start-slaves.sh

运行Spark Shell:

bin/spark-shell

 

4. 导入库包

   对于我们这个端到端实战场景,我们需要使用Scala,导入库包:

// General purpose library
import scala.xml._

// Spark data manipulation libraries
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Spark machine learning libraries
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.ml.Pipeline

 

5.分析XML

   我们需要从XML文件中分解出内容和标题,做一些简单的加工和去除:

val fileName = "Posts.small.xml" 
val textFile = sc.textFile(fileName) 
val postsXml = textFile.map(_.trim). 
                    filter(!_.startsWith("<?xml version=")). 
                    filter(_ != "<posts>"). 
                    filter(_ != "</posts>") 

Spark已经有非常棒的功能分析JSON和CSV格式数据,对于XML我们则需要额外编写一些代码。

下面是对XML字符进行一些转换,去除不必要的字符:

val postsRDD = postsXml.map { s => 
            val xml = XML.loadString(s) 
 
            val id = (xml \ "@Id").text 
            val tags = (xml \ "@Tags").text 
 
            val title = (xml \ "@Title").text 
            val body = (xml \ "@Body").text 
            val bodyPlain = ("<\\S+>".r).replaceAllIn(body, " ") 
            val text = (title + " " + bodyPlain).replaceAll("\n", " ").replaceAll("( )+", " "); 
 
            Row(id, tags, text) 
        } 

为了创建一个data-frame,,schema必须应用到RDD:

val schemaString = "Id Tags Text" 
val schema = StructType( 
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) 
 
val postsDf = sqlContext.createDataFrame(postsRDD, schema) 

现在你可以看到你的data frame:

postsDf.show()

 

6. 准备训练和测试数据集

   现在我们为二分器创建一个二进制的标识label,我们使用“java”作为当前案例二分器预期产生的标识,也就是说,希望训练这个二分器产生"java"这样的标签分类结果。所有java标签的行都被标记为“1”,而没有的则为"0",下面是实现这个目标的代码:

val targetTag = "java"
val myudf: (String => Double) = (str: String) => {if (str.contains(targetTag)) 1.0 else 0.0}
val sqlfunc = udf(myudf)
val postsLabeled = postsDf.withColumn("Label", sqlfunc(col("Tags")) ) 

数据集能够被新的标识划分为正和负两种子集:

val positive = postsLabeled.filter('Label > 0.0) 
val negative = postsLabeled.filter('Label < 1.0) 

我们将使用数据的90%作为模型训练,而10%作为测试数据集,通过取样正负单独数据集来创建一个训练数据集。

val positiveTrain = positive.sample(false, 0.9) 
val negativeTrain = negative.sample(false, 0.9) 
val training = positiveTrain.unionAll(negativeTrain) 

测试数据集包括所有没有被包括在训练数据集中的记录,正负样本分离:

val negativeTrainTmp = negativeTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag) 
val negativeTest = negative.join( negativeTrainTmp, negative("Id") === negativeTrainTmp("Id"), "LeftOuter"). 
                            filter("Flag is null").select(negative("Id"), 'Tags, 'Text, 'Label) 
val positiveTrainTmp = positiveTrain.withColumnRenamed("Label", "Flag").select('Id, 'Flag) 
val positiveTest = positive.join( positiveTrainTmp, positive("Id") === positiveTrainTmp("Id"), "LeftOuter"). 
                            filter("Flag is null").select(positive("Id"), 'Tags, 'Text, 'Label) 
val testing = negativeTest.unionAll(positiveTest) 

 

7. 训练模型

   我们需要确定下面训练参数:

  • 特征的数量 Number of features
  • 回归参数Regression parameters
  • 梯度下降的世代数Number of epoch for gradient decent

Spark API创建一个来自data-frame和训练参数的列项的模型:

val numFeatures = 64000 
val numEpochs = 30 
val regParam = 0.02 
 
val tokenizer = new Tokenizer().setInputCol("Text").setOutputCol("Words") 
val hashingTF = new  org.apache.spark.ml.feature.HashingTF().setNumFeatures(numFeatures). 
          setInputCol(tokenizer.getOutputCol).setOutputCol("Features") 
val lr = new LogisticRegression().setMaxIter(numEpochs).setRegParam(regParam). 
                                    setFeaturesCol("Features").setLabelCol("Label"). 
                                    setRawPredictionCol("Score").setPredictionCol("Prediction") 
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) 
 
val model = pipeline.fit(training 

 

8. 测试模型:

对于"Java"的分类结果会产生0.0或1.0数值:

val testTitle = "Easiest way to merge a release into one JAR file"
val testBoby = """Is there a tool or script which easily merges a bunch of 
                   href="http://en.wikipedia.org/wiki/JAR_%28file_format%29"
                   JAR files into one JAR file? A bonus would be to easily set the main-file manifest 
                   and make it executable. I would like to run it with something like:
                  As far as I can tell, it has no dependencies which indicates that it shouldn't be an easy 
                  single-file tool, but the downloaded ZIP file contains a lot of libraries."""
val testText = testTitle + testBody
val testDF = sqlContext.createDataFrame(Seq( (99.0, testText))).toDF("Label", "Text")
val result = model.transform(testDF)
val prediction = result.collect()(0)(6).asInstanceOf[Double]
print("Prediction: "+ prediction) 

让我们基于训练数据集聘雇一下模型质量:

val testingResult = model.transform(testing) 
val testingResultScores = testingResult.select("Prediction", "Label").rdd. 
                                    map(r => (r(0).asInstanceOf[Double], r(1).asInstanceOf[Double])) 
val bc = new BinaryClassificationMetrics(testingResultScores) 
val roc = bc.areaUnderROC
print("Area under the ROC:" + roc) 

如果我们使用的是小的数据集,那么这个模型质量也许不是最好,50%左右代表很差,而如果使用整个Posts.xml数据集,结果就不会这么差,在ROC以下区域是0.64。

以上Scala代码下载:spark_ml.scala

 

Spark - 大数据Big Data处理框架

机器学习流行算法一览

分解和组合的机器学习

Java机器学习软件介绍

Cassandra+Akka+Spark分布式机器学习架构

更多机器学习专题