HDFS副本存放策略

Posted by JustDoDT on April 14, 2018

概述

何为副本存放策略

首先这里要花一些篇幅来介绍什么是副本放置策略, 有人也会叫他为副本选择策略,这源于此策略的名称, BlockPlacementPolicy.所以这个策略类重在block placement.先来看下这个策略类的功能说明:

Apache官网描述

Apache官网对HDFS副本放置策略的描述

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

CDH官网对HDFS副本放置策略的描述

CDH官网对HDFS副本放置策略的描述

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on the local machine if the writer is on a datanode, otherwise on a random datanode, another replica on a node in a different (remote) rack, and the last on a different node in the same remote rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

从上面的可以看出,Apache版本的Hadoop和CDH版本的对于HDFS副本存放策略描述完全一样;在CDH-5.7.0的描述和Apache还是有差距的

CDH-5.7.0的HDFS副本放置策略

For the common case, when the replication factor is three, HDFS’s placement policy is to put one replica on one node in the local rack, another on a different node in the local rack, and the last on a different node in a different rack. This policy cuts the inter-rack write traffic which generally improves write performance. The chance of rack failure is far less than that of node failure; this policy does not impact data reliability and availability guarantees. However, it does reduce the aggregate network bandwidth used when reading data since a block is placed in only two unique racks rather than three. With this policy, the replicas of a file do not evenly distribute across the racks. One third of replicas are on one node, two thirds of replicas are on one rack, and the other third are evenly distributed across the remaining racks. This policy improves write performance without compromising data reliability or read performance.

为了探索一个究竟,唯一的出路就是查看HDFS的源代码。 HDFS源码地址

通过查看源码可以发现目前的副本放置策略有2大继承子类,分别是BlockPlacementPolicyDefault, BlockPlacementPolicyWithNodeGroup,其关系结构为:BlockPlacementPolicy —-> BlockPlacementPolicyDefault —> BlockPlacementPolicyWithNodeGroup

我们日常生活中提到最经典的3副本策略用的就是BlockPlacementPolicyDefault策略类.3副本如何存放在这个策略中得到了非常完美的实现.在BlockPlacementPolicyDefault.java 中的注释具体解释了3个副本的存放位置:

 The class is responsible for choosing the desired number of targets
 for placing block replicas.
 The replica placement strategy is that if the writer is on a datanode,
 the 1st replica is placed on the local machine, 
 otherwise a random datanode. The 2nd replica is placed on a datanode
 that is on a different rack. The 3rd replica is placed on a datanode
 which is on a different node of the rack as the second replica.

BlockPlacementPolicyDefault这个类中的选择目标节点的策略核心方法是chooseTargets

策略核心方法chooseTargets

在默认放置策略方法中,核心方法就是chooseTargets,但是在这里有2种同名实现方法,唯一的区别是有无favoredNodes参数.favoredNodes的意思是偏爱,喜爱的节点.这2个方法的介绍如下。

favoredNodes参数的介绍

  /**
   * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
   * to re-replicate a block with size <i>blocksize</i> 
   * If not, return as many as we can.
   *
   * @param srcPath the file to which this chooseTargets is being invoked.
   * @param numOfReplicas additional number of replicas wanted.
   * @param writer the writer's machine, null if not in the cluster.
   * @param chosen datanodes that have been chosen as targets.
   * @param returnChosenNodes decide if the chosenNodes are returned.
   * @param excludedNodes datanodes that should not be considered as targets.
   * @param blocksize size of the data to be written.
   * @return array of DatanodeDescriptor instances chosen as target
   * and sorted as a pipeline.
   */
  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
                                             int numOfReplicas,
                                             Node writer,
                                             List<DatanodeStorageInfo> chosen,
                                             boolean returnChosenNodes,
                                             Set<Node> excludedNodes,
                                             long blocksize,
                                             BlockStoragePolicy storagePolicy);

/**
   * Same as {@link #chooseTarget(String, int, Node, Set, long, List, StorageType)}
   * with added parameter {@code favoredDatanodes}
   * @param favoredNodes datanodes that should be favored as targets. This
   *          is only a hint and due to cluster state, namenode may not be 
   *          able to place the blocks on these datanodes.
   */
  DatanodeStorageInfo[] chooseTarget(String src,
      int numOfReplicas, Node writer,
      Set<Node> excludedNodes,
      long blocksize,
      List<DatanodeDescriptor> favoredNodes,
      BlockStoragePolicy storagePolicy) {
      }          

在chooseTargets传入偏爱的节点参数会使得方法在选择节点时候优先选取偏爱节点参数中的节点.这是这个参数的最根本的影响.

chooseTarget无favoredNodes参数实现

此方法分为3个阶段

1. 初始化阶段

  /** This is the implementation. */
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
                                Node writer,
                                List<DatanodeStorageInfo> chosenStorage,
                                boolean returnChosenNodes,
                                Set<Node> excludedNodes,
                                long blocksize,
                                final BlockStoragePolicy storagePolicy) {
// 如果目标完成副本数为0或机器节点数量为0,返回空
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
  return DatanodeStorageInfo.EMPTY_ARRAY;
}
// 创建黑名单列表集  
if (excludedNodes == null) {
  excludedNodes = new HashSet<Node>();
}
// 计算每个机架所允许最大副本数
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
...

2. 选择目标节点

    ...
// 将所选节点加入到结果列表中,同时加入到移除列表中,意为已选择过的节点
final List<DatanodeStorageInfo> results = new ArrayList<DatanodeStorageInfo>(chosenStorage);
for (DatanodeStorageInfo storage : chosenStorage) {
  // add localMachine and related nodes to excludedNodes
  addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
// 计算是否需要避免旧的,未更新的节点
boolean avoidStaleNodes = (stats != null
    && stats.isAvoidingStaleDataNodesForWrite());
// 选择numOfReplicas规定副本数的目标机器,并返回其中第一个节点
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
    blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
// 如果不像返回初始选中的目标节点,则进行移除
if (!returnChosenNodes) {  
  results.removeAll(chosenStorage);
}
...

3. 排序目标节点列表,形成pipeline

    ...  
// sorting nodes to form a pipeline
// 根据最短距离排序目标节点列表,形成pipeline
return getPipeline(
    (writer != null && writer instanceof DatanodeDescriptor) ? writer
        : localNode,
    results.toArray(new DatanodeStorageInfo[results.size()]));

chooseTarget方法主逻辑

下面介绍chooseTarget主要选择逻辑,因为个人感觉是最复杂的,所以放在最后分析.首先,务必要明确以下几个涉及参数的作用和所代表的意义:

final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy,
        EnumSet.noneOf(StorageType.class), results.isEmpty());
  • numOfReplicas, 额外需要复制的副本数

  • excludedNodes,移除节点集合,此集合内的节点不应被考虑作为目标节点

  • results,当前已经选择好的目标节点集合

  • storagePolicy,存储类型选择策略

第一个节点的选择

首先是第一个节点的选择,第一个节点其实是最好选择的,因为他不用其他2个节点的位置影响,但是他同样要约束于请求方所在位置,这里满足3个原则:

  • 如果writer请求方本身位于集群中的一个datanode之上,则第一个副本的位置就在本地节点上,很好理解,这样直接就是本地写操作了。

  • 如果writer请求方纯粹来源于外界客户端的写请求时,则从已选择好的目标节点result列表中挑选第一个节点作为首个节点。

  • 如果result列表中还是没有任何节点,则会从集群中随机挑选1个node作为第一个localNode。

源码地址

在BlockPlacementPolicyDefault.java 中的395行

// 如果额外需要请求副本数为0,或者集群中没有可选节点
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
  // 如果writer请求者在其中一个datanode上则返回此节点,否则直接返回null
  return (writer instanceof DatanodeDescriptor) ? writer : null;
}
// 获取已经选择完成的节点数
final int numOfResults = results.size();
// 计算期望希望达到的副本总数
final int totalReplicasExpected = numOfReplicas + numOfResults;
// 如果writer为空或不在datanode上,则取出已选择好列表中的第一个位置所在节点,赋值给writer
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
  writer = results.get(0).getDatanodeDescriptor();
}

// Keep a copy of original excludedNodes
// 做一份移除列表名单的拷贝
final Set<Node> oldExcludedNodes = new HashSet<Node>(excludedNodes);

// choose storage types; use fallbacks for unavailable storages
// 根据存储策略获取副本需要满足的存储类型列表,如果有不可用的存储类型,会采用fallback的类型
final List<StorageType> requiredStorageTypes = storagePolicy
    .chooseStorageTypes((short) totalReplicasExpected,
        DatanodeStorageInfo.toStorageTypes(results),
        unavailableStorages, newBlock);
// 将存储类型列表进行计数统计,并存于map中
final EnumMap<StorageType, Integer> storageTypes =
    getRequiredStorageTypes(requiredStorageTypes);
if (LOG.isTraceEnabled()) {
  LOG.trace("storageTypes=" + storageTypes);
}
...

三副本位置的选择

`在BlockPlacementPolicyDefault.java 中的493 …

  // 如果已选择的目标节点数量为0,则表示3副本一个都还没开始选,首先从选本地节点开始
  if (numOfResults == 0) {
    writer = chooseLocalStorage(writer, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
            .getDatanodeDescriptor();
    // 如果此时目标需求完成的副本数为降为0,代表选择目标完成,返回第一个节点writer
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  // 取出result列表第一个节点
  final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
  // 前面的过程已经完成首个本地节点的选择,此时进行不同机房的节点选择
  if (numOfResults <= 1) {
    // 选择1个不同于dn0所在机房的一个目标节点位置
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    // 如果此时目标需求完成的副本数为降为0,代表选择目标完成,返回第一个节点writer
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  // 如果经过前面的处理,节点选择数在2个以内,需要选取第3个副本
  if (numOfResults <= 2) {
    // 取出result列表第二个节点
    final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
    // 如果dn0,dn1所在同机房,
    if (clusterMap.isOnSameRack(dn0, dn1)) {
      // 则选择1个不同于dn0,dn1所在机房的副本位置
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else if (newBlock){
      // 如果是新的block块,则选取1个于dn1所在同机房的节点位置
      chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else {
      // 否则选取于writer同机房的位置
      chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    }
    // 如果此时目标需求完成的副本数为降为0,代表选择目标完成,返回第一个节点writer
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  // 如果副本数已经超过2个,说明设置的block的时候,已经设置超过3副本的数量
  // 则剩余位置在集群中随机选择放置节点
  chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes);

看完这段逻辑,只要明白经典的3副本存放位置,多余的副本随机存放的原理即可.当然在其间选择的过程中可能会发生异常,因为有的时候我们没有配置机架感知,集群中都属于一个默认机架的default-rack,则会导致chooseRemoteRack的方法出错,因为没有满足条件的其余机架,这时需要一些重试策略.

  if (retry) {
    for (DatanodeStorageInfo resultStorage : results) {
      addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
          oldExcludedNodes);
    }
    // 剔除之前完成的选择的目标位置,重新计算当前需要复制的副本数
    numOfReplicas = totalReplicasExpected - results.size();
    // 重新调用自身方法进行复制
    return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
        maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
        newBlock);
  }

chooseLocalStorage,chooseLocalRack,chooseRemoteRack和chooseRandom方法

首先选择本地存储位置.如果没有满足条件的,再选择本地机架的节点,如果还是没有满足条件的,进一步降级选择不同机架的节点,最后随机选择集群中的节点

chooseLocalStorage —> chooseLocalRack —> chooseRemoteRack —> chooseRandom

注意:chooseLocalStorage方法,与其余的3个方法稍显不同,单独实现,而其余的方法是通过传入不同参数直接或间接调用 chooseRandom方法.

首先看下chooseLocalStorage方法实现

  protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
      Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
      EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack)
      throws NotEnoughReplicasException {
    // if no local machine, randomly choose one node
    if (localMachine == null) {
      // 如果本地节点为空,则降级选择1个随机节点
      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
    }
    if (preferLocalNode && localMachine instanceof DatanodeDescriptor) {
      DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine;
      // otherwise try local machine first
      if (excludedNodes.add(localMachine)) { // was not in the excluded list
        for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes
            .entrySet().iterator(); iter.hasNext(); ) {
          Map.Entry<StorageType, Integer> entry = iter.next();
          // 遍历本地节点可用的存储目录
          for (DatanodeStorageInfo localStorage : DFSUtil.shuffle(
              localDatanode.getStorageInfos())) {
            StorageType type = entry.getKey();
            // 加入满足条件的存储目录位置
            if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
                maxNodesPerRack, false, results, avoidStaleNodes, type) >= 0) {
              int num = entry.getValue();
              if (num == 1) {
                iter.remove();
              } else {
                entry.setValue(num - 1);
              }
              return localStorage;
            }
          }
        }
      } 
    }

    if (!fallbackToLocalRack) {
      return null;
    }
    // 本地节点没有满足条件的存储位置,则降级选取同机架的节点
    // try a node on local rack
    return chooseLocalRack(localMachine, excludedNodes, blocksize,
        maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  }

chooseLocalRack和chooseRemoteRack比较类似

chooseLocalRack

   // no local machine, so choose a random machine
 if (localMachine == null) {
   return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
       maxNodesPerRack, results, avoidStaleNodes, storageTypes);
 }
 // 获取本地机架名
 final String localRack = localMachine.getNetworkLocation();
 
 try {
   // choose one from the local rack
   // 将机架名作为scope参数传入
   return chooseRandom(localRack, excludedNodes,
       blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);

chooseRemoteRack

    // no local machine, so choose a random machine
   try {
  // choose one from the local rack
  // 将机架名作为scope参数传入
  return chooseRandom(localRack, excludedNodes,
      blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes);

chooseRemoteRack

  // 获取本地机架名称,带上前缀字符~,作为scope参数传入
 chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
     excludedNodes, blocksize, maxReplicasPerRack, results,
     avoidStaleNodes, storageTypes);

从这里我们可以看到,这里最明显的区别就是chooseRandom的scope参数的传入,scope参数的直接作用就是会选择出是否属于此机架下的节点列表。

  // 获取本地机架名称,带上前缀字符~,作为scope参数传入
 chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(),
     excludedNodes, blocksize, maxReplicasPerRack, results,
     avoidStaleNodes, storageTypes);

总结

通过上面的分析也就明白一个问题:网上经常会看到,有人说第三个DN是与第二个DN是同rack的,也有人说第三个DN是与第一个DN同rack的。那么到底哪个说法对呢? 关键就看第二个DN的选择,我在上面写了,第二个DN可能是与第一个DN不在同一个rack,但也可能在同一个rack中,具体要根据当时集群中的情况来分析。 所以不能简单的认死理。

本文基本上属于转载,原创博文

参考博文 参考博文