yarn 设计之 Dispatcher 

Dispatcher是yarn代码中使用场景比较多的一个类,整体的设计思路是一个生产者和消费者模型,不过支持的多生产者和多消费者的模式。
不同的生产者和消费者之间通过一个Map<Class<? extends Enum>, EventHandler> eventDispatchers 来映射,数据传递通过一个无界的BlockingQueue来持有,消费是启动一个Thread来做处理。Thread是同步的根据注册的类型,切换不同的handler来处理,这个过程是同步的。
所有的handler实现EventHandler接口或者是MultiListenerHandler,MultiListenerHandler的实现是为了满足一个事件有不同的事件通知,其中EventHandler是一对一的消息模型,MultiListenerHandler是一对多的消息模型。
事件继承AbstractEvent类
具体实现上有 MultiThreadedDispatcher、AsyncDispatcher两种
AsyncDispatcher:单线程的事件生产和消费消息
MultiThreadedDispatcher : 持有多个AsyncDispatcher,也就是一个AsyncDispatcher集合,添加事件时候通过轮训的方式添加到对应的AsyncDispatcher中,每个AsyncDispatcher维护自己的异步消费线程。
落地的场景有以下几种:
ResourceManager — AsyncDispatcher
————————————————
注册的事件类型 –> 事件的处理类
RMAppEventType –> ApplicationEventDispatcher
RMAppAttemptEventType –> ApplicationAttemptEventDispatcher
NodesListManagerEventType –> NodesListManager
SchedulerEventType –> SchedulerEventDispatcher
RMNodeEventType –> NodeEventDispatcher
RMAppManagerEventType –> RMAppManager
AMLauncherEventType –> ApplicationMasterLauncher
RMFatalEventType –> ResourceManager.RMFatalEventDispatcher
RMApplicationHistoryWriter — MultiThreadedDispatcher
————————————————
WritingHistoryEventType –> ForwardingEventHandler
dispatcher个数 :yarn.resourcemanager.history-writer.multi-threaded-dispatcher.pool-size=10
SystemMetricsPublisher — MultiThreadedDispatcher
————————————————
SystemMetricsEventType –> ForwardingEventHandler
dispatcher个数 :yarn.resourcemanager.system-metrics-publisher.dispatcher.pool-size=10
CommonNodeLabelsManager — AsyncDispatcher
————————————————
NodeLabelsStoreEventType –> ForwardingEventHandler
NodeManager — AsyncDispatcher
————————————————
ContainerManagerEventType –> ContainerManagerImpl
NodeManagerEventType –> NodeManager
ContainerManagerImpl — AsyncDispatcher
————————————————
ContainerEventType –> ContainerEventDispatcher
ApplicationEventType –> ApplicationEventDispatcher
LocalizationEventType –> ResourceLocalizationService
ContainersMonitorEventType –> ContainersMonitor
ContainersLauncherEventType –> ContainersLauncher
LogHandlerEventType –> LogHandler
SharedCacheUploadEventType –> SharedCacheUploadService
ResourceLocalizationService 复用的是ContainerManagerImpl的AsyncDispatcher
————————————————
LocalizerEventType –> LocalizerTracker
如果发现AsyncDispatcher 异步下发性能不够的话,一般会看到这个日志
Size of event-queue is 1000
Size of event-queue is 2000
说明下游的handler处理性能不行了
原生的日志里面有一个问题,不好的点就是 不知道是哪块的逻辑处理性能不行,日志只到AsyncDispatcher级别,有点烦人



阅读全文…

slf4j + log4j + commons log 的相关配置

之前一直没有系统的整理 日志依赖相关的包,最近恰好查一个问题,需要在一个最简单的环境下,打印日志。就花了点时间整理了一下各个日志包之间的使用关系。

阅读全文…

Uber的HDFS治理

要点

阅读全文…

hdfs-namenode之间自动ha切换过程

整个namenode之间的ha保证是通过ZKFC这个组件来完成的。

阅读全文…

datanode同namenode之间的几个心跳

heartBeat : dfs.heartbeat.interval 默认是3秒

阅读全文…

hdfs dfsadmin setBalancerBandwidth 命令解析

hdfs dfsadmin -setBalancerBandwidth 这个命令是datanode做balance或者mover时候一个比较核心的参数配置

阅读全文…

hive on spark 折腾记

搞这个的原因是因为不想走spark sql,历史包袱太重。所以折衷的使用 hive on spark
有两种形式可以设置hive on spark .
一种是修改hive-site.xml 中的配置
另一种是讲spark目录下的spark-default.conf文件copy到$HIVE_HOME/conf 目录下
两者的效果是一样的
<property>
<name>hive.execution.engine</name>
<value>spark</value>
</property>
<property>
<name>hive.enable.spark.execution.engine</name>
<value>true</value>
</property>
<property>
<name>spark.home</name>
<value>/data/service/spark</value>
</property>
<property>
<name>spark.master</name>
<value>yarn</value>
</property>
<property>
<name>spark.submit.deployMode</name>
<value>cluster</value>
</property>
<property>
<name>spark.eventLog.enabled</name>
<value>true</value>
</property>
<property>
<name>spark.eventLog.compress</name>
<value>true</value>
</property>
<property>
<name>spark.eventLog.dir</name>
<value>viewfs://jssz-bigdata-cluster/tmp/sparklog</value>
</property>

<property>
<name>spark.serializer</name>
<value>org.apache.spark.serializer.KryoSerializer</value>
</property>

<property>
<name>spark.yarn.historyServer.address</name>
<value>http://10.69.1.34:18080</value>
</property>
<property>
<name>spark.driver.cores</name>
<value>1</value>
</property>

<property>
<name>spark.driver.memeory</name>
<value>4g</value>
</property>

<property>
<name>spark.driver.extraJavaOptions</name>
<value>-Dspark.driver.log.level=INFO -XX:+UseG1GC</value>
</property>

<property>
<name>spark.executor.memeory</name>
<value>15g</value>
</property>
<property>
<name>spark.executor.cores</name>
<value>1</value>
</property>

<property>
<name>spark.executor.extraJavaOptions</name>
<value>-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseG1GC</value>
</property>

<property>
<name>spark.shuffle.service.enabled</name>
<value>true</value>
</property>
<property>
<name>spark.dynamicAllocation.enabled</name>
<value>true</value>
</property>
<property>
<name>spark.dynamicAllocation.minExecutors</name>
<value>1</value>
</property>
<property>
<name>spark.dynamicAllocation.maxExecutors</name>
<value>50</value>
</property>
<property>
<name>spark.dynamicAllocation.schedulerBacklogTimeout</name>
<value>10s</value>
</property>
<property>
<name>hive.spark.client.rpc.max.size</name>
<value>268435456</value>
</property>
<property>
<name>spark.rpc.message.maxSize</name>
<value>256</value>
</property>
<property>
<name>spark.scheduler.mode</name>
<value>FAIR</value>
</property>
<property>
<name>spark.blacklist.enabled</name>
<value>true</value>
</property>
<property>
<name>spark.speculation</name>
<value>true</value>
</property>
<property>
<name>spark.task.maxFailures</name>
<value>10</value>
</property>
<property>
<name>spark.rdd.compress</name>
<value>true</value>
</property>
<property>
<name>spark.executor.heartbeatInterval</name>
<value>20s</value>
</property>
<property>
<name>spark.network.timeout</name>
<value>120s</value>
</property>

<!-- 解决shuffle 的问题 -->
<property>
<name>spark.sql.tungsten.enabled</name>
<value>true</value>
</property>
<!-- 解决UDF的问题 -->
<property>
<name>spark.jars</name>
<value>/data/service/hive/lib/com.bilibili.hiveudf.jar</value>
</property>

在 添加 ${SPARK_LIB}/spark-*.jar 到 hive的classpath下。
问题一 cluster模式下 作业连接超时。
看了一下提交的作业信息
</div>
<div>SparkSubmit --properties-file /tmp/spark-submit.3467179554612125337.properties --class org.apache.hive.spark.client.RemoteDriver /data/src/hive/apache-hive-2.3.4-bin/lib/hive
-exec-2.3.4.jar --remote-host jssz-bigdata-hive-04 --remote-port 16317 --conf hive.spark.client.connect.timeout=1000 --conf hive.spark.client.server.connect.timeout=90000 --conf hive.spark.client.channel.
log.level=null --conf hive.spark.client.rpc.max.size=1195725856 --conf hive.spark.client.rpc.threads=8 --conf hive.spark.client.secret.bits=256 --conf hive.spark.client.rpc.server.address=null</div>
<div>
因为cluster模式下,AM是运行在远程的,数据通信是通过 –remote-host 这个参数来进行回传 。后来发现是nodemanager节点不能解析域名,配置加上后解决
问题二 无法识别hive 的UDF
在配置中添加 spark.jars 配置,把UDF的包给加上去。
问题三 Unable to acquire ** bytes of memory异常
修改配置
设置spark.sql.tungsten.enabled = false。
设置spark.executor.cores=1。
问题四 hive server 内存不足
因为hive on  spark 本质上也是通过 spark-submit命令来提交。所以提交一个作业时候会消耗一定的内存,那么就
export HADOOP_CLIENT_OPTS=”-Xmx1024m”


阅读全文…

yarn-label Scheduler

需要对于yarn集群进行实时和离线作业的分离,就单独搞一个yarn,实时对于资源的竞争比较敏感,所以用了标签来做机器上的隔离来保证资源的隔离性

阅读全文…

spark streaming作业心跳异常

问题现场

阅读全文…

kafka2.0源码计划之-broker clusterId

_clusterId 生成规则
在KafkaServer中调用的方式是getOrGenerateClusterId,生成唯一值

阅读全文…

Pages: 1 2 3 4 5 6 7 8 9 10 ... 24 25 26 Next