Spark的新的ALPHA结构化流计算API已经引起了广泛的兴趣。因为它把Dataset、DataFrame和SQL的API都引入了流计算上下文。然而在这个初始版本的结构化流计算里面,机器学习的API并没有被集成进来。但这并没有阻止我们愉快地去探索集成这两个部分(请注意,这里介绍的工作都是探索性的,未来的版本里有可能会变化)。
为了集成结构化流计算和机器学习,我们启动了一个初步的概念验证项目。项目结果可以在spark-structured-streaming-ml库里找到。如果你对支持结构化流计算的Spark ML管道感兴趣,我希望你关注SPARK-16424改进项目,并对我们的早期设计版本提出你的意见和想法。
能够依赖于结构化流计算实现的最简单的流计算机器学习算法之一就是朴素贝叶斯算法。因为这个算法的很多计算都可以被简化为分组与聚合。实现的主要挑战来源于采用什么样的方法来收集聚合好的数据以便做预测。现有的流计算朴素贝叶斯算法所采用的方法并不能直接用。因为在Spark结构化流计算的ForeachSink方法是在Worker上执行的,所以无法用最新的计数值来更新本地的数据结构。
为此,需要使用(如下所示的)Spark结构化流计算提供的一个内存表的输出格式来存储聚合值。
// 使用Dataset的变换操作来计算计数值
val counts = ds.flatMap{
case LabeledPoint(label, vec) =>
vec.toArray.zip(Stream from 1).map(value => LabeledToken(label, value))
}.groupBy($”label”, $”value”).agg(count($”value”).alias(“count”))
.as[LabeledTokenCounts]
// 创建一个表名来存储输出
val tblName = “qbsnb” + java.util.UUID.randomUUID.toString.filter(_ != ‘-‘).toString
// 把聚合结果以完整的形式写入内存表
val query = counts.writeStream.outputMode(OutputMode.Complete())
.format(“memory”).queryName(tblName).start()
val tbl = ds.sparkSession.table(tblName).as[LabeledTokenCounts]
而这个实现朴素贝叶斯算法的方法并不容易被推广到其他的算法,因为不是所有的算法都是通过聚合Dataset里的数据来实现的。不过通过回顾早期基于DStream的Spark流计算API,我们能够获得一个可能的解决方法的一些思路。如果说你可以采用某种update的机制来把新数据并入到已有的模型里,DStream的foreachRDD方法就可以让你接触到底层的微批次的数据。不幸的是,在结构化流计算里面,并没有foreachRDD的对等方法。不过使用一个定制化的导出(sink),你是可以在结构化流计算里面得到类似的行为。
如下所示的导出API是由StreamSinkProvider和Sink类定义的。StreamSinkProvider主要基于指定的SQLContext和导出设置来创建一个Sink的实例;而继承自Sink类则是提供了方法来批次处理实际的数据。
abstract class ForeachDatasetSinkProvider extends StreamSinkProvider {
def func(df: DataFrame): Unit
def createSink(
sqlContext: SQLContext,
parameters: Map[String, String],
partitionColumns: Seq[String],
outputMode: OutputMode): ForeachDatasetSink = {
new ForeachDatasetSink(func)
}
}
case class ForeachDatasetSink(func: DataFrame => Unit)
extends Sink {
override def addBatch(batchId: Long, data: DataFrame): Unit = {
func(data)
}
}
与把DataFrame写出到定制化的格式类似,为了使用一个第三方的导出,你可以指定导出的全类名。因为你需要指定格式的全类名,你必须保证任意SinkProvider的实例都能更新这个模型,因为你无法直接操作这个构造的导出对象。而且你必须要让模型处于导出类的定义的外部。
object SimpleStreamingNaiveBayes {
val model = new StreamingNaiveBayes()
}
class StreamingNaiveBayesSinkProvider extends ForeachDatasetSinkProvider {
override def func(df: DataFrame) {
val spark = df.sparkSession
SimpleStreamingNaiveBayes.model.update(df)
}
}
在等待Spark ML来更新结构化流计算的API期间,你就可以用上面所示的定制化导出来集成机器学习算法到结构化流计算里。
// 使用SimpleStreamNaiveBayes对象内的模型来训练
// 如果同时多个流都使用这个对象,则大家都会去更新这个模型
// 或者使用写死的查询名字来防止多个流的同时更新。
def train(ds: Dataset[_]) = {
ds.writeStream.format(
“com.highperformancespark.examples.structuredstreaming.” +
“StreamingNaiveBayesSinkProvider”)
.queryName(“trainingnaiveBayes”)
.start()
}
如果你对此还有疑虑,你可以查看这里的Spark内部版本是如何构建一个sink来让它的行为更像原始的foreachRDD的操作的。如果你对定制化的导出支持有兴趣,你可以关注SPARK-16407改进项目或者这个PR(Pull Request,拉取请求)。
很酷的是,无论你是否想使用Spark内部版本的API,你现在都可以用Spark早期流计算机器学习的实现方法来处理批次更新了。
尽管这个增强项目肯定还没有完备到为生产系统所使用,你还是可以发现结构化流计算API已经提供了多种不同的方法来扩展并支持机器学习算法了。
你可以从《高性能Spark:扩展和优化阿帕奇Spark的最佳实践》里了解到更多内容。