日志和实时流计算处理
来自《我喜爱日志》的第三章。
编者注:本内容来自Jay Kreps所著的《我喜爱日志:事件数据、流计算处理和数据集成》一书的第三章。

到目前为止,我还仅仅只是描述了一些把数据从一个地方拷贝到其他地方的多种的方法。然而,在存储系统间挪动字节并不是故事的结尾。实际上我们发现,“日志”是“流”的另外一种说法,而日志(的处理)是流计算处理的核心。

但是先等一下,到底什么是流计算处理?

如果你是上世纪九十年代末和二十一世纪初的数据库或者数据基础设施产品的粉丝,你可能会把流计算处理和那些通过SQL引擎或者用“流程图”界面来进行数据驱动的处理过程联系起来。

而如果你是追随着爆炸性增长的开源数据系统的人,你可能就会把流计算处理和诸如StormAkkaS4Samza这样的系统联系起来。很多人会把这些系统看成是一个异步消息处理系统,和那些基于集群的RPC层上的应用没什么区别(事实上有些系统确实是这样)。我还曾经听有人过把流计算描述成一种模式,即立刻处理数据,随后就丢弃。

上述两种观点都有失偏颇。流计算处理与SQL毫无关系;同时也不局限于实时处理系统。没有任何的理由来限制你去用多种语言来处理昨天或者一个月以前的数据流;也没有说你必须(或者应该)把获得的原始数据丢弃掉。

我对流计算处理的看法则更加宽泛,即能做持续数据处理的基础设施。我认为流计算处理的计算模型可以是如同MapReduce那样的分布式处理框架一样的通用,只要它能提供低延迟的结果就可以。

而真正来驱动(或决定)处理模型的则是数据收集的方法。通过批次收集的数据则自然由按批次处理。对于持续流入的数据,就用持续的实时处理方式。

美国国家统计局的人口普查数据是一个按批次收集数据的好例子。统计局会定期启动人口普查,派专人上门去挨家挨户的收集美国公民的人口数据。这种方式对于在第一次普查开始的1790年(参见图1-1)来说是有道理的。那时候的数据收集在本质上就是批次的,因为要骑马去走访,再在纸上填写好统计记录,再把这些记录一批批地送到中心点去由人工来累加计算。而今时今日,在你给别人说这个人口普查的过程的时候,人们会立刻质疑为什么我们不记录一个出生和死亡的记录,然后可以随时随地的用任何的粒度来计算人口总数。

ihtl_0301-cbb19227e02573ad04557fa0f2e6c515

图1-1. 第一次美国人口普查是批次收集数据的,因为受限于当时的技术条件。然而,在一个数字化、网络化的世界里,批次数据收集早已不是必须的了

当然,这(人口普查)是一个极端的例子。但是现在很多的数据传输过程依然依赖于定期的收集和批次化传输与集成。显然批次收集数据的最自然的处理就是批次处理。随着这些过程逐步被实时输入收集所替代,我们也要相应的开始进行实时的数据处理,从而能平滑所需的处理资源,并降低延迟。

一个现在的互联网公司并不需要有任何批次数据收集。网站产生的数据或是用户行为数据或是数据库的改变,而两者都是持续发生的。事实上,如果你仔细想想,几乎任何的业务的本质的机制都几乎是一个持续的过程。如杰克 鲍尔所说,事件总是随时发生。出现是批次收集数据的情况,一般都是因为一些手工的过程或者是缺乏数字化,或者是因为历史原因造成的没法自动化或者数字化的材料。这时传递和对数据做出反馈一般都非常慢,如果整个过程需要运送纸张并由人来做处理。刚刚实现自动化后,一般还是会保留原来的处理流程,因此即便是媒介发生了改变,而(这种)流程还是会持续很长时间。

每天运行的批次处理数据的产品经常是有效地模拟了用一天为时间窗口的持续计算。而底层的数据当然是总在改变。

