Spark问题总结

Posted by JustDoDT on July 17, 2019

Streming 程序已经失败,进程不退出

用户提交到 Yarn 上的 Spark Streaming 程序容易受到别的因素影响而导致程序失败,有时候程序失败之后 driver 进程不退出,这样无法通过监控 driver 的进程来重启 Streaming 程序。推荐将 Streaming 程序运行在 Standalone 模式的集群之上,使用 cluster 部署模式,并启用 supervise 功能。使用这种方式的好处是 Streaming程序非正常退出之后,Spark 集群会自动重启 Streaming 的程序,无须人为干预。

Executor 只有少数几个运行

经常会碰到这样一种现象:只有少数 Executor 在运行,别的 Executor 长时间空闲。这种现象比较常见的原因是数据的分区比较少,可以使用 repartition 来提高并行度

另外一种原因和数据本地性有关,请看下面的例子:

用户申请了100个 executor ,每个 executor 的 cores 为 6,那么最多会有 600 个任务同时在运行,刚开始是 600 个任务在运行,接着正在运行的任务越来越少,只剩下 78 个任务在运行,像下图所示:

(2057 + 78)/ 6057

这个问题会导致 Spark 基于 yarn 的动态分配功能也无法使用了,Executor 长时间空闲之后会被杀死,然后报一大堆Error信息。

首先回顾一下 Spark作业的提交流程,如下图所示:

spark

  • 1.首先 DAGSchedular 会把作业分成多个 Stage,划分的依据:是否需要进行 shuffle操作。

  • 2.每个 Stage 由很多的 Tasks 组成,Tasks 的数量由这个 Stage 的 partition 数决定。Stage 之间可能有依赖关系,先提交没有前置依赖的 Stage。把 Stage 里的任务包装成一个 TaskSet,交给 TaskScheduler提交。

  • 3.把Task 发送给 Executor,让 Executor 执行 Task。

    这个问题是出在第2步,TaskScheduler 是怎么提交任务的。这块的逻辑主要是在CoarseGrainedSchedulerBackend 和 TaskSchedulerImpl。

    下面是 CoarseGrainedSchedulerBackend 里面的 makeOffer 方法的主要逻辑:

    • CoarseGrainedSchedulerBackend 筛选出来活跃的 Executors,交给 TaskSchedulerImpl。
    • TaskSchedulerImpl 返回一批 Task 描述给 CoarseGrainedSchedulerBackend。
    • 序列化之后的任务的大小没有超过 spark.akka.frameSize 就向 Executor 发送该任务。

问题是出在第二步,根据活跃的 Executors,返回可以执行的 Tasks。具体查看 TaskSchedulerImpl的 resourceOffers方法。

  • 1.在内存当中记录传入的 Executor 的映射关系,记录是否有薪的机器加入。

  • 2.如果有新的机器加入,要对所有的 TaskSetManager 重新计算本地性。

  • 3.遍历所有的 TaskSetManager,根据 TaskSetManager 计算得出的任务的本地性来分配任务。

    分配任务的优先级:

    • 同一个 Executor
    • 同一个节点
    • 没有优先级节点
    • 同一个机架
    • 任务节点

    如果上一个优先级的任务的最后发布时间不满足下面这个条件,任务将不会被分布出去,导致出现上面的现象。

    判断条件是:curTime -> lastLaunchTime --> localityWaits(currentLocalityIndex)

    这样设计的初衷是好的,希望先让本地性更好的执行任务,但是这里没有考虑到 Executor 的空闲时间以及每个 Task 的空闲时间。跳过了这个限制之后,它还是会按照优先级来分配任务的,所以不用担心本地性的问题。

    下面这几个参数在官方的配置介绍当中有,但是没有介绍清楚,默认都是 3 秒,减小这几个参数就可以绕过限制了。

    • spark.locality.wait.process 1ms #超过这个时间,可以执行 NODE_LOCAL 的任务

    • spark.locality.wait.node 3ms # 超过这个时间,可以执行 RACK_LOCAL的任务

    • spark.locality.wait.rack 1s # 超过这个时间,可以执行 ANY 的任务

    实践测试,以上问题解决了,并且速度也快了20%以上。

Task 在同一棵树上连续吊死

Spark 的任务在失败之后还在同一台机器上不断的重试,直至超过了设置的重试次数之后。在生产环境当中,因为各种各样的原因,比如网络原因,磁盘满了的等原因会使任务挂掉,在这个时候,在同一台机器上重试几乎没有成功的机会,把任务发到别的机器上运行是最明智的选择。

Spark 是有任务的黑名单机制的,但是这个配置在官方文档里面并没有写,可以设置下面的参数,比如设置成一分钟之内不要把任务发到这个 Executor 上了,单位是毫秒。

spark.scheduler.executorTaskBlacklistTime 60000 ms

参考资料