深入了解Python的Dask分布式调度程序 - selectfrom

22-04-15 banq

Dask 是一个强大的 Python 库,可让您使用一个代码将数据工程从一台机器扩展到多台机器,并具有 Python 的可扩展性。这种分布式电源的核心是 Dask 分布式调度程序。

从本质上讲,Dask调度器将工作交给某个工作者worker。如果该工作者worker看起来已经饱和,并且在一段时间后无法启动任务,Dask Scheduler就会将该工作拿走,并将其交给另一个工作者。
这方面的代码可以在dask.distributed>stealing.py中找到,你可以在GitHub.上进行跟踪。

WorkStealing类保持对几个work-stealing重要状态的跟踪:
  • stealable_all:跟踪计划中的任务,并且是 "可偷的"。有一个 "级别 "的概念,预先设置了15个级别的可steal状态,这有助于Scheduler决定需要重新分配工作的紧迫性。
  • stealable:记录我们可以steal工作的工人的情况。
  • key_stealable:跟踪当前被分配的工人,级别配对。
  • saturated: 一个已经饱和的工人列表。


Dask分布式调度器是如何重新平衡任务负载的?
dask.distributed > stealing.py > WorkStealing > balance()

还记得饱和saturated列表吗?我们取前10名饱和工人,并在其中找到那些分配的任务多于他们目前能处理的工人,其逻辑如下:
combined_occupancy(ws) > 0.2 and len(ws.processing) > ws.nthreads

Occupancy是一个双数,用于跟踪任务的中位运行时间,在某种意义上是一个启发式的数值,用于预测要花多长时间。nThreads是我们通常的线程数。随着工作者积累了更多的工作,饱和/闲置列表会保持最新,并且可以增加/减少长度。
我们现在引入级别和成本乘数的概念,以决定任务窃取优先权。我们有15个级别的任务,以及相应的成本乘数。
从现在开始(为了准确起见,总是参考源代码),成本乘数看起来是这样的,按照级别增加的顺序。

[1, 1.03125, 1.0625, 1.125, 1.25, 1.5, 2, 3, 5, 9, 17, 33, 65, 129, 257]

对于每个级别,我们都会浏览所有可窃取的任务,并做一个成本效益分析。
dask.distributed > stealing.py > WorkStealing > balance > maybe_move_task

目前的决策算法如下:
如果我们偷了任务,成本={闲置工人的占用率+(成本乘数*任务所需的估计时间)}。
如果我们没有偷窃,成本={饱和工人的占用率-(任务所需的估计时间/2}。
occ_idl + cost_multiplier * duration <= occ_sat - duration / 2

我现在不确定为什么要这样定义,但我要猜测,有一些选择是基于对时间的学习而做出的。

最后,我们转向实际的偷窃任务 让我们来看看重新分配任务的机制。
async def move_task_confirm

我们将偷窃方法称为我们想要偷窃的任务。该方法在in_flight任务队列中寻找该任务。然后检查 "stimulus ID",这似乎是在检查偷窃请求发出时任务的状态。如果请求是陈旧的,终止。如果不是,继续。
self.in_flight_occupancy[thief] -= d["thief_duration"]
self.in_flight_occupancy[victor]+= d["victor_duration"]

然后我们更新我们偷来并给了任务的工人的占用率(预期平均任务完成时间保持在两倍)。
ts.state != “processing” : abort steal
elif state in _WORKER_STATE_CONFIRM : only then we still


然后我们检查工作者的状态。如果受害者已经在执行,则中止偷窃。如果受害者仍未启动该任务,则偷窃。

这似乎是work-stealer与Spark和Hadoop类系统的关键区别之一。

Spark和Hadoop有能力推测地安排同一任务的多次运行,以应对可能的 "滞后 "任务,而Dask目前没有这样做。


self.remove_key_from_stealable(ts)
ts.processing_on = thief
duration = victim.processing.pop(ts)
victim.occupancy -= duration
thief.processing[ts] = d[“thief_duration”]
thief.occupancy += d[“thief_duration”]
self.scheduler.total_occupancy += d[“thief_duration”]
self.put_key_in_stealable(ts)


我们从'可窃取的'任务键中抹去任务,因为我们正在制定窃取。更新任务以指向新的'盗贼'工作者,更新占用把可窃取的任务放回盗贼,因为该工作者把任务发送给了工作者。

self.scheduler.check_idle_saturated(thief)
self.scheduler.check_idle_saturated(victim)

最后,我们检查并更新两个工作者的空闲饱和状态。空闲饱和状态是由调度器本身保持的,此时它做一些数学运算来计算出两个工作者的饱和状态。
至此,我对Dask调度器的工作窃取机制的理解结束了。我在这里的主要收获是。
抢工被集中地整合到Dask分布式调度器中。虽然作者注意到偷工减料并不是其性能的核心组成部分,但我对Dask原始论文中的概念的实现相当好奇。目前,Dask并不支持投机性调度。我们能够在没有投机性调度的情况下达到Spark的许多性能基准,这一事实令人印象深刻。
Dask分布式是一个相当强大的任务分配机制,但它具有明显的可读性,而且决策启发式对程序员是透明的。这一点我非常欣赏。


 

猜你喜欢