上面的讨论有助于厘清对于流计算处理的常见误解。通常认为某些种类的数据处理不适合用流计算系统来实现,而必须用批处理系统。我听过的一个典型的例子就是计算百分位、最大值、均值和其他类似的需要用所有的数据来做的聚合统计。但是这往往带来了某种误解。确实,类似计算最大值这样的块操作需要用时间窗口内的所有数据。然后这样的计算绝对能够通过流计算系统来实现。事实上,如果你查看早期的流计算的学术文章,一般它们完成的第一件事就是给出简洁的语义来定义时间窗口,以便于针对于窗口的操作成为可能。

看到这里,很容易能统一我对于流计算处理的观点,即流计算是更宽泛。 它和是不是块和非块并没有关系,仅仅只是一个底层数据里包含了时间定义的处理机制,并不要求对于处理的数据需要有一个静态的快照。这意味着流计算处理系统以一个用户控制的频率来产出结果,而不是一直等到数据全部到达。从这个角度看,流计算是批次计算的一个更泛化的操作。考虑到现在实时数据的的普及,这应该是一个更重要的泛化。

为什么这种传统的对于流计算处理的观点成为一个先进的应用。我认为最大的原因是因为缺乏实时数据收集的方法,从而让持续处理成为某种理论上的想法。

我确实认为缺乏实时数据收集的方法是商用流计算处理系统的梦魇。它们的客户依然是在做面向文件的日复一日的ETL和数据集成。构建流计算处理系统的公司一般专注于提供计算引擎来处理实时数据,但却发现现实中很少有客户有实时数据流。事实上在我在领英的早期时光,有个公司试图卖给我们一套非常酷的流计算处理系统,但因为当时我们所有的数据都是按小时收集的文件,所以我们所能想到的就是把这些小时文件在每小时结束的时候喂给这个系统。这个公司的工程师发现这是一个非常常见的问题。唯一真实的例外就是金融界。在这个领域里流计算处理有一些成功的案例,而恰恰是因为这个领域里实时流数据才是主流,而如何处理这些实时数据流才是主要关注点。

即使是在健康的批处理生态系统里,实际上流计算处理作为基础架构类型的适用性也是很强的。它涵盖了实时处理/相应业务和离线批处理业务的基础架构上的鸿沟。对于现代的互联网企业,我认为大约25%的代码是关于这种鸿沟的。

现在发现日志(log)解决了流计算处理里的一些非常关键的技术问题。后面我会陆续介绍这些问题,但其中最大的问题它解决的就是它让数据成为了实时的多订阅者的数据导入机制。

对那些希望能更多了解日志和流计算处理间的关系人,我们提供了开源的Samza,一个专门为这些想法构建的实时流计算处理系统。在这个链接里面我们很详细地介绍了这些想法的应用。但这不是专门为了某个特定的流计算处理系统的,因为几乎所有的主要流计算处理系统都和Kafka有某种程度的集成,让Kafak来作为数据的日志来进行处理。

数据流图

关于流计算处理的最有趣的方面就是它和一个流计算处理系统的内部机制没有任何关系,想反的是,相关的是他扩展了我们前面数据集成讨论里的数据源的观点。我们主要讨论了主要数据源和主要数据的日志化。即事件和数据都是直接由各种应用运行中生成的。流计算处理让我们可以也包含从其他数据源里计算出的数据源。 这些计算出来的数据源对消费者而言与用来计算其的其他数据源没什么区别(请参看图1-2)。

ihtl_0302-045778850a38a1c8cc529aa697e1e89b

图1-2.来自多日志的多路流处理图

这些计算出来数据源可能会包含相当复杂和智力的成分在其处理过程里,因此也是极具价值的。例如,谷歌在这里描述了它是如何在一个流计算处理系统上重构它的网页爬取、处理和建索引的管道的过程。这可能是这个行星上最复杂、最大规模的数据处理系统之一了。

所以什么是流计算处理过程?对于我们的目的而言,一个流计算处理工作就是那些从日志中读取并输出到日志或其他系统的任务。那些作为输入和输出的日志把整个流程连接成了一个处理阶段的图。使用这种中心化日志的形式,你就能观察所有机构的数据的获取、转换和流动,其实就是一系列日志以及从他们中读出和写入他们的过程。

