尽管Spark ML管道提供了各种各样的算法,你仍可能想要额外的功能,并且不脱离管道模型。在Spark Mllib中,这算不上什么问题,你可以通过RDD的变换来实现你自己的算法,并继续下去。对于Spark ML 管道来说,同样的方法是可行的,但是我们会失去一些管道所具备的优良特性,包括自动执行元算法的能力,例如交叉验证的参数搜索。在本文中,你会从标准的wordcount例子入手(在大数据方面,你是不可能真正躲开wordcount例子的),了解到如何扩展Spark ML 管道模型。
为了将你自己的算法加入Spark管道中来,你需要实现Estimator或者是Transformer,它们都实现了PipelineStage接口。对于那些不需要训练的算法,你可以实现Transformer接口,而对于那些需要训练的算法,你需要实现Estimator接口,它们都定义在org.apache.spark.ml下(都实现了基类 PipelineStage)。要说明的是,训练并不是只限于复杂的机器学习模型,即使是最大最小值区间缩放器也需要训练来确定范围。如果你的算法需要训练,它们必须以Estimator来构建而不是Transformer。
注:直接使用PipelineStage是不可行的,因为管道内部使用了反射机制,假定了所有的管道stage要么是一个Estimator,要么就是Transformer。
除了显而易见的transform和fit方法,所有的管道的stage需要提供transformSchema,以及一个copy构造器或者实现一个可以为你提供这些功能的类。copy是用来制作一个当前stage的拷贝,合并入任何新指定的参数,可以简称为defaultCopy(除非你的类对构造器有特别的考虑)。
class HardCodedWordCountStage(override val uid: String) extends Transformer {
def this() = this(Identifiable.randomUID(“hardcodedwordcount”))
def copy(extra: ParamMap): HardCodedWordCountStage = {
defaultCopy(extra)
}
一个管道stage的起始以及拷贝代理如下:transformSchema 必须基于任何参数和一个输入模式产生你的管道stage的期望输出。考虑到已有字段可能会被使用到,大部分管道stage只增加新的字段,很少的一些会去掉之前的一些字段。这有时候会导致输出的结果包含比下游所需的数据多,反而会降低性能。如果发现你的管道中有这样的问题,那么你可以创建你自己的stage来去掉不需要的字段。
除了产生输出模式之外,transformSchema 方法还应该验证输入模式是否适合于该stage(例如,输入列是否是期望的类型)。
这里也是你应该对stage的参数进行验证的地方。一个简单的输入为字符串输出为向量的并且写死编码的输出和输入列的transformSchema如下所示:
override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex(“happy_pandas”)
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)
}
// Add the return field
schema.add(StructField(“happy_panda_counts”, IntegerType, false))
}
不需要训练的算法可以通过Transformer接口非常容易地实现。由于这是最简单的管道stage,你可以从实现一个简单的transformer开始,计算在输入列中单词的数量。
def transform(df: Dataset[_]): DataFrame = {
val wordcount = udf { in: String => in.split(” “).size }
df.select(col(“*”),
wordcount(df.col(“happy_pandas”)).as(“happy_panda_counts”))
}
为了获得大部分的管道接口,你可能会想要使你的管道stage可以通过参数接口来达到可配置化。
尽管参数接口是公开的,不幸的是,常用的Spark中的默认参数都是私有的,所以你最后不得不写大段重复的代码。除了允许用户指定的值,参数也可以包含一些基本的验证逻辑(例如,正则化的参数必须是一个非负值)。两个最常用的参数是输入列和输出列,可以十分简单地加到你的模型上去。
除了字符串参数,其他的类型也可以使用。包括字符串列表来接收停止词,或浮点数来接收停止词。
class ConfigurableWordCount(override val uid: String) extends Transformer {
final val inputCol= new Param[String](this, “inputCol”, “The input column”)
final val outputCol = new Param[String](this, “outputCol”, “The output column”)
; def setInputCol(value: String): this.type = set(inputCol, value)
def setOutputCol(value: String): this.type = set(outputCol, value)
def this() = this(Identifiable.randomUID(“configurablewordcount”))
def copy(extra: ParamMap): HardCodedWordCountStage = {
defaultCopy(extra)
}
override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}
def transform(df: Dataset[_]): DataFrame = {
val wordcount = udf { in: String => in.split(” “).size }
df.select(col(“*”), wordcount(df.col($(inputCol))).as($(outputCol)))
}
}
不需要训练的算法可以通过Estimator接口来实现,尽管对于许多算法而言, org.apache.spark.ml.Predictor 或者 org.apache.spark.ml.classificationClassifier 这些帮助类更容易实现。Estimator 和 Transformer接口的主要区别是,它不再直接在输入上进行变换操作,而是会首先在一个train 方法里面进行一个步骤——训练。一个字符串索引器是你可以实现的最简单的estimator之一。尽管在Spark中可以直接使用了,它仍然是用于说明如何使用estimator接口的非常好的例子。
trait SimpleIndexerParams extends Params {
final val inputCol= new Param[String](this, “inputCol”, “The input column”)
final val outputCol = new Param[String](this, “outputCol”, “The output column”)
}
class SimpleIndexer(override val uid: String) extends Estimator[SimpleIndexerModel] with SimpleIndexerParams {
def setInputCol(value: String) = set(inputCol, value)
def setOutputCol(value: String) = set(outputCol, value)
def this() = this(Identifiable.randomUID(“simpleindexer”))
override def copy(extra: ParamMap): SimpleIndexer = {
defaultCopy(extra)
}
override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}
override def fit(dataset: Dataset[_]): SimpleIndexerModel = {
import dataset.sparkSession.implicits._
val words = dataset.select(dataset($(inputCol)).as[String]).distinct
.collect()
new SimpleIndexerModel(uid, words)
; }
}
class SimpleIndexerModel(
override val uid: String, words: Array[String]) extends Model[SimpleIndexerModel] with SimpleIndexerParams {
override def copy(extra: ParamMap): SimpleIndexerModel = {
defaultCopy(extra)
}
private val labelToIndex: Map[String, Double] = words.zipWithIndex.
map{case (x, y) => (x, y.toDouble)}.toMap
override def transformSchema(schema: StructType): StructType = {
// Check that the input type is a string
val idx = schema.fieldIndex($(inputCol))
val field = schema.fields(idx)
if (field.dataType != StringType) {
throw new Exception(s”Input type ${field.dataType} did not match input type StringType”)
}
// Add the return field
schema.add(StructField($(outputCol), IntegerType, false))
}
override def transform(dataset: Dataset[_]): DataFrame = {
val indexer = udf { label: String => labelToIndex(label) }
dataset.select(col(“*”),
indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol)))
}
}
如果你正在实现一个迭代算法,你可能希望将还没有缓存的输入数据缓存起来,或者允许用户来指定一个持久化等级。
Predictor 接口增加了两个最常用的参数(输入和输出列)作为标记列、特征列和预测列——并且自动地帮我们处理模式的变换。
Classifier 接口基本上如出一辙,除了它还增加了一个rawPredictionColumn ,并且提供了工具来检测类别的数量(getNumClasses方法)以及将输入的 DataFrame 转化为一个LabeledPoints的RDD(使其更容易来封装传统的Mllib分类算法)。
如果你正在实现一个回归或者聚类接口,目前没有公开的基本接口可以使用,所以你需要使用通用的Estimator接口。
// Simple Bernouli Naive Bayes classifier – no sanity checks for brevity
// Example only – not for production use.
class SimpleNaiveBayes(val uid: String)
extends Classifier[Vector, SimpleNaiveBayes, SimpleNaiveBayesModel] {
def this() = this(Identifiable.randomUID(“simple-naive-bayes”))
override def train(ds: Dataset[_]): SimpleNaiveBayesModel = {
import ds.sparkSession.implicits._
ds.cache()
// Note: you can use getNumClasses and extractLabeledPoints to get an RDD instead
// Using the RDD approach is common when integrating with legacy machine learning code
// or iterative algorithms which can create large query plans.
// Here we use Datasets since neither of those apply.
// Compute the number of documents
val numDocs = ds.count
// Get the number of classes.
// Note this estimator assumes they start at 0 and go to numClasses
val numClasses = getNumClasses(ds)
// Get the number of features by peaking at the first row
val numFeatures: Integer = ds.select(col($(featuresCol))).head
.get(0).asInstanceOf[Vector].size
// Determine the number of records for each class
val groupedByLabel = ds.select(col($(labelCol)).as[Double]).groupByKey(x => x)
val classCounts = groupedByLabel.agg(count(“*”).as[Long])
.sort(col(“value”)).collect().toMap
// Select the labels and features so we can more easily map over them.
// Note: we do this as a DataFrame using the untyped API because the Vector
// UDT is no longer public.
val df = ds.select(col($(labelCol)).cast(DoubleType), col($(featuresCol)))
// Figure out the non-zero frequency of each feature for each label and
// output label index pairs using a case clas to make it easier to work with.
val labelCounts: Dataset[LabeledToken] = df.flatMap {
case Row(label: Double, features: Vector) =>
features.toArray.zip(Stream from 1)
.filter{vIdx => vIdx._2 == 1.0}
.map{case (v, idx) => LabeledToken(label, idx)}
}
// Use the typed Dataset aggregation API to count the number of non-zero
// features for each label-feature index.
val aggregatedCounts: Array[((Double, Integer), Long)] = labelCounts
.groupByKey(x => (x.label, x.index))
.agg(count(“*”).as[Long]).collect()
val theta = Array.fill(numClasses)(new Array[Double](numFeatures))
// Compute the denominator for the general prioirs
val piLogDenom = math.log(numDocs + numClasses)
// Compute the priors for each class
val pi = classCounts.map{case(_, cc) =>
math.log(cc.toDouble) – piLogDenom }.toArray
// For each label/feature update the probabilities
aggregatedCounts.foreach{case ((label, featureIndex), count) =>
// log of number of documents for this label + 2.0 (smoothing)
val thetaLogDenom = math.log(
classCounts.get(label).map(_.toDouble).getOrElse(0.0) + 2.0)
theta(label.toInt)(featureIndex) = math.log(count + 1.0) – thetaLogDenom
}
// Unpersist now that we are done computing everything
ds.unpersist()
// Construct a model
new SimpleNaiveBayesModel(uid, numClasses, numFeatures, Vectors.dense(pi),
new DenseMatrix(numClasses, theta(0).length, theta.flatten, true))
}
override def copy(extra: ParamMap) = {
defaultCopy(extra)
}
}
// Simplified Naive Bayes Model
case class SimpleNaiveBayesModel(
override val uid: String,
override val numClasses: Int,
override val numFeatures: Int,
val pi: Vector,
val theta: DenseMatrix) extends
ClassificationModel[Vector, SimpleNaiveBayesModel] {
override def copy(extra: ParamMap) = {
defaultCopy(extra)
}
// We have to do some tricks here because we are using Spark’s
// Vector/DenseMatrix calculations – but for your own model don’t feel
// limited to Spark’s native ones.
val negThetaArray = theta.values.map(v => math.log(1.0 – math.exp(v)))
val negTheta = new DenseMatrix(numClasses, numFeatures, negThetaArray, true)
val thetaMinusNegThetaArray = theta.values.zip(negThetaArray)
.map{case (v, nv) => v – nv}
val thetaMinusNegTheta = new DenseMatrix(
numClasses, numFeatures, thetaMinusNegThetaArray, true)
val onesVec = Vectors.dense(Array.fill(theta.numCols)(1.0))
val negThetaSum: Array[Double] = negTheta.multiply(onesVec).toArray
// Here is the prediciton functionality you need to implement – for ClassificationModels
// transform automatically wraps this – but if you might benefit from broadcasting your model or
// other optimizations you can also override transform.
def predictRaw(features: Vector): Vector = {
// Toy implementation – use BLAS or similar instead
// the summing of the three vectors but the functionality isn’t exposed.
Vectors.dense(thetaMinusNegTheta.multiply(features).toArray.zip(pi.toArray)
.map{case (x, y) => x + y}.zip(negThetaSum).map{case (x, y) => x + y}
)
}
}
注:如果你只是需要修改一个已有的算法,你可以(通过假装在org.apache.spark项目中来)扩展它。
现在你知道如何用你自己的管道stage来扩展Spark的ML管道API。如果你找不到头绪,一个好的参考是Spark本身内部的算法。尽管有时候使用了内部的API,但是大部分情况下它们实现公开接口的方式与你想要做的是同样的。
更多内容可以参考Strata北京2017的相关议题。