Spark Streaming job heartbeat exception

What the problem looked like

19/01/17 14:19:56 WARN executor.Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Error sending message [message = Heartbeat(20,[Lscala.Tuple2;@a40072b,BlockManagerId(20, hzxs-datanode-190, 65403, None))]
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:689)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:718)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:718)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:718)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1951)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:718)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
... 13 more
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)

I checked the containers of the failing job and found that neither CPU nor memory usage was particularly high.

Inspecting the process with jstack showed that the heartbeat request was blocked by the code that writes to CAT.

After removing that logic, the job ran normally.
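The trace above ends in `scala.concurrent.Await.result` giving up after the configured timeout. A minimal sketch of that failure mode, using only the Scala standard library: `sendHeartbeat` and the `Promise` standing in for the driver's reply are hypothetical names, not Spark's API, and the never-completed promise is just a stand-in for any reply that is held up by a blocked call.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

object HeartbeatTimeoutSketch {
  // Hypothetical stand-in for one heartbeat round trip: wait for the reply
  // for at most `timeout`, capturing any failure instead of throwing.
  def sendHeartbeat(reply: Promise[String], timeout: FiniteDuration): Try[String] =
    Try(Await.result(reply.future, timeout))

  def main(args: Array[String]): Unit = {
    // A reply that never arrives, for example because something is blocked.
    val stuck = Promise[String]()
    val failed = sendHeartbeat(stuck, 200.millis)
    println(failed.failed.get.isInstanceOf[TimeoutException])

    // A reply that is already available completes without waiting.
    val answered = Promise[String]().success("ack")
    println(sendHeartbeat(answered, 200.millis))
  }
}
```

The point of the sketch is that the timeout says nothing about CPU or memory pressure: the waiting side fails simply because the reply did not arrive in time, which matches what the container metrics showed.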

 

Some other approaches

Increase the container memory: spark.executor.memory

Increase the heartbeat timeout: spark.executor.heartbeatInterval defaults to 10s. It must not exceed spark.network.timeout (default 120s), and in practice it should stay well below it.
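A small sketch of the constraint between the two settings, in plain Scala with no Spark dependency. The helper name `heartbeatSettings` and the values 30s and 300s are illustrative assumptions, not recommendations from the original post; the helper only refuses combinations where the heartbeat interval is not below the network timeout and prints the pairs in the `--conf key=value` form that spark-submit accepts.

```scala
import scala.concurrent.duration._

object HeartbeatSettingsSketch {
  // Hypothetical helper: returns both settings as key/value pairs and
  // rejects an interval that is not strictly below the network timeout.
  // Values are rendered in whole seconds; sub-second parts are dropped.
  def heartbeatSettings(heartbeat: FiniteDuration,
                        networkTimeout: FiniteDuration): Map[String, String] = {
    require(heartbeat < networkTimeout,
      s"heartbeat interval ($heartbeat) must be below network timeout ($networkTimeout)")
    Map(
      "spark.executor.heartbeatInterval" -> s"${heartbeat.toSeconds}s",
      "spark.network.timeout" -> s"${networkTimeout.toSeconds}s"
    )
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values only.
    heartbeatSettings(30.seconds, 300.seconds).foreach {
      case (key, value) => println(s"--conf $key=$value")
    }
  }
}
```

Raising both values together keeps the gap between them, so a longer heartbeat wait does not run into the network timeout.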

After each executor starts, it sets up a timer that sends heartbeats at a fixed interval. The class responsible is org.apache.spark.executor.Executor:

/**
 * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
 */
private def startDriverHeartbeater(): Unit = {
  val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")

  // Wait a random interval so the heartbeats don't end up in sync
  val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]

  val heartbeatTask = new Runnable() {
    override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
  }
  heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
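The excerpt schedules the heartbeat with `scheduleAtFixedRate`. Java documents that when one execution of such a task takes longer than its period, later executions start late and never run concurrently. The sketch below illustrates that behaviour with a deliberately slow task; `FixedRateSketch`, `observe`, the pool size of 4, and the timings are all assumptions made for the demonstration and are not taken from Spark.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object FixedRateSketch {
  // Runs a slow task at a fixed rate for `runForMs` milliseconds and returns
  // (runs started, maximum number of runs in flight at the same time).
  def observe(periodMs: Long, taskMs: Long, runForMs: Long): (Int, Int) = {
    // Several threads are available on purpose: the task still never overlaps.
    val scheduler = Executors.newScheduledThreadPool(4)
    val started = new AtomicInteger(0)
    val inFlight = new AtomicInteger(0)
    val maxInFlight = new AtomicInteger(0)

    val task = new Runnable {
      override def run(): Unit = {
        started.incrementAndGet()
        val now = inFlight.incrementAndGet()
        // Record the highest concurrency seen so far.
        var prev = maxInFlight.get
        while (now > prev && !maxInFlight.compareAndSet(prev, now)) prev = maxInFlight.get
        // Stand-in for a heartbeat that is stuck behind a blocking call.
        try Thread.sleep(taskMs) finally inFlight.decrementAndGet()
      }
    }

    scheduler.scheduleAtFixedRate(task, 0L, periodMs, TimeUnit.MILLISECONDS)
    Thread.sleep(runForMs)
    scheduler.shutdownNow()
    scheduler.awaitTermination(1, TimeUnit.SECONDS)
    (started.get, maxInFlight.get)
  }

  def main(args: Array[String]): Unit = {
    val (runs, maxConcurrent) = observe(periodMs = 20, taskMs = 100, runForMs = 350)
    println(s"runs started: $runs, max concurrent: $maxConcurrent")
  }
}
```

With a 20 ms period and a 100 ms task, far fewer runs start than the period alone would suggest, because each run has to wait for the previous one to finish. A heartbeat that blocks therefore delays every heartbeat scheduled after it.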

 

 