一个流计算处理过程并不必需要有一个时髦的框架。它可以是任何一个或多个读取和写入日志的过程。额外的基础架构和支持能够帮助管理和扩展这种近乎实时的处理过程程序,而这也就是流计算框架所做的。

日志和流计算处理

为什么你在所有的流计算处理里需要日志?为什么不是让处理单元通过简单的TCP或者其他轻量级的消息协议来更直接的通信?有多个理由来支持这一(日志)模式。

首先,这种模式可以让每个数据集都能为多订阅者所用。每个流处理过程的输入对任何需要的处理器都可用;同时每个输出也都对任何需要的都可用。这一点不仅对生产数据流很好用,而且也在复杂的数据处理管道里调试和监控阶段很有帮助。能快速的进入一个输出流并检查它的有效性,同时计算一些监控的统计数据,或者仅仅只是看看数据长什么样,这些都使得开发变的非常有可追踪性。

其次,这样使用日志能确保每个数据消费者处理过程中顺序可以被保留。某些事件数据可能被按时间戳松散地排序了,但是不是每种事件数据都这样。考虑从来自数据库的一个更新流,我们可能有一系列的流处理任务来处理这些数据并准备为搜索索引来做索引。如果对同一个记录同时做两次更新,那么我们最后可能在索引的最终结果出错。

这样使用日志的最后一个可能也是最重要(可探讨)的原因是它提供了缓存和对每个处理过程的隔离。如果一个处理器产生结果的速度比它后续的消费程序的处理能力快,我们可以有三种选择:

  • 我们可以先暂停上游的处理任务,直到下游的任务可以处理。如果只用TCP而没有使用日志,这种情况是最可能发生的。
  • 我们就把数据丢弃掉。
  • 我们可以在两个处理任务间缓存数据。

丢弃数据在某些场合可能没什么。但是基本都是不可接受的,也从来不被希望这样做。

暂停(上游)处理听起来似乎是一个可接受的选择。但实际中这会成为一个很大的问题。考虑到我们需要的不仅仅是对单一的应用流程建立模型,而是为整个机构建立全套的数据流模型。这就将不可避免的形成一个复杂的数据处理流网络,由不同的部门团队的不同的数据处理器来组成,并支持不同的SLA。在这样复杂的数据处理网络里,如果因为后续处理能力不足或者失败而导致上游的数据产生器被暂停,这都会级联地影响上游数据流程序,从而使得的很多处理器都被暂停。

这样看来唯一可用的选择:缓存。日志可以是非常非常大的缓存,可以让处理程序被重启,或者即使失效了也不会影响处理图里的其他部分。这也意味着某个数据消费程序可以停机很长时间,而不会影响上游的程序。只要在它重启后能及时处理完缓存的数据,大家都皆大欢喜。

在其他地方,这也不是一个不寻常的模式。巨大、复杂的MapReuce流就使用了文件作为检查点,并共享他们的中间处理结果。巨大、复杂的SQL处理管道也是创建了很多中间的临时表。这里仅仅只是运用了这种模式的抽象-日志-使得它适合于处理运动中的数据。

StormSamza是两种基于这个模式构建的流技术处理系统,也能使用Kafka或者其他类似的系统作为他们的日志部分。

处理数据:Lambda架构和一个可替换的方案

一个基于这种日志数据模型的有趣的应用就是Lambda架构,由内森·马兹提出。他写了一个广为传播的博客(《如何打败CAP定理》),其中介绍把流处理系统和批次处理相结合的方法。这个架构被证明是一个非常流行的想法。已经有专门的网站书籍了。

什么是Lambda架构?如何运用?

Lambda架构一般类似于图1-3

ihtl_0303-31b8b84dd34b45ead33c1eb85d32a6f2

图1-3. Lambda架构

它工作的方式是不可变的一系列数据记录被采集,并同时并行地送给批处理和流处理系统。数据转换的逻辑被实现两次,一次是在批处理系统里,一次是在流处理系统里。然后把二者处理的结果在查询的时候合并,给出一个完整的答案。

这个方式有很多中变形,这里我是有意地简化了许多。例如你可以使用多种类似的系统,如Kafka、Storm和Hadoop。而大家也经常会使用两种不同的数据库来存储输出结果,一种是专门为实时处理优化的数据库,而其他的则是为批处理所准备的。

Lambda架构的优点?

Lambda架构强调保留原始输入数据不变。我认为这是一个非常重要的特性。处理复杂数据流的能力得到了很大的加强,因为能够看到数据是什么样和输出是什么样。

我还喜欢这个架构强调了重复处理数据的问题。能重复处理数据对流计算系统来说是一个重大挑战,但却经常被忽略。

对于重复处理,我的意思是再次处理输入的数据从而再次计算出结果。这是一个太明显,但也经常被忽略的需求。代码总是在变。所以如果你的代码已经从输入流中计算出了结果,当代码改变后,你需要再次计算输出来检查代码修改的效果。

为什么代码会变?也许是因为你的应用在演进,你又想计算一些之前不需要的新的输出项。或者是你发现了一个代码缺陷并修好了。无论如何,当这件事发生的时候,你就需要重新计算你的输出结果。我发现很多的人试图去构建一个实时数据处理系统,但根本不去仔细思考这个问题,并最终导致系统不能很快的演进,仅是因为没有一个好的方法来解决重复处理的需求。

它的缺点

Lambda架构的一个问题就是需要维护两套复杂的分布式系统的代码,这看起来就很头疼。但我不认为这个问题不能解决。

如Storm和Hadoop这样的分布式框架的编程是很复杂。不可避免的是代码最后会专门为所使用的框架而特殊地构建。由此导致的运维的复杂性是被所有使用这个框架的人所一致同意的。

解决这一问题的一个方法是对实时和批次框架抽象出一种语言或框架。用这个高级框架来写你的代码,然后去“编译”成或是流计算处理代码,或是MapReduce的批处理代码。Summingbird就是这样做的一个框架。它确实让事情好了一些,但我不认为它真正解决了问题。最终,即便你能避免为应用写两套代码,运维两套系统的负担还是很重的。新的抽象仅仅是提供了两套系统的交集所支持的特征。更糟的是,使用这一统一的框架就把整个生态系统里那些使得Hadoop非常强大的工具和语言(如Hive、Pig、Crunch、Cascading、Oozie等)给排除在外了。

打个比方,想想那个使跨数据库对象关系映射(ORM)透明化的臭名昭著的难题。而这也还是对非常相同的系统进行抽象来用标准接口语言提供相同的能力哦。那么去抽象化两个完全不同的构建于刚刚稳定化的分布式系统上的编程模式将会更加的难。

一个备选方案

作为一个基础架构的设计者,我认为真正有意义的问题是:为什么不去改进流计算系统来解决所有的问题集?为什么你需要再粘贴一个别的系统(批处理)?为什么你不去把实时处理和在代码改变时需要的重复处理一起解决?流计算处理系统已经有了并行的概念,为什么不是去通过增加并行性来解决重复处理的问题,并很快的再现历史?答案是你可以这么做。我认为这就是如果你需要构建这样一个系统的一个合理的备选方案。

当我和别人讨论这个思想的时候,有时他们会告诉我流计算处理对于高吞吐率处理历史数据并不合适。但这是他们基于已经使用的系统的缺陷的直觉反应。这些系统要不就是很难扩展,或者是根本就没存历史数据。但没有理由认为这就是对的。流计算处理系统的基本抽象就是数据流的有向无环图(DAG)。这和传统的数据仓库(如Volcano)的基本抽象是一样的,并和MapReduce的后继Tez的基本底层抽象也是一样。流计算处理仅仅是对这一数据流模型的泛化,即对中间结果提供检查点(checkpointing)并持续地输出到最终的用户。

那么我们怎么才能从我们的流处理任务里完成重复处理?我最喜欢的方法其实非常的简单。

1. 使用Kafka或者其他的一些能帮你保存你想重复处理的数据的全部日志的系统,这些系统还要能支持多订阅者功能。例如,如果你想重复处理之前30天的数据,那就把Kafka的保存时间设成30天。

2. 当你想重复处理数据时,启动第二个你的流计算系统的实例,从保留数据的开始再次处理这些数据,不过把结束输出到新的表。

3. 当第二个实例已经可以赶上现有数据的进度了,就把应用定向到从新的输出表里读取数据。

4. 停止原有版本的任务实例,并删除旧的输出表。

这个架构类似图1-4里所示。

 

ihtl_0304-f837e10ea1d3270f632bdb455af6a2d8

图1-4. Lambda架构的一个备选替换方案,移除了批处理系统。

与Lambda架构不同,这个方法里你只是当你代码改变而确实需要再计算结果的时候你才需要重复处理。当然这样的重复计算只是用你代码的改进版本,使用相同的框架,并处理相同的数据。

很自然的,你希望能多给你的并行的重复处理任务更多的资源以便于让它能非常快地完成。

当然,你可以进一步优化这个方法。很多情况下你可以把两个输出表合并。然而我认为让两个输出表并存一段时间是有不少好处的。这可以及时回退回旧的逻辑,而你所需要做的仅仅只是把应用再重定向到旧的表。另外,对于一些非常重要的场景里,你可以使用自动的A/B测试或者多臂强盗算法来控制这个切换,保证新的代码确实是比旧有逻辑有改进而不是变的更差了。

需要注意的是,这个方法并不是说你的数据不能输出到Hadoop里。它仅仅是说你不必要在Hadoop里面做重复处理。Kafka和Hadoop有非常好的集成,所以从中导出任何Kafka的topic都很简单。通常把一个流计算处理任务的输出甚至是中间结果镜像到Hadoop中,从而让一些例如Hive这样的分析工具来处理,或者是作为其他人的输入,或是为离线数据处理流服务。这都是很有用的。

我们已经记录了如何实现这个方法,包括使用Samza实现重复处理的框架的其他变形。

这两种方法所对应的效率和资源之间的权衡是值得讨论的。Lambda架构需要同时一直运行重复处理和实时处理任务。而我提出的方法仅仅只是在需要重复处理的时候才运行第二个任务的实例。然而我的提议要求有临时的额外的输出数据库的存储。同时数据库也要能支持这样大容量的写操作。两种情况下,重复处理所造成的额外任务都可以被平均分布出去。如果你有很多这样的任务,他们不需要一下都重复处理完。所以在一个共享的集群里,如果有很多个这样的任务,你需要预留一部分容量来处理这些随时会发生的任务。

我提议方法的真正的优势不是效率,而是能让大家在一个单一的处理框架里开发、测试、调试和运维他们的系统。

所以在简介是很重要的场景里,可以把我的方法作为Lambda架构的一个备选方案。

有状态的实时处理

日志和流计算处理之间的关系不仅仅限于重复处理。如果实际的流计算处理系统需要维护状态信息,这时使用日志就可以有另一个新的用处了。

一些实时流处理系统仅仅是无状态的一次性数据转换。但是很多场景下都是比较复杂的计数、汇聚或窗口间的连接等操作。例如,你可能希望对事件流(比如点击流)进行增强,比如通过连接点击流和用户帐号数据库来给点击加上用户信息。不可避免的,类似这样的处理最终都会需要保存某种程度的状态信息。例如当计数的时候,你需要保留此前的计数值。这样的状态信息怎么样才能保留下来当处理器自身会发生失效?

最简单的方法就是把状态信息放到内存里。然而如果处理器崩溃了,这个信息就丢失了。如果状态信息仅仅是在一个窗口里维护的,这个处理器就可以从这个窗口开始的点重新再来。但是如果计算一小时的计数,这个方法可能就不行了。

另外一个方法就是把所有的状态信息都存储到一个异地系统,并通过网络获取。这个方法的问题是没有本地化的数据,还需要很多的网络传输。

我们怎么能支持一些如把一张表分片到处理里的操作?

回顾之前对于表和日志的二元性的讨论,这给我们提供了能把流转换成表并和我们的处理并存的方法。这还提供了一个对于表失效的解决机制。

流处理器可以把它的状态保持到一个本地的表或者索引里,如dbdRocksDN,或者一些更不寻常的机制,比如Lucenefastbit索引等。这些存储的内容是从输入流里导入的(可能是首先做一个强制转换后)。它对本地索引可以记录下修改日志(changelog),并保存这些修改日志,从而在系统奔溃或重启后可以恢复出状态信息来。这就可以提供一个通用的机制来保存状态信息在一个索引类型的本地内。

当(流)处理过程失效,它就从修改日志里恢复索引。日志在这里就变成了把本地的状态信息转换成一个增长的随时记录的备份。

这个状态管理的方法有一个优雅的特性,即处理器的状态也被维护成了一个日志。我们可以把这个日志想成是数据库表的修改日志。事实上,处理器有一些伴随着它的非常类似于同分片表的东西。因为状态本身就是日志,其他的处理器也可以订阅它,这就可以非常有用。比如整个处理过程的目标就是更新输出的最终状态的场景。

当把从数据库里输出的日志结合起来看,日志/表二元性的威力就非常清晰了。修改日志(changlog)可以从数据库里抽取,并被不同的流处理器用不同的方式检索来和事件流所连接。

我们提供了在Samza里面使用这种类型的状态管理的细节,以及很多实际的例子

日志压缩

当然,我们不能希望能保持所有时间的状态改变的完整日志。除非你有无限的空间,不然日志总是要被清理的。我会介绍如何在Kafka里面实现这个功能的。

在Kafka里,清理有两个选择,取决于数据是仅包含纯事件数据还是键值化的更新。对于事件数据,我的意思是没有相关的情形发生,比如网页浏览、点击或者其他你会在一个应用的日志里发现的东西。对于键值化的更新,我的意思是事件有特别记录的状态改变,而这被用某些键值所识别。数据库的修改就是一个典型的键值化更新的例子。

对于事件数据,Kafka支持保存数据的窗口。这个窗口可以是用时间(以天)或者是空间(以GB)为单位。大部分人仅仅只是使用它默认的一个星期为保存窗口。如果你希望有无限的保存期,就把这个窗口设成无限,你的数据就永远不会丢失。

然而,对于键值化的数据,一个完整日志的非常好的特征就是你可以重现源系统的状态。即,如果你有这个修改的日志,你可以在另外一个数据库里再现这个表,并重建这个表的任意时间点的状态。这对于不同系统也适用。你可以在另外一个数据库里再现源数据库里的更新,并维护数据的主键(一个搜索的索引、一个本地库等等)。

然而,随着时间的延长,保存完整的日志会消耗越来越多的空间,再现过程也会越来越久。因此在Kafka里,为了支持这种应用场景,我们支持不同类型的保存。其中一个例子就展示在图1-5里。不是简单地完全丢弃旧的日志,我们收集了日志末尾的部分里过时的记录作为垃圾。任何在日志末尾的记录有最近的更新就会适合于清理(只保留最新的更新)。这么做就可以保证日志保存了一个源系统的完整的备份,但是我们现在不必在完全重建所有的之前的状态了,而仅仅只是最近更新的状态。我们把这个特性称为日志压缩

ihtl_0305-e0e8541fe6aaddc447e2e95d09e85562

图1-5.日志压缩可以确保日志里只保留每个键值的最新的更新。这对模型更新成可变数据的日志是很有用的

Jay Kreps

Jay Kreps是Confluent的联合创始人和CEO。Confluent专注于Apache Kafka。在此之前,Jay是领英的主要架构师之一,专注于数据基础架构和数据驱动的产品。他是多个可扩展的数据系统空间的开源项目的作者之一,包括Voldemort、Azkaban、Kafka和Samza。

"Pair of Strutting Birds Dance a Jig" (source: Wikimedia Commons).