Ray编程:给新用户的小指南
编者注:本文作者将在北京人工智能大会上带来教学辅导课Building reinforcement learning models and AI applications with Ray

Ray是一个用于在计算集群上编程的通用框架。 Ray使开发人员能够轻松地并行化他们的Python应用程序,构建新的应用,在任意大小规模的集群上(从笔记本电脑到大型集群)运行。 Ray提供了一个高度灵活,极简主义、易于使用的API。 表1 介绍了此API的核心功能。

在这篇博客中,我们描述了几个小技巧,可以帮助首次使用Ray的用户避免一些可能严重影响其程序性能的常见错误。

API 描述 示例
ray.init() 初始化Ray执行上下文。
@ ray.remote 函数或类装饰器,指定 函数将作为不同进程中的任务执行,或者类作为不同进程中的actor定义。 @ray.remote        @ray.remote

def fun(x):           class Actor(object):

…                            def method(y)

.remote Postfix到每个远程函数,远程声明类,或远程进行类方法的调用。 远程操作是异步。 ret_id = fun.remote(x)

a = Actor.remote()

ret_id = a.method.remote(y)

ray.put() 将对象存储在对象库中,并返回其ID。 此ID可用于将对象作为参数传递给任何远程函数或方法调用。 这是一种同步的操作。 x_id = ray.put(x)
ray.get() 从对象ID或对象ID列表返回对象或对象列表。 这是一种 同步 (即阻塞)操作。 x = ray.get(x_id)

objects = ray.get(object_ids)

ray.wait() 从对象ID列表返回(1)准备好的对象的ID列表,以及(2)尚未准备好的对象的ID列表。 默认情况下,它一次返回一个已准备好的对象ID。 ready_ids, not_ready_ids =    ray.wait(object_ids)
1 :我们在此博客中使用的核心Ray API。 完整的API可 在此处获得

本博客中报告的所有结果都是在配备2.7 GHz Core i7 CPU和16GB RAM的13英寸MacBook Pro上获得的。 虽然 ray.init() 在单个机器上运行时会自动检测核心数,但为了减少运行下面代码时在机器上观察到的结果的变化性,这里我们指定num_cpus = 4,即一台机器指定使用4个CPU。 由于每个任务默认请求一个CPU,因此该设置允许我们并行执行最多四个任务。 因此,我们的Ray系统由一个执行程序的驱动程序和最多四个运行远程任务或actor的工作程序组成。

延迟执行ray.get()

使用Ray,每个远程操作(例如,执行任务,定义actor方法)的调用都是异步的。 这意味着操作 立即 返回 promise / future,它实际上是操作结果的标识符(ID)。 这是实现并行性的关键,因为它允许驱动程序并行启动多个操作。 要获得实际结果,程序员需要 在结果的ID上 调用 ray.get() 。 此调用将阻塞,直到结果可用。 作为副作用,此操作还会阻止驱动程序调用其他操作,这可能会损害并行性。

不幸的是,新的Ray用户会很自然地无意中使用了 ray.get()。 为了说明这一点,请考虑以下简单的Python代码,该代码调用 do_some_work() 函数四次,每次调用大约需要1秒:

import time

def do_some_work(x):

    time.sleep(1) # 把这句替换为你想执行的代码.

    return x

start = time.time()

results = [do_some_work(x) for x in range(4)]

print(“duration =”, time.time() – start, “\nresults = “, results)

程序执行的输出如下。 正如预期的那样,该计划大约需要4秒:

duration = 4.0149290561676025 

results =  [0, 1, 2, 3]

现在,让我们将上述程序在Ray中并行化。 一些初次使用的用户只做了一个改动,就是把函数设置为远程执行,即

import time

import ray

ray.init(num_cpus = 4) # 此处定义了系统有4个CPU核.

@ray.remote

def do_some_work(x):

    time.sleep(1) # 把这句.

    return x

start = time.time()

results = [do_some_work.remote(x) for x in range(4)]

print(“duration =”, time.time() – start, “\nresults = “, results)

但是,执行上述程序时,会得到:

duration = 0.0003619194030761719 

