优步的任务是提供“对每个人来说,在任何地方都可以获得像自来水一样可靠的出行服务”。为了履行这一承诺,优步依赖于在每个层面做出数据驱动的决策。大部分的决策都得益于更快的数据处理。例如,使用数据来理解一个地区以便于增加业务,或城市运营团队对新数据的访问来运营每个城市。不用说,数据处理系统的选择和必要的服务水平协议是数据团队与优步用户之间日常交互的主题。
在本文中,我想基于在优步建立数据基础设施的经验和经历,讨论准实时案例中数据处理系统的选择。在本文中,我认为通过增加新的增量处理原语到现有的Hadoop技术中,将能以更少的开销和统一的方式解决很多问题。在优步,我们正在构建相应的系统来解决这里总结的问题,并且以开放的态度,希望与对这一领域有兴趣的、志同道合的组织进行合作。
准实时案例
首先,让我们定义这类案例。在一些场景里,长达一小时的延迟是可容忍的,他们在大部分情况下都可以通过在MapReduce/Spark上用传统的批处理来执行,同时数据一般会向Hadoop/S3上增量添加。与之相反的另一个极端的案例是:需要小于一到两秒的延迟,通常涉及到将你的数据输送到一个可扩展的键值存储(已经在这个上工作)并且进行查询。诸如Storm、Spark流处理以及Flink之类的流式处理系统已经相当好地构建了实际可以支持约一到五分钟延迟的应用。流式系统对于像欺诈检测、异常检测或者系统监控这些需要机器快速响应做出的决策或者以盯着计算机屏幕为日常工作的人来说是非常有用的。
这两个极端之间给我们留下了一个大鸿沟,即从五分钟到一小时的端到端的处理延迟。在本文里我将这种情况称为准实时。大部分准实时案例商业仪表板,或是一些辅助人工决策的应用。下面是一些准实时可能的应用场景:
1.观察过去X分钟内仪表板上是否有任何异常;
2.测量过去X分钟内在网站上执行的试验进行的如何;
3.商业指标在X分钟间隔内的更新;
4.对一个机器学习管道在过去X分钟内进行特征抽取;
图一:处理延迟的不同着色图以及相关的典型技术。由Vinoth Chandar提供
通过“迷你”批次进行增量处理
解决准实时案例的选择是相当开放的。流式处理能够提供低延迟,并有较为基本的SQL支持能力,但是需要预先定义查询来达到较好的效果。专有的数据仓库有许多特性(例如,事务、索引),并且能支持随机和预定义的查询,但是这种专有数据仓库在规模上有限制而且价格昂贵。批处理可以解决大规模数据的场景,并通过Spark SQL/Hive提供成熟的SQL支持。但是这种处理的方式通常会有比较高的延迟。由于各有利弊,最后用户通常基于可用的硬件和他们组织内部的运维支持的方式来做出选择。我们将在本文的结论处在回头来看这些挑战。
下面我会介绍通过使用Spark/MapReduce而不是运行流式处理任务,以每X分钟执行迷你批任务的方式来解决准实时场景的一些技术优点。类似于Spark流处理中的微批次(以秒粒度执行操作),迷你批次以分钟粒度来运行。在本文中,我将通篇使用“增量处理”这一术语来指代这种处理方式。
增加效率
增量的处理迷你批次中的新数据能更加有效地使用组织中的资源。让我们来举个具体的例子,我们有一个Kafka事件流以每秒一万条的速度涌入,我们想要计算过去15分钟在一些维度上的消息的数量。大部分流式处理管道使用一个外部结果存储系统(例如Cassandra, ElasticSearch)来保存聚合的计数,并让在YARN/Mesos等资源管理里的容器持续运行。这在小于五分钟的延迟窗口的场景下是说得通的。实际上,典型的YARN容器的启动开销大约是一分钟。此外,为了提升写操作到结果存储系统上的性能,我们通常进行缓存并进行批量更新,这种协议都需要容器持续地运行。
图二: 流式处理引擎和增量迷你批次任务处理的对比。由Vinoth Chandar提供
然而在准实时处理的场景里,这些选择可能不是最佳的。为了达到同样的效果,你可以使用短生命周期的容器并且优化整体的资源利用。在图二中,流式处理器在15分钟内执行了六百万次更新到结果存储系统上。但是在增量更新模型里,我们执行一次内存中的合并同时仅进行一次更新到结果存储系统中,这时只会使用资源容器五分钟。相比实时模式,增量处理模型有三倍的CPU效率提升,在更新到结果存储的方面有几个数量级的效率提升。基本上,这种处理方式按需获取资源,唤醒的间隔足以完成等待的任务,而不用长时间运行,一边等待任务,一边吞食CPU和内存。
建立在已有的SQL引擎之上
随着时间的推移,大量SQL引擎在Hadoop/big data领域演进并发展(例如,Hive, Presto, SparkSQL)。它们提供了更好的针对大数据的复杂问题的表达能力。这些系统已经被大规模地部署,并在查询计划、执行等方面得到逐步增强。另一方面,流式处理的SQL仍然处于早期阶段。通过使用在Hadoop生态圈内已有的、更加成熟的SQL引擎来执行增量处理,我们可以利用他们自身发展过程中形成的坚实基础。
例如,连接操作在流式处理中是非常棘手的,因为要在窗口间对齐流。在增量处理模型中,这一问题变得更简单,因为有着相对更长的窗口,这使得有更多的空间让流在处理窗口中对齐。另一方面,如果正确性更为重要,SQL提供了一个更加简单的方式来选择性地扩展连接的窗口并且重新处理。
这类SQL引擎的另一个重要进步是对诸如ORC/Parquet等列式文件格式的支持,这对于分析工作是有着显著好处的。例如,连接两个有Avro记录的Kafka主题将比连接两个通过ORC/Parquet文件格式存储的Hive/Spark的表的开销大得多。这是因为,对于Avro记录来说,你最终要反序列化整个记录,而列式文件中只需要读取在记录中会被查询所用到的列。如果我们简单地从一条编码的Kafka Avro事件中的1000个字段中投影出10个字段,我们仍然需要为所有字段花费CPU和I/O的开销。列式文件格式通常可以更为“聪明”地投影到存储层。
图三:Kafka事件和HDFS上列式文件,将10个字段从1000个字段中投影出来的CPU和I/O开销的对比。由Vinoth Chandar提供
较少的运动部件
现在被广泛实现的Lambda架构(一个基于MapReduce 和 Storm 构建的流式处理的应用架构)有两个模块:速度层和批处理层。它们通常由两个独立的实现(从代码到基础设施)来管理。例如,Storm是速度层上的一个热门选项,而MapReduce可以作为批处理层来提供服务。实际上,人们经常依赖速度层来提供更新的结果(可能并不准确),而一旦数据被认为是完整了之后,通过批处理层在稍后的时候里来纠正速度层的结果。随着增量处理的使用,我们有机会以统一的方式在代码层面和基础设施层面来实现Lambda架构。
图四:结果表的计算,背后是一个经由增量处理得到的快速视图和一个经由批处理得到的更完整的视图。由Vinoth Chandar提供
上图中描述的思想相当简洁。正如我们所说的,你可以使用SQL或者类似Spark这样的批处理框架来一致地实现你的处理逻辑。结果表增量地被建立,像流式处理那样在“新数据”上执行SQL来产生一个结果的快速视图。同样的SQL可以周期性的被执行在全数据上,来纠正任何不准确的结果(记住,连接操作总是棘手的!),并产生一个更加“完整”的结果的视图。在这两种情况下,我们都将使用同样的Hadoop基础设施来执行计算,这可以降低总体运营成本和复杂度。
增量处理的挑战
在罗列了增量处理架构的优点之后,让我们来讨论一下在现在的Hadoop生态系统中实现这一架构时会面临的挑战。
完整性和延迟之间的权衡
在计算时,随着我们在流式处理、增量处理和批处理之间变换,我们面临着相同的根本权衡。一些应用需要所有的数据,并产生更为完整和准确的结果,而一些则只需要低延迟的数据来产生相对可接受的结果即可。让我们来看几个例子。
图五:展示了不同的Hadoop应用对延迟和数据完整性的容忍度。由Vinoth Chandar提供
图五描绘了一些应用案例,根据它们对延迟和(不)完整性的容忍度来定位。商务仪表盘可以展示不同的粒度的各项指标。它们通常较为灵活,可展示最近时间内不完整但是有较低延迟的数据,并随着时间变得完整(这也使得它们成为Lambda架构的代表)。对于数据科学或机器学习的案例而言,从输入的数据中抽取特征的过程通常延迟较低,而模型用更完整的数据进行自我训练的延迟较高。其他的例子中,欺诈检测要求低延迟地处理可获取的最新数据。而实验性平台需要相当的数据量,并以一个相对较低的延迟来保证实验结果比较新。
最常见的导致不完整的原因是迟到的数据(正如在这篇谷歌云数据流的演示文稿中详细解释的)。在真实的环境中,迟到的数据可以是基础设施层存在问题,例如数据中心的连接断开了15分钟;或是用户层面的问题,例如移动应用由于在飞行中不良的连接质量而导致事件的延迟发送。在优步,我们面临着十分相似的挑战,正如我们今年早些时候在Strata + Hadoop World大会上所阐述的。
为了有效地支持如此多样的应用集合,编程模型需要以一等公民的方式来对待迟到的数据。然而,Hadoop的处理通常是基于在完整数据(例如Hive中的分区)上的批处理,有保证完整性的职责,也要完全依赖数据产生者。在如今复杂的数据生态系统里,这对于单个数据产生者来说职责简直太多了。大部分产生者最终通过在一个诸如Kafka这样的存储系统上使用流式处理来达到较低的延迟,而依赖Hadoop存储来达到更加“完整”的(重)处理。我们将在下一节对此展开来讲。
缺乏用于增量处理的原语
正如在这篇关于流式处理的文章中详细描述的,事件时间以及其相对的到达时间的定义和迟到数据的处理是低延迟计算中很重要的方面。迟到的数据要求重新计算时间窗口(通常就是Hadoop中的Hive分区),尽管这些时间窗口的结果可能已经被计算完成甚至是已经与终端用户进行过了交互。通常来说,在流式处理世界中这类重新计算是通过使用可扩展的键值存储,在记录/事件层面增量发生的,并针对点查询和更新进行优化。然而,在Hadoop中,重新计算通常意味着重写整个(不可变)的Hive分区(或者简而言之是一个HDFS中的文件夹),并且重新计算所有在那个Hive分区上已经被消费过的任务。
从延迟和资源利用角度来看,这些操作都是开销昂贵的。这一开销通常会级联地传导到整个Hadoop的数据流中,最终导致延迟增加了数小时。因此,增量处理需要使得这两种操作更加得快速,从而使我们可以有效地将改变包含到已有的Hive分区中,并且为下游的表数据消费者提供一个仅获取新改变的数据的方式。
有效地支持增量处理可以分解为以下几个原语操作:
更新插入:从概念上讲,重写整个分区可以被视作一个非常低效的更新插入操作,最终会写入比进来的数据多得多的数据。因此,对(批量)更新插入的首要支持成为非常的重要工具。事实上,像Kudu和Hive事务等最近的趋势的确是朝着这一方向发展的。谷歌的Mesa(谷歌的数据仓库系统)论文也谈论了几项技术,可以被应用到快速数据注入的场景里。
增量消费:尽管更新插入可以解决快速地向一个分区发布新数据的问题,下游的数据消费者并不知道从过去的哪一个时刻开始哪些数据被改变了。通常,消费者通过扫描整个分区/数据表并重新计算所有数据来得知改变的数据,这需要花费相当多的时间和资源。因此,我们也需要一种机制来更加高效地获取从上次分区被消费的时间点开始改变过的数据记录。有了上面两种原语操作,你可以通过更新插入一个数据集,然后从中增量消费,并建立(也是增量的)另外一个数据集来支持很多常见的案例。数据投影就是最好理解的案例(如图六所示):
图六:一个简单的例子,通过更新插入新的改变到表1(table_1),并通过增量消费建立一个简单的投影表(projected_table)。由Vinoth Chandar提供
借用Spark流式处理的说法(如,流-数据集连接,流-流连接),我们可以更高效地以较低的延迟来操作简单的投影和流-数据集连接。甚至是流-流连接也可以增量计算,只不过需要增加一些额外的逻辑来做窗口对齐。
图七:一个更为复杂的例子,将一个事实表连接到多个维度表,从而建立一个连接过的表。由Vinoth Chandar提供
这个案例是我们可以节省硬件花费的同时显著地降低延迟的不多见的场景之一。
思维模式的转变
最后的挑战严格来说并不是技术上的。在选择技术以应对不同的场景时,组织生态扮演着核心角色。在很多组织中,团队挑选那些在行业流行的模板化解决方案,并逐步习惯以特定的方式来使用这些系统。例如,典型的数据仓库的延迟需求是以小时计的。因此,即使底层技术可以在更低的延迟下解决不少问题,但是还是需要花费大量的功夫去实现数据仓库系统的最小化停机时间或者避免在维护过程中服务中断。如果你是在建立满足更低延迟的服务水平协议的系统,这些运维特点是很重要的。另一方面,能解决低延迟问题的团队也非常擅长运维那些有严格服务水平协议要求的系统,这就导致组织机构最后总是会为批处理和流式处理分别创建数据贮藏库。这就阻碍了在诸如Hadoop的系统上实现增量处理,从而无法获得上述的好处。
这绝不是要尝试来泛化组织生态的挑战。作为一个经历了推动领英的在线服务,以及推动了优步数据生态系统的人,这些仅仅是我自己的观察。
可带走的经验
我想要留给你以下可带走的经验教训:
1.对实际延迟需求有清晰的定义可以帮你节省很多钱。
2.Hadoop可以通过应用支持增量处理的原语来解决很多问题。
3.统一的架构(代码和基础设施)是未来的方向。
在优步,我们有非常直接和可测量的商业目标/动机去解决这些问题。我们正在着手构建一个可以解决这些需求的系统。如果你对项目合作感兴趣,请务必联系我们。