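A walkthrough of the hdfs dfsadmin -setBalancerBandwidth command

The hdfs dfsadmin -setBalancerBandwidth command sets one of the more important parameters for DataNodes while the balancer or mover is running: the bandwidth, in bytes per second, that each DataNode may use for moving blocks.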

When you run this command, the hdfs shell script ends up calling org.apache.hadoop.hdfs.tools.DFSAdmin#setBalancerBandwidth.

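When HA is enabled, setBalancerBandwidth iterates over every NameNode in the nameservice (typically two) and calls setBalancerBandwidth on each of them. Without HA it makes a single call through the DistributedFileSystem: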
    if (isHaEnabled) {
      String nsId = dfsUri.getHost();
      List<ProxyAndInfo<ClientProtocol>> proxies =
          HAUtil.getProxiesForAllNameNodesInNameservice(dfsConf,
              nsId, ClientProtocol.class);
      for (ProxyAndInfo<ClientProtocol> proxy : proxies) {
        proxy.getProxy().setBalancerBandwidth(bandwidth);
        System.out.println("Balancer bandwidth is set to " + bandwidth +
            " for " + proxy.getAddress());
      }
    } else {
      dfs.setBalancerBandwidth(bandwidth);
      System.out.println("Balancer bandwidth is set to " + bandwidth);
    }
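The fan-out in the HA branch can be sketched without any Hadoop dependency. This is only an illustration: NameNodeStub and FanOutSketch are hypothetical names standing in for the ClientProtocol proxies and the DFSAdmin loop, not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the ClientProtocol proxy of one NameNode.
interface NameNodeStub {
    void setBalancerBandwidth(long bandwidth);
}

class FanOutSketch {
    // Sends the same bandwidth to every NameNode, keyed by address,
    // and returns one report line per NameNode that was updated.
    static List<String> setOnAll(Map<String, NameNodeStub> nameNodes, long bandwidth) {
        List<String> report = new ArrayList<>();
        for (Map.Entry<String, NameNodeStub> entry : nameNodes.entrySet()) {
            entry.getValue().setBalancerBandwidth(bandwidth);
            report.add("Balancer bandwidth is set to " + bandwidth
                + " for " + entry.getKey());
        }
        return report;
    }
}
```

Every NameNode receives the same value, so the result does not depend on which one is active when the command runs.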

On the NameNode side, NameNodeRpcServer implements ClientProtocol and handles the setBalancerBandwidth request:
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer#setBalancerBandwidth

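The request updates the per-DataNode information (DatanodeDescriptor) held by the NameNode's DatanodeManager. The call that does this, after an operation check and a superuser check, looks like the following (as far as I can tell this body lives in the namesystem that the RPC server delegates to):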
    void setBalancerBandwidth(long bandwidth) throws IOException {
      checkOperation(OperationCategory.UNCHECKED);
      checkSuperuserPrivilege();
      getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
    }

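At this point the NameNode has updated the balancer bandwidth it keeps for each DataNode, but the DataNodes themselves do not know about it yet. Because DataNodes heartbeat to the NameNode, the new value is pushed down to each DataNode in the heartbeat response, by org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager#handleHeartbeat: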
    // check for balancer bandwidth update
    if (nodeinfo.getBalancerBandwidth() > 0) {
      cmds.add(new BalancerBandwidthCommand(nodeinfo.getBalancerBandwidth()));
      // set back to 0 to indicate that datanode has been sent the new value
      nodeinfo.setBalancerBandwidth(0);
    }
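The check above treats the stored value as a pending update: a value greater than 0 means "not yet sent", and it is reset to 0 once a command has been queued. A minimal, self-contained sketch of that pattern follows. NodeInfoSketch and HeartbeatSketch are hypothetical names, and the command is modeled as a plain string rather than a real BalancerBandwidthCommand.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for DatanodeDescriptor: holds one pending bandwidth value.
class NodeInfoSketch {
    private long pendingBandwidth; // 0 means there is nothing new to send

    synchronized void setBalancerBandwidth(long bandwidth) {
        pendingBandwidth = bandwidth;
    }

    synchronized long getBalancerBandwidth() {
        return pendingBandwidth;
    }
}

class HeartbeatSketch {
    // Mirrors the check in handleHeartbeat: queue a command only when a
    // value is pending, then clear the value so it is sent exactly once.
    static List<String> handleHeartbeat(NodeInfoSketch node) {
        List<String> cmds = new ArrayList<>();
        long bandwidth = node.getBalancerBandwidth();
        if (bandwidth > 0) {
            cmds.add("BalancerBandwidthCommand(" + bandwidth + ")");
            node.setBalancerBandwidth(0);
        }
        return cmds;
    }

    public static void main(String[] args) {
        NodeInfoSketch node = new NodeInfoSketch();
        node.setBalancerBandwidth(10L * 1024 * 1024);
        System.out.println(handleHeartbeat(node)); // [BalancerBandwidthCommand(10485760)]
        System.out.println(handleHeartbeat(node)); // []
    }
}
```

The second heartbeat returns no command, which is why a DataNode sees the update only once per dfsadmin invocation.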

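On the DataNode, org.apache.hadoop.hdfs.server.datanode.BPServiceActor#run periodically sends heartbeats to the NameNode and processes the commands the NameNode returns with the heartbeat response. Its main loop: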
    while (shouldRun()) {
      try {
        offerService();
      } catch (Exception ex) {
        LOG.error("Exception in BPOfferService for " + this, ex);
        sleepAndLogInterrupts(5000, "offering service");
      }
    }

The returned commands go through org.apache.hadoop.hdfs.server.datanode.BPServiceActor#processCommand
and finally reach org.apache.hadoop.hdfs.server.datanode.BPOfferService#processCommandFromActive, where the bandwidth update is handled by this case:
    case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
      LOG.info("DatanodeCommand action: DNA_BALANCERBANDWIDTHUPDATE");
      long bandwidth =
          ((BalancerBandwidthCommand) cmd).getBalancerBandwidthValue();
      if (bandwidth > 0) {
        DataXceiverServer dxcs =
            (DataXceiverServer) dn.dataXceiverServer.getRunnable();
        LOG.info("Updating balance throttler bandwidth from "
            + dxcs.balanceThrottler.getBandwidth() + " bytes/s "
            + "to: " + bandwidth + " bytes/s.");
        dxcs.balanceThrottler.setBandwidth(bandwidth);
      }
      break;
    default:

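This is where the balance throttler of DataXceiverServer is actually updated. DataXceiverServer is the DataNode component that accepts data-transfer connections, and its balanceThrottler is what limits how fast the DataNode sends and receives blocks for balancing.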
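To make the effect of setBandwidth concrete, here is a simplified, clock-free model of a period-based throttler. It is a sketch under assumptions: ThrottlerSketch, the 500 ms period, and periodsToWait are illustrative choices of mine, not the Hadoop throttler class, which blocks the calling thread instead of returning a count.

```java
// Simplified model of a bandwidth throttler whose limit can change at runtime.
class ThrottlerSketch {
    private static final long PERIOD_MS = 500; // assumed accounting period

    private long bytesPerPeriod; // budget granted at the start of each period
    private long reserve;        // bytes still allowed in the current period

    ThrottlerSketch(long bytesPerSecond) {
        setBandwidth(bytesPerSecond);
        reserve = bytesPerPeriod;
    }

    // Changes the limit; later periods are granted the new budget.
    synchronized void setBandwidth(long bytesPerSecond) {
        if (bytesPerSecond <= 0) {
            throw new IllegalArgumentException("bandwidth must be positive");
        }
        bytesPerPeriod = bytesPerSecond * PERIOD_MS / 1000;
    }

    synchronized long getBandwidth() {
        return bytesPerPeriod * 1000 / PERIOD_MS;
    }

    // Charges numBytes against the budget and returns how many periods a
    // real throttler would have to wait before the reserve is positive again.
    synchronized int periodsToWait(long numBytes) {
        reserve -= numBytes;
        int periods = 0;
        while (reserve <= 0) {
            reserve += bytesPerPeriod;
            periods++;
        }
        return periods;
    }
}
```

With a limit of 1000 bytes/s, sending 400 bytes needs no wait and a further 600 bytes needs two periods. After setBandwidth(4000), a 2400-byte send needs only one period, which is the kind of speed-up the dfsadmin command is meant to produce.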