results =  [ObjectID(0100000000bdf683fc3e45db42685232b19d2a61), ObjectID(01000000da69c40e1c2f43b391443ce23de46cda), ObjectID(010000007fe0954ac2b3c0ab991538043e8f37e0), ObjectID(01000000cf47d5ecd1e26b42624454c795abe89b)]

在查看此输出时,显示了两件事情。 首先,程序立即完成,即在不到1毫秒内完成。 其次,我们得到一堆标识符,而不是预期的结果(即[0,1,2,3])。 当然,这应该不足为奇。 回想一下,远程操作是异步的,它们返回future(即对象ID)而不是结果本身。 这正是我们在这里看到的。 我们只测量调用启动任务所花费的时间,而不是它们的运行时间,并且我们得到与四个任务相对应的结果的ID。

为了得到实际结果,我们需要使用 ray.get() ,这里的第一个灵感是 在远程操作调用上 调用 ray.get() ,即替换掉这一行“ results = [do_some_work .remote (x) for x in range(4)] “,把它用

 results = [ray.get(do_some_work.remote(x)) for x in range(4)]

来代替。( 注意:您必须只运行一次ray.init() 。如果您在Python解释器的同一实例中第二次运行它, 您将收到错误 。 )

通过在此更改后重新运行程序,我们得到:

duration = 4.018050909042358 

results =  [0, 1, 2, 3]

所以现在结果是正确的,但它仍然需要4秒,所以没有加速! 这是怎么回事? 细心的读者已经有了答案: ray.get() 是阻塞的,所以在每次远程操作后调用它意味着我们等待该操作完成,这实际上意味着我们一次执行一个操作,因此没有并行性!

要启用并行性,我们需要 在 调用所有任务 后 调用 ray.get() 。 我们可以在我们的示例中通过使用以下代码替换“ results = [do_some_work .remote (x)for x in range(4)] ”来轻松完成此操作:

  results = ray.get([do_some_work.remote(x)for x in range(4)])

通过在此更改后重新运行程序,我们现在得到:

 持续时间= 1.0064549446105957 

结果= [0,1,2,3]

最后,成功! 我们的Ray程序现在只运行1秒,这意味着 do_some_work()的 所有调用 都是并行运行的。

总而言之,请记住, ray.get() 是一个阻塞操作,因此如果急切地调用它会损害并行性。 相反,您应该尝试编写程序,以便尽可能晚地调用ray.get()。

提示1: 尽可能延迟调用ray.get()。

规避过于微小的任务

当第一次开发人员想要将他们的代码与Ray并行化时,自然的本能就是使每个函数或类都远程。 不幸的是,这会导致不良后果; 如果任务非常小,则Ray程序可能 比等效的Python程序 花费 更长的时间 。

让我们再次考虑上面的例子,但这次我们使任务更加短小(即,每个只需0.1ms),并将任务调用的数量急剧增加到100,000。

import time

def tiny_work(x):

    time.sleep(0.0001) # 把这行用你的代码替换

    return x

start = time.time()

results = [tiny_work(x) for x in range(100000)]

print(“duration =”, time.time() – start)

通过运行此程序,我们得到:

duration = 13.36544418334961

这个结果应该是可预期的,因为执行100,000个任务的下限每个需要0.1毫秒,这里显示是10秒,算上其他开销,如函数调用等等结果是合理的。

现在让我们使用Ray并行调用 do_some_work() 远程 调用此代码 :

import time

import ray

ray.init(num_cpus = 4)

@ray.remote

def tiny_work(x):

    time.sleep(0.0001) # 把这行用你的代码替换.

    return x

start = time.time()

result_ids = [tiny_work.remote(x) for x in range(100000)]

results = ray.get(result_ids)

print(“duration =”, time.time() – start)

运行此代码的结果是:

duration = 27.46447515487671

令人惊讶的是,不仅Ray没有改善执行时间,而且Ray程序实际上比顺序程序慢! 这是怎么回事? 嗯,这里的问题是每个任务调用都有一个无法忽略的开销(例如,调度,进程间通信,更新系统状态),这个开销占据了执行任务所需的实际时间。

加速此程序的一种方法是使远程任务更大,以便分摊调用开销。 这是一个可能的解决方案,我们 在一个更大的远程函数 mega_work()中 聚合1000个 tiny_work() 函数调用 :

import time

import ray

ray.init(num_cpus = 4)

def tiny_work(x):

    time.sleep(0.0001) # 把这行用你的代码替换

    return x

@ray.remote

def mega_work(start, end):

    return [tiny_work(x) for x in range(start, end)]

start = time.time()

result_ids = []

[result_ids.append(mega_work.remote(x*1000, (x+1)*1000)) for x in range(100)]

results = ray.get(result_ids)

print(“duration =”, time.time() – start)

现在,如果我们运行上述程序,我们得到:

duration = 3.2539820671081543

这大约是顺序执行的四分之一,符合我们的预期(回想一下,我们可以并行执行四个任务)。 当然,一个很自然的疑问是,对于任务来说,分摊远程调用开销的大小是多大。 找到这个的一种方法是运行以下简单程序来估计每个任务的调用开销:

 @ray.remote

def no_work(x):

    return x 

start = time.time()

num_calls = 1000

[ray.get(no_work.remote(x)) for x in range(num_calls)]

print(“per task overhead (ms) =”, (time.time() – start)*1000/num_calls)

运行上述程序显示:

per task overhead (ms) = 0.4739549160003662 

换句话说,执行空任务需要将近半毫秒。 这表明我们需要确保任务至少花费几毫秒来分摊调用开销。 需要注意的是,每个任务的开销因机器而异,以及在同一台机器上运行的任务与远程运行的任务之间会有所不同。 这就是说,在开发Ray程序时,确保任务至少需要几毫秒是一个很好的经验法则。

提示2: 为了高效利用Ray的并行性,远程任务应该至少需要几毫秒。

避免将同一对象重复传递给远程任务

当我们将一个大对象作为参数传递给远程函数时,Ray调用引擎 下的 ray.put() 将该对象存储在本地对象库中。 当远程任务在本地执行时,这可以显着提高远程任务调用的性能,因为所有本地任务都共享对象存储。 但是,有时在任务调用时自动调用ray.put()会导致性能问题。 例如, 重复 传递相同的大对象作为参数 ,如下面的程序所示:

import time 

import numpy as np 

import ray 

ray.init(num_cpus = 4) 

@ray.remote 

def no_work(a): 

    return 

start = time.time() 

a = np.zeros((10000, 2000)) 

result_ids = [no_work.remote(a) for x in range(10)] 

results = ray.get(result_ids) 

print(“duration =”, time.time() – start) 

该程序输出:

duration = 1.0699057579040527 

( 注意:如果此程序花费的时间超过1秒,可能是因为您的机器没有足够的内存。如果是这种情况,请停止此程序并运行下一个程序。 )

对于只调用10个不执行任何操作的远程任务的程序,此运行时间非常大。 这种出人意料的高运行时间的原因是每次调用 no_work(a)时, Ray都会调用 ray.put(a) ,这会导致将数组 a 复制 到对象存储库。 由于数组a 有2000万行,复制它需要花费很多时间。 为了避免每次调用 no_work() 时 复制数组 ,一个简单的解决方案是显式调用 ray.put(a) ,然后将 一个 ID 传递 给 no_work() ,如下所示:

import time 

import numpy as np 

import ray 

ray.init(num_cpus = 4) 

@ray.remote 

def no_work(a): 

    return 

start = time.time() 

a_id = ray.put(np.zeros((10000, 2000))) 

result_ids = [no_work.remote(a_id) for x in range(10)] 

results = ray.get(result_ids) 

print(“duration =”, time.time() – start) 

运行此程序仅需:

 duration = 0.12425804138183594

这比预期的原始程序快8倍,因为调用 no_work(a) 的主要开销 是将数组a复制到对象存储,现在只发生一次。

可以说,避免同一对象的多个副本到对象存储器的一个更重要的优点是它排除了对象存储器过早填满并导致对象回收的成本。

技巧3: 将同一个对象作为参数重复传递给远程操作时,使用ray.put()将其存储在对象库中一次,然后传递其ID

将数据处理形成管线

如果我们对多个任务的结果使用ray.get() ,我们将不得不等到 这些任务中 的 最后 一个完成。 如果任务花费的时间差异很大,这可能会成为一个问题。 为了说明这个问题,请考虑以下示例,其中我们 并行 运行四个 do_some_work() 任务,每个任务在0到4秒之间均匀分布。 接下来,假设这些任务的结果由 process_results()处理 ,每个结果需要1秒。 然后,预期的运行时间是(1)执行最慢的 do_some_work() 任务所花费的时间加上(2)4秒,这是执行 process_results()所 花费的时间 。

import time 

import random 

import ray 

ray.init(num_cpus = 4) 

@ray.remote 

def do_some_work(x): 

    time.sleep(random.uniform(0, 4)) # 将此行替换为你要执行的代码. 

    return x 

def process_results(results): 

    sum = 0 

    for x in results: 

        time.sleep(1) # 将此行替换为你要执行的代码. 

        sum += x 

    return sum 

start = time.time() 

data_list = ray.get([do_some_work.remote(x) for x in range(4)]) 

sum = process_results(data_list) 

print(“duration =”, time.time() – start, “\nresult = “, sum) 

程序的输出显示运行需要接近8秒:

duration = 7.82636022567749

result = 6

当其他任务可能早已完成时,等待最后一个任务完成不必要地增加了程序运行时间。 更好的解决方案是在数据 可用后立即处理 。 幸运的是,Ray允许您通过 在对象ID列表上 调用 ray.wait() 来完成此操作 。 在不指定任何其他参数的情况下,只要参数列表中的对象准备就绪,此函数就会返回。 此调用有两个返回:(1)就绪对象的ID,以及(2)包含尚未准备好的对象的ID的列表。 修改后的程序如下。 请注意,我们需要做的一件事就是将 process_results() 替换 为一次处理一个结果的 process_incremental() 。

进口时间 

随机导入 

导入光线 

import time 

import random 

import ray 

ray.init(num_cpus = 4) 

@ray.remote 

def do_some_work(x): 

    time.sleep(random.uniform(0, 4)) # Replace this with work you need to do. 

    return x 

def process_incremental(sum, result): 

    time.sleep(1) # Replace this with some processing code. 

    return sum + result 

start = time.time() 

result_ids = [do_some_work.remote(x) for x in range(4)] 

sum = 0 

while len(result_ids): 

    done_id, result_ids = ray.wait(result_ids) 

    sum = process_incremental(sum, ray.get(done_id[0])) 

print(“duration =”, time.time() – start, “\nresult = “, sum) 

这个程序现在只需要4.8秒了,是一个显著改进:

duration = 4.852453231811523 

result = 6 

( 注意:不同的运行可能需要不同的时间,但“duration”仍应显著小于8秒。 )

为了帮助直觉理解, 图1 显示了两种情况下的执行时间表:使用 ray.get() 等待所有结果变为可用后再处理,或者使用 ray.wait() 在所有结果全部可用前就开始处理。

sI6-tbAQQ032tvCU7vqQisw

1 :(a) 在调用 process_results() 之前 使用 ray.get() 等待 do_some_work() 任务的所有结果完成后再处理的执行时间表 。 (b)使用 ray.wait() 在部分结果可用时立即处理的 行时间表
技巧4: 使用ray.wait()可以在结果可用时立即处理。

其他资源

Ray教程是了解有关Ray编程的更多信息的绝佳资源。

ION STOICA

Ion Stoica是加州大学伯克利分校电子工程和计算机科学(EECS)系的教授,他在那里研究云计算和网络计算机系统。之前他从事动态数据包状态、弦DHT、网间迂回基础设施(i3)、声明性网络及大型系统,包括Apache Spark、Apache Mesos和Alluxio。 他是Databricks及Conviva的联合创始人,Databricks是一家将Apache Spark商业化的初创公司,后者是一家将大规模视频分发技术商业化的初创公司。Ion是一位ACM研究员,并获得了众多奖项,包括SIGOPS名人堂(2015年)、SIGCOMM时间测试奖(2011年)和ACM博士论文奖(2001年)。

A cluster of center pivot irrigation fields. (source: Soil Science)