  • 一、手把手带你源码解析HDFS文件上传之write上传过程




    <!-- Maven dependencies used by the HDFS client examples below. -->
    <dependencies>
        <!-- Hadoop artifacts, all pinned to the same 3.1.3 release. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>

        <!-- Declared with "provided" scope, unlike the two artifacts above. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs-client</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>

        <!-- Unit testing. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <!-- SLF4J binding that routes logging to log4j 1.2. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>



    /**
     * Creates {@code /input} through the {@code fs} handle and writes a short
     * payload into it. This call is the entry point used to trace the client
     * side write path in the sections that follow.
     *
     * @throws IOException if creating the file or writing to it fails
     */
    @Test
    public void testPut2() throws IOException {
        // try-with-resources closes the stream even when write() throws,
        // so the output stream is never leaked.
        try (FSDataOutputStream fos = fs.create(new Path("/input"))) {
            // Explicit charset: the bytes no longer depend on the platform default.
            fos.write("hello world".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }



    /**
     * Writes every byte of {@code b}; shorthand for
     * {@code write(b, 0, b.length)}.
     *
     * @param b the bytes to write
     * @throws IOException if the underlying write fails
     */
    public void write(byte b[]) throws IOException {
        write(b, 0, b.length);
    }

    /**
     * Writes {@code len} bytes of {@code b}, starting at index {@code off}, by
     * handing them one at a time to {@link #write(int)}.
     *
     * @param b   source array
     * @param off index of the first byte to write
     * @param len number of bytes to write
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
     *         negative, or {@code off + len} is negative or greater than
     *         {@code b.length}
     * @throws IOException if the underlying write fails
     */
    public void write(byte b[], int off, int len) throws IOException {
        final int end = off + len;
        final int slack = b.length - end;
        // OR-ing the four values leaves the sign bit set exactly when at
        // least one of them is negative, so one comparison covers every case.
        if ((off | len | slack | end) < 0) {
            throw new IndexOutOfBoundsException();
        }
        for (int pos = off; pos < end; pos++) {
            write(b[pos]);
        }
    }

    /**
     * Forwards the value to the wrapped stream {@code out}.
     *
     * @param b the value to write
     * @throws IOException if the wrapped stream fails
     */
    public void write(int b) throws IOException {
        out.write(b);
    }



    /**
     * Writes the value {@code b} to this stream. Declared abstract here, so
     * every concrete stream class supplies the actual behaviour.
     *
     * @param b the value to write
     * @throws IOException declared for implementations that fail while writing
     */
    public abstract void write(int b) throws IOException;

    ctrl + h 查找write实现类,选择FSOutputSummer.java,在该类中查找write


    public synchronized void write(int b) throws IOException {  buf[count++] = (byte)b;  if(count == buf.length) {    flushBuffer();  }} protected synchronized void flushBuffer() throws IOException {  flushBuffer(false, true);} protected synchronized int flushBuffer(boolean keep,    boolean flushPartial) throws IOException {  int bufLen = count;  int partialLen = bufLen %sum.getBytesPerChecksum();  int lenToFlush = flushPartial ? bufLen :bufLen - partialLen;   if (lenToFlush != 0) {// 向队列中写数据   // Directory=> File => Block(128M) => package(64K) => chunk(chunk 512byte + chunksum 4byte)writeChecksumChunks(buf, 0, lenToFlush);     if (!flushPartial || keep) {      count = partialLen;      System.arraycopy(buf, bufLen - count,buf, 0, count);    } else {      count = 0;    }  }   // total bytes left minus unflushed bytesleft  return count - (bufLen - lenToFlush);} private void writeChecksumChunks(byte b[], int off, int len)throws IOException {   // 计算chunk的校验和  sum.calculateChunkedSums(b, off, len, checksum,0);  TraceScope scope = createWriteTraceScope();   // 按照chunk的大小遍历数据  try {    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {      int chunkLen =Math.min(sum.getBytesPerChecksum(), len - i);      int ckOffset = i / sum.getBytesPerChecksum()* getChecksumSize();         // 一个chunk一个chunk的将数据写入队列      writeChunk(b, off + i,chunkLen, checksum, ckOffset,          getChecksumSize());    }  } finally {    if (scope != null) {      scope.close();    }  }} protected abstract void writeChunk(byte[] b, int bOffset, int bLen,   byte[] checksum, int checksumOffset, intchecksumLen) throws IOException;

    ctrl + h 查找writeChunk实现类DFSOutputStream.java

    protected synchronized void writeChunk(byte[] b, intoffset, int len,    byte[] checksum, int ckoff, int cklen)throws IOException {   writeChunkPrepare(len, ckoff, cklen);   // 往packet里面写chunk的校验和 4byte  currentPacket.writeChecksum(checksum, ckoff,cklen);   // 往packet里面写一个chunk 512 byte  currentPacket.writeData(b, offset, len);   // 记录写入packet中的chunk个数,累计到127个chuck,这个packet就满了  currentPacket.incNumChunks();  getStreamer().incBytesCurBlock(len);   // If packet is full, enqueue it fortransmission  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks()||      getStreamer().getBytesCurBlock() ==blockSize) {    enqueueCurrentPacketFull();  }} synchronized void enqueueCurrentPacketFull() throws IOException {  LOG.debug("enqueue full {}, src={},bytesCurBlock={}, blockSize={},"          + " appendChunk={}, {}",currentPacket, src, getStreamer()          .getBytesCurBlock(), blockSize,getStreamer().getAppendChunk(),      getStreamer());   enqueueCurrentPacket();   adjustChunkBoundary();   endBlock();} void enqueueCurrentPacket() throws IOException {  getStreamer().waitAndQueuePacket(currentPacket);  currentPacket = null;} void waitAndQueuePacket(DFSPacket packet) throws IOException{  synchronized (dataQueue) {    try {        // 如果队列满了,等待      // If queue is full, then wait till wehave enough space      boolean firstWait = true;      try {        while (!streamerClosed &&dataQueue.size() + ackQueue.size() >            dfsClient.getConf().getWriteMaxPackets()) {          if (firstWait) {            Span span =Tracer.getCurrentSpan();            if (span != null) {             span.addTimelineAnnotation("dataQueue.wait");            }            firstWait = false;          }          try {            dataQueue.wait();          } catch (InterruptedException e) {            ... ...          
}        }      } finally {        Span span = Tracer.getCurrentSpan();        if ((span != null) && (!firstWait)){         span.addTimelineAnnotation("end.wait");        }      }      checkClosed();        // 如果队列没满,向队列中添加数据      queuePacket(packet);    } catch (ClosedChannelException ignored) {    }  }}


    void queuePacket(DFSPacketpacket) {  synchronized (dataQueue) {    if (packet == null) return;   packet.addTraceParent(Tracer.getCurrentSpanId());        // 向队列中添加数据    dataQueue.addLast(packet);     lastQueuedSeqno = packet.getSeqno();    LOG.debug("Queued {}, {}",packet, this);        // 通知队列添加数据完成    dataQueue.notifyAll();  }}



    Ctrl + n全局查找DataStreamer,搜索run方法


    @Overridepublic void run() {        long lastPacket = Time.monotonicNow();       TraceScope scope = null;       while (!streamerClosed &&dfsClient.clientRunning) {        // if the Responder encountered an error, shutdown Responder        if (errorState.hasError()) {              closeResponder();         }         DFSPacket one;        try {              // process datanode IO errors ifany              boolean doSleep =processDatanodeOrExternalError();               final int halfSocketTimeout =dfsClient.getConf().getSocketTimeout()/2;              synchronized (dataQueue) {               // wait for a packet to be sent.               long now = Time.monotonicNow();               while ((!shouldStop() && dataQueue.size() == 0 &&                       (stage !=BlockConstructionStage.DATA_STREAMING ||                              now - lastPacket < halfSocketTimeout)) ||doSleep) {                     long timeout =halfSocketTimeout - (now-lastPacket);                     timeout = timeout <= 0 ?1000 : timeout;                     timeout = (stage ==BlockConstructionStage.DATA_STREAMING)?                            timeout : 1000;                     try {                       // 如果dataQueue里面没有数据,代码会阻塞在这儿                       dataQueue.wait(timeout); // 接收到notify消息                     } catch(InterruptedException  e) {                       LOG.warn("Caught exception", e);                     }                     doSleep = false;                     now = Time.monotonicNow();               }               if (shouldStop()) {                     continue;               }               // get packet to be sent.               
if (dataQueue.isEmpty()) {                     one =createHeartbeatPacket();               } else {                     try {                       backOffIfNecessary();                     } catch(InterruptedException e) {                       LOG.warn("Caught exception", e);                     }                     //  队列不为空,从队列中取出packet                     one = dataQueue.getFirst(); // regular data packet                     SpanId[] parents =one.getTraceParents();                     if (parents.length > 0){                       scope = dfsClient.getTracer().                              newScope("dataStreamer",parents[0]);                       scope.getSpan().setParents(parents);                     }               }              }               // get new block from namenode.              if (LOG.isDebugEnabled()) {               LOG.debug("stage=" + stage + ", " + this);              }              if (stage ==BlockConstructionStage.PIPELINE_SETUP_CREATE) {               LOG.debug("Allocating new block: {}", this);               // 步骤一:向NameNode 申请block 并建立数据管道               setPipeline(nextBlockOutputStream());               // 步骤二:启动ResponseProcessor用来监听packet发送是否成功               initDataStreaming();              } else if (stage ==BlockConstructionStage.PIPELINE_SETUP_APPEND) {               setupPipelineForAppendOrRecovery();               if (streamerClosed) {                     continue;               }               initDataStreaming();              }               long lastByteOffsetInBlock =one.getLastByteOffsetBlock();              if (lastByteOffsetInBlock >stat.getBlockSize()) {               throw new IOException("BlockSize " + stat.getBlockSize() +                       " < lastByteOffsetInBlock, " +this + ", " + one);              }              … …              // send the packet              SpanId spanId = SpanId.INVALID;              synchronized (dataQueue) {                // move packet from dataQueue to ackQueue               if 
(!one.isHeartbeatPacket()) {                     if (scope != null) {                       spanId = scope.getSpanId();                       scope.detach();                        one.setTraceScope(scope);                     }                     scope = null;                     // 步骤三:从dataQueue 把要发送的这个packet 移除出去                     dataQueue.removeFirst();                     // 步骤四:然后往ackQueue 里面添加这个packet                     ackQueue.addLast(one);                     packetSendTime.put(one.getSeqno(),Time.monotonicNow());                     dataQueue.notifyAll();               }              }               LOG.debug("{} sending{}", this, one);               // write out data to remotedatanode              try (TraceScope ignored =dfsClient.getTracer().                     newScope("DataStreamer#writeTo",spanId)) {               //  将数据写出去               one.writeTo(blockStream);               blockStream.flush();              } catch (IOException e) {               errorState.markFirstNodeIfNotMarked();               throw e;              }              … …}


    protected LocatedBlock nextBlockOutputStream() throws IOException {  LocatedBlock lb;  DatanodeInfo[] nodes;  StorageType[] nextStorageTypes;  String[] nextStorageIDs;  int count =dfsClient.getConf().getNumBlockWriteRetry();  boolean success;  final ExtendedBlock oldBlock =block.getCurrentBlock();  do {    errorState.resetInternalError();    lastException.clear();     DatanodeInfo[] excluded =getExcludedNodes();       // 向NN获取向哪个DN写数据    lb = locateFollowingBlock(        excluded.length > 0 ? excluded :null, oldBlock);     // 创建管道    success = createBlockOutputStream(nodes,nextStorageTypes, nextStorageIDs,          0L, false);    … …  } while (!success && --count >=0);   if (!success) {    throw new IOException("Unable to createnew block.");  }  return lb;} private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,    ExtendedBlock oldBlock) throws IOException{  return DFSOutputStream.addBlock(excluded, dfsClient, src,oldBlock,      stat.getFileId(), favoredNodes,addBlockFlags);} static LocatedBlock addBlock(DatanodeInfo[]excludedNodes,      DFSClient dfsClient, String src,ExtendedBlock prevBlock, long fileId,      String[] favoredNodes,EnumSet<AddBlockFlag> allocFlags)      throws IOException {        ... ...         //向NN获取向哪个DN写数据        return dfsClient.namenode.addBlock(src,dfsClient.clientName, prevBlock,            excludedNodes, fileId,favoredNodes, allocFlags);        ... ...} LocatedBlock addBlock(String src, String clientName,      ExtendedBlock previous, DatanodeInfo[]excludeNodes, long fileId,      String[] favoredNodes,EnumSet<AddBlockFlag> addBlockFlags)      throws IOException;

    ctrl + h 点击NameNodeRpcServer,在该类中搜索addBlock


    public LocatedBlock addBlock(String src, StringclientName,    ExtendedBlock previous, DatanodeInfo[]excludedNodes, long fileId,    String[] favoredNodes,EnumSet<AddBlockFlag> addBlockFlags)    throws IOException {  checkNNStartup();  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,      clientName, previous, excludedNodes,favoredNodes, addBlockFlags);  if (locatedBlock != null) {    metrics.incrAddBlockOps();  }  return locatedBlock;}


    LocatedBlock getAdditionalBlock(    String src, long fileId, String clientName,ExtendedBlock previous,    DatanodeInfo[] excludedNodes, String[]favoredNodes,    EnumSet<AddBlockFlag> flags) throwsIOException {  final String operationName ="getAdditionalBlock";  NameNode.stateChangeLog.debug("BLOCK*getAdditionalBlock: {}  inodeId {}"+      " for {}", src, fileId,clientName);   ... ...  // 选择块存储位置  DatanodeStorageInfo[] targets =FSDirWriteFileOp.chooseTargetForNewBlock(      blockManager, src, excludedNodes,favoredNodes, flags, r);   ... ...  return lb;} staticDatanodeStorageInfo[] chooseTargetForNewBlock(    BlockManager bm, String src, DatanodeInfo[]excludedNodes,    String[] favoredNodes,EnumSet<AddBlockFlag> flags,    ValidateAddBlockResult r) throwsIOException {  ... ...  return bm.chooseTarget4NewBlock(src, r.numTargets,clientNode,                                 excludedNodesSet, r.blockSize,                                 favoredNodesList, r.storagePolicyID,                                  r.blockType,r.ecPolicy, flags);} public DatanodeStorageInfo[] chooseTarget4NewBlock(... ...) throws IOException {  ... ...          final DatanodeStorageInfo[] targets =blockplacement.chooseTarget(src,      numOfReplicas, client, excludedNodes,blocksize,      favoredDatanodeDescriptors,storagePolicy, flags);   ... ...  
return targets;} DatanodeStorageInfo[] chooseTarget(String src,    int numOfReplicas, Node writer,    Set<Node> excludedNodes,    long blocksize,    List<DatanodeDescriptor>favoredNodes,    BlockStoragePolicy storagePolicy,    EnumSet<AddBlockFlag> flags) {   return chooseTarget(src, numOfReplicas, writer,      newArrayList<DatanodeStorageInfo>(numOfReplicas), false,      excludedNodes, blocksize, storagePolicy,flags);} public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,    int numOfReplicas,    Node writer,    List<DatanodeStorageInfo> chosen,    boolean returnChosenNodes,    Set<Node> excludedNodes,    long blocksize,    BlockStoragePolicy storagePolicy,EnumSet<AddBlockFlag>flags);

    Ctrl + h 查找chooseTarget实现类BlockPlacementPolicyDefault.java

    public DatanodeStorageInfo[] chooseTarget(String srcPath,    int numOfReplicas,    Node writer,    List<DatanodeStorageInfo>chosenNodes,    boolean returnChosenNodes,    Set<Node> excludedNodes,    long blocksize,    final BlockStoragePolicy storagePolicy,    EnumSet<AddBlockFlag> flags) {        return chooseTarget(numOfReplicas, writer, chosenNodes,returnChosenNodes,      excludedNodes, blocksize, storagePolicy,flags, null);} private DatanodeStorageInfo[]chooseTarget(int numOfReplicas,  Node writer,  List<DatanodeStorageInfo> chosenStorage,  boolean returnChosenNodes,  Set<Node> excludedNodes,  long blocksize,  final BlockStoragePolicy storagePolicy,  EnumSet<AddBlockFlag> addBlockFlags,  EnumMap<StorageType, Integer> sTypes) {  … …    int[] result =getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);  numOfReplicas = result[0];  int maxNodesPerRack = result[1];     for (DatanodeStorageInfo storage :chosenStorage) {    // add localMachine and related nodes toexcludedNodes       // 获取不可用的DN    addToExcludedNodes(storage.getDatanodeDescriptor(),excludedNodes);  }   List<DatanodeStorageInfo> results =null;  Node localNode = null;  boolean avoidStaleNodes = (stats != null      &&stats.isAvoidingStaleDataNodesForWrite());  //    boolean avoidLocalNode = (addBlockFlags !=null      &&addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)      && writer != null      &&!excludedNodes.contains(writer));  // Attempt to exclude local node if theclient suggests so. If no enough  // nodes can be obtained, it falls back tothe default block placement  // policy.   
// 有数据正在写,避免都写入本地  if (avoidLocalNode) {    results = new ArrayList<>(chosenStorage);    Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);    if (writer != null) {      excludedNodeCopy.add(writer);    }    localNode = chooseTarget(numOfReplicas,writer,        excludedNodeCopy, blocksize,maxNodesPerRack, results,        avoidStaleNodes, storagePolicy,        EnumSet.noneOf(StorageType.class),results.isEmpty(), sTypes);    if (results.size() < numOfReplicas) {      // not enough nodes; discard results andfall back      results = null;    }  }  if (results == null) {    results = newArrayList<>(chosenStorage);       // 真正的选择DN节点    localNode = chooseTarget(numOfReplicas, writer, excludedNodes,        blocksize, maxNodesPerRack, results,avoidStaleNodes,        storagePolicy,EnumSet.noneOf(StorageType.class), results.isEmpty(),        sTypes);  }   if (!returnChosenNodes) {     results.removeAll(chosenStorage);  }     // sorting nodes to form a pipeline  return getPipeline(      (writer != null && writerinstanceof DatanodeDescriptor) ? writer          : localNode,      results.toArray(newDatanodeStorageInfo[results.size()]));} private Node chooseTarget(int numOfReplicas,   ... ...) {         writer = chooseTargetInOrder(numOfReplicas, writer,excludedNodes, blocksize,          maxNodesPerRack, results,avoidStaleNodes, newBlock, storageTypes);   ... 
...} protected Node chooseTargetInOrder(int numOfReplicas,                               Node writer,                               final Set<Node> excludedNodes,                               final longblocksize,                               final intmaxNodesPerRack,                               final List<DatanodeStorageInfo> results,                               final booleanavoidStaleNodes,                               final booleannewBlock,                              EnumMap<StorageType, Integer> storageTypes)                               throws NotEnoughReplicasException {  final int numOfResults = results.size();  if (numOfResults == 0) {       // 第一个块存储在当前节点    DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,        excludedNodes, blocksize,maxNodesPerRack, results, avoidStaleNodes,        storageTypes, true);     writer = (storageInfo != null) ?storageInfo.getDatanodeDescriptor()                                   : null;     if (--numOfReplicas == 0) {      return writer;    }  }  final DatanodeDescriptor dn0 =results.get(0).getDatanodeDescriptor();  // 第二个块存储在另外一个机架  if (numOfResults <= 1) {    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,        results, avoidStaleNodes, storageTypes);    if (--numOfReplicas == 0) {      return writer;    }  }  if (numOfResults <= 2) {    final DatanodeDescriptor dn1 =results.get(1).getDatanodeDescriptor();       // 如果第一个和第二个在同一个机架,那么第三个放在其他机架    if (clusterMap.isOnSameRack(dn0, dn1)) {      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,          results, avoidStaleNodes,storageTypes);    } else if (newBlock){        // 如果是新块,和第二个块存储在同一个机架      chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,          results, avoidStaleNodes,storageTypes);    } else {        // 如果不是新块,放在当前机架      chooseLocalRack(writer,excludedNodes, blocksize, maxNodesPerRack,          results, avoidStaleNodes,storageTypes);    }    if (--numOfReplicas == 0) {      return 
writer;    }  }  chooseRandom(numOfReplicas, NodeBase.ROOT,excludedNodes, blocksize,      maxNodesPerRack, results,avoidStaleNodes, storageTypes);  return writer;}




    protected LocatedBlock nextBlockOutputStream() throws IOException {  LocatedBlock lb;  DatanodeInfo[] nodes;  StorageType[] nextStorageTypes;  String[] nextStorageIDs;  int count =dfsClient.getConf().getNumBlockWriteRetry();  boolean success;  final ExtendedBlock oldBlock =block.getCurrentBlock();  do {    errorState.resetInternalError();    lastException.clear();     DatanodeInfo[] excluded =getExcludedNodes();       // 向NN获取向哪个DN写数据    lb = locateFollowingBlock(        excluded.length > 0 ? excluded :null, oldBlock);     // 创建管道    success = createBlockOutputStream(nodes, nextStorageTypes,nextStorageIDs,          0L, false);    … …  } while (!success && --count >=0);   if (!success) {    throw new IOException("Unable tocreate new block.");  }  return lb;} boolean createBlockOutputStream(DatanodeInfo[]nodes,      StorageType[] nodeStorageTypes, String[]nodeStorageIDs,      long newGS, boolean recoveryFlag) {    ... ...       // 和DN创建socket       s = createSocketForPipeline(nodes[0],nodes.length, dfsClient);             // 获取输出流,用于写数据到DN       OutputStreamunbufOut =NetUtils.getOutputStream(s, writeTimeout);       // 获取输入流,用于读取写数据到DN的结果    InputStreamunbufIn = NetUtils.getInputStream(s,readTimeout);          IOStreamPair saslStreams =dfsClient.saslClient.socketSend(s,        unbufOut, unbufIn, dfsClient,accessToken, nodes[0]);    unbufOut = saslStreams.out;    unbufIn = saslStreams.in;    out = new DataOutputStream(new BufferedOutputStream(unbufOut,       DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));    blockReplyStream = new DataInputStream(unbufIn);             // 发送数据       new Sender(out).writeBlock(blockCopy,nodeStorageTypes[0], accessToken,            dfsClient.clientName, nodes,nodeStorageTypes, null, bcs,            nodes.length, block.getNumBytes(),bytesSent, newGS,            checksum4WriteBlock,cachingStrategy.get(), isLazyPersistFile,            (targetPinnings != null &&targetPinnings[0]), targetPinnings,            
nodeStorageIDs[0], nodeStorageIDs);       ... ...} public void writeBlock(... ...) throws IOException{  ... ...   send(out, Op.WRITE_BLOCK, proto.build());}



    Ctrl +n 全局查找DataXceiverServer.java,在该类中查找run方法

    public void run(){  Peer peer = null;  while (datanode.shouldRun &&!datanode.shutdownForUpgrade) {    try {        // 接收socket的请求      peer = peerServer.accept();       // Make sure the xceiver count is notexceeded      int curXceiverCount =datanode.getXceiverCount();      if (curXceiverCount > maxXceiverCount){        throw new IOException("Xceivercount " + curXceiverCount            + " exceeds the limit ofconcurrent xcievers: "            + maxXceiverCount);      }        // 客户端每发送一个block,都启动一个DataXceiver去处理block      new Daemon(datanode.threadGroup,          DataXceiver.create(peer, datanode, this))          .start();    } catch (SocketTimeoutException ignored) {      ... ...    }  }   ... ...}


    public void run() {  int opsProcessed = 0;  Op op = null;   try {    synchronized(this) {      xceiver = Thread.currentThread();    }    dataXceiverServer.addPeer(peer,Thread.currentThread(), this);    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);    InputStream input = socketIn;    try {      IOStreamPair saslStreams =datanode.saslServer.receive(peer, socketOut,        socketIn,datanode.getXferAddress().getPort(),       return;    }       super.initialize(newDataInputStream(input));       do {      updateCurrentThreadName("Waiting foroperation #" + (opsProcessed + 1));       try {        if (opsProcessed != 0) {          assert dnConf.socketKeepaliveTimeout> 0;         peer.setReadTimeout(dnConf.socketKeepaliveTimeout);        } else {         peer.setReadTimeout(dnConf.socketTimeout);        }              // 读取这次数据的请求类型        op = readOp();      } catch (InterruptedIOException ignored){        // Time out while we wait for clientrpc        break;      } catch (EOFException |ClosedChannelException e) {        // Since we optimistically expect thenext op, it's quite normal to        // get EOF here.        LOG.debug("Cached {} closing after {}ops.  " +            "This message is usuallybenign.", peer, opsProcessed);        break;      } catch (IOException err) {        incrDatanodeNetworkErrors();        throw err;      }       // restore normal timeout      if (opsProcessed != 0) {       peer.setReadTimeout(dnConf.socketTimeout);      }       opStartTime = monotonicNow();        // 根据操作类型处理我们的数据      processOp(op);      ++opsProcessed;    } while ((peer != null) &&        (!peer.isClosed() &&dnConf.socketKeepaliveTimeout > 0));  } catch (Throwable t) {    ... ...  }} protected finalvoid processOp(Op op) throws IOException {  switch(op) {  ... ...  case WRITE_BLOCK:    opWriteBlock(in);    break;  ... ...  
default:    throw new IOException("Unknown op" + op + " in data stream");  }} private void opWriteBlock(DataInputStream in) throwsIOException {  final OpWriteBlockProto proto =OpWriteBlockProto.parseFrom(vintPrefixed(in));  final DatanodeInfo[] targets = PBHelperClient.convert(proto.getTargetsList());  TraceScope traceScope =continueTraceSpan(proto.getHeader(),      proto.getClass().getSimpleName());  try {    writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),       PBHelperClient.convertStorageType(proto.getStorageType()),       PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),        proto.getHeader().getClientName(),        targets,       PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(),targets.length),       PBHelperClient.convert(proto.getSource()),        fromProto(proto.getStage()),        proto.getPipelineSize(),        proto.getMinBytesRcvd(),proto.getMaxBytesRcvd(),        proto.getLatestGenerationStamp(),       fromProto(proto.getRequestedChecksum()),        (proto.hasCachingStrategy() ?           getCachingStrategy(proto.getCachingStrategy()) :          CachingStrategy.newDefaultStrategy()),        (proto.hasAllowLazyPersist() ?proto.getAllowLazyPersist() : false),        (proto.hasPinning() ?proto.getPinning(): false),       (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),        proto.getStorageId(),       proto.getTargetStorageIdsList().toArray(new String[0]));  } finally {   if (traceScope != null) traceScope.close();  }}

    Ctrl +alt +b 查找writeBlock的实现类DataXceiver.java

    public void writeBlock(... ...) throws IOException{  ... ...  try {    final Replica replica;    if (isDatanode ||        stage !=BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {      // open a block receiver        // 创建一个BlockReceiver      setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,          peer.getRemoteAddressString(),          peer.getLocalAddressString(),          stage, latestGenerationStamp,minBytesRcvd, maxBytesRcvd,          clientname, srcDataNode, datanode,requestedChecksum,          cachingStrategy, allowLazyPersist,pinning, storageId));      replica = blockReceiver.getReplica();    } else {      replica = datanode.data.recoverClose(          block, latestGenerationStamp,minBytesRcvd);    }    storageUuid = replica.getStorageUuid();    isOnTransientStorage = replica.isOnTransientStorage();     //    // Connect to downstream machine, ifappropriate    // 继续连接下游的机器    if (targets.length > 0) {      InetSocketAddress mirrorTarget = null;      // Connect to backup machine      mirrorNode =targets[0].getXferAddr(connectToDnViaHostname);      LOG.debug("Connecting to datanode{}", mirrorNode);      mirrorTarget =NetUtils.createSocketAddr(mirrorNode);         // 向新的副本发送socket      mirrorSock = datanode.newSocket();      try {         ... ...        
if (targetPinnings != null &&targetPinnings.length > 0) {               // 往下游socket发送数据          new Sender(mirrorOut).writeBlock(originalBlock,targetStorageTypes[0],              blockToken, clientname, targets,targetStorageTypes,              srcDataNode, stage, pipelineSize,minBytesRcvd, maxBytesRcvd,              latestGenerationStamp,requestedChecksum, cachingStrategy,              allowLazyPersist,targetPinnings[0], targetPinnings,              targetStorageId, targetStorageIds);        } else {          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],              blockToken, clientname, targets,targetStorageTypes,              srcDataNode, stage, pipelineSize,minBytesRcvd, maxBytesRcvd,              latestGenerationStamp,requestedChecksum, cachingStrategy,              allowLazyPersist, false,targetPinnings,              targetStorageId,targetStorageIds);        }         mirrorOut.flush();        DataNodeFaultInjector.get().writeBlockAfterFlush();         // read connect ack (only for clients,not for replication req)        if (isClient) {          BlockOpResponseProto connectAck =           BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));          mirrorInStatus =connectAck.getStatus();          firstBadLink =connectAck.getFirstBadLink();          if (mirrorInStatus != SUCCESS) {            LOG.debug("Datanode {} gotresponse for connect" +                "ack  from downstream datanode with firstbadlink as{}",                targets.length, firstBadLink);          }        }       … …   //update metrics datanode.getMetrics().addWriteBlockOp(elapsed());  datanode.getMetrics().incrWritesFromClient(peer.isLocal(),size);} BlockReceiver getBlockReceiver(    final ExtendedBlock block, finalStorageType storageType,    final DataInputStream in,    final String inAddr, final String myAddr,    final BlockConstructionStage stage,    final long newGs, final long minBytesRcvd,final long maxBytesRcvd,    final String 
clientname, final DatanodeInfosrcDataNode,    final DataNode dn, DataChecksumrequestedChecksum,    CachingStrategy cachingStrategy,    final boolean allowLazyPersist,    final boolean pinning,    final String storageId) throws IOException{  return new BlockReceiver(block, storageType, in,      inAddr, myAddr, stage, newGs,minBytesRcvd, maxBytesRcvd,      clientname, srcDataNode, dn,requestedChecksum,      cachingStrategy, allowLazyPersist,pinning, storageId);} BlockReceiver(final ExtendedBlock block,final StorageType storageType,  final DataInputStream in,  final String inAddr, final String myAddr,  final BlockConstructionStage stage,  final long newGs, final long minBytesRcvd,final long maxBytesRcvd,  final String clientname, final DatanodeInfosrcDataNode,  final DataNode datanode, DataChecksumrequestedChecksum,  CachingStrategy cachingStrategy,  final boolean allowLazyPersist,  final boolean pinning,  final String storageId) throws IOException {  ... ...  if (isDatanode) { //replication or move    replicaHandler =       datanode.data.createTemporary(storageType, storageId, block, false);  } else {    switch (stage) {    case PIPELINE_SETUP_CREATE:        // 创建管道      replicaHandler = datanode.data.createRbw(storageType, storageId,          block, allowLazyPersist);      datanode.notifyNamenodeReceivingBlock(          block,replicaHandler.getReplica().getStorageUuid());      break;    ... ...    default: throw newIOException("Unsupported stage " + stage +          " while receiving block " +block + " from " + inAddr);    }  }  ... ...} public ReplicaHandler createRbw(    StorageType storageType, String storageId,ExtendedBlock b,    boolean allowLazyPersist) throwsIOException {  try (AutoCloseableLock lock =datasetLock.acquire()) {    ... ...     
if (ref == null) {      ref = volumes.getNextVolume(storageType,storageId, b.getNumBytes());    }     FsVolumeImpl v = (FsVolumeImpl)ref.getVolume();    // create an rbw file to hold block in thedesignated volume     if (allowLazyPersist &&!v.isTransientStorage()) {     datanode.getMetrics().incrRamDiskBlocksWriteFallback();    }     ReplicaInPipeline newReplicaInfo;    try {        // 创建输出流的临时写文件      new ReplicaInfo = v.createRbw(b);      if(new ReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {        throw new IOException("CreateRBWreturned a replica of state "            +new ReplicaInfo.getReplicaInfo().getState()            + " for block " +b.getBlockId());      }    } catch (IOException e) {      IOUtils.cleanup(null, ref);      throw e;    }     volumeMap.add(b.getBlockPoolId(),newReplicaInfo.getReplicaInfo());    return new ReplicaHandler(newReplicaInfo,ref);  }} public ReplicaHandler createRbw(    StorageType storageType, String storageId,ExtendedBlock b,    boolean allowLazyPersist) throwsIOException {  try (AutoCloseableLock lock =datasetLock.acquire()) {    ... ...     
if (ref == null) {        // 有可能有多个临时写文件      ref = volumes.getNextVolume(storageType, storageId,b.getNumBytes());    }     FsVolumeImpl v = (FsVolumeImpl)ref.getVolume();    // create an rbw file to hold block in thedesignated volume     if (allowLazyPersist &&!v.isTransientStorage()) {     datanode.getMetrics().incrRamDiskBlocksWriteFallback();    }     ReplicaInPipeline newReplicaInfo;    try {        // 创建输出流的临时写文件      newReplicaInfo = v.createRbw(b);      if(newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {        throw new IOException("CreateRBWreturned a replica of state "            + newReplicaInfo.getReplicaInfo().getState()            + " for block " +b.getBlockId());      }    } catch (IOException e) {      IOUtils.cleanup(null, ref);      throw e;    }     volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());    return new ReplicaHandler(newReplicaInfo,ref);  }} public ReplicaInPipeline createRbw(ExtendedBlock b)throws IOException {   File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());  LocalReplicaInPipeline newReplicaInfo = newReplicaBuilder(ReplicaState.RBW)      .setBlockId(b.getBlockId())     .setGenerationStamp(b.getGenerationStamp())      .setFsVolume(this)      .setDirectoryToUse(f.getParentFile())      .setBytesToReserve(b.getNumBytes())      .buildLocalReplicaInPipeline();  return newReplicaInfo;}



    Ctrl + n全局查找DataStreamer,搜索run方法


    @Overridepublic void run() {        long lastPacket = Time.monotonicNow();       TraceScope scope = null;       while (!streamerClosed &&dfsClient.clientRunning) {        // if the Responder encountered an error, shutdown Responder        if (errorState.hasError()) {              closeResponder();         }         DFSPacket one;        try {              // process datanode IO errors ifany              boolean doSleep =processDatanodeOrExternalError();               final int halfSocketTimeout =dfsClient.getConf().getSocketTimeout()/2;              synchronized (dataQueue) {               // wait for a packet to be sent.               long now = Time.monotonicNow();               while ((!shouldStop() && dataQueue.size() == 0 &&                       (stage !=BlockConstructionStage.DATA_STREAMING ||                              now - lastPacket < halfSocketTimeout)) ||doSleep) {                     long timeout =halfSocketTimeout - (now-lastPacket);                     timeout = timeout <= 0 ?1000 : timeout;                     timeout = (stage ==BlockConstructionStage.DATA_STREAMING)?                            timeout : 1000;                     try {                       // 如果dataQueue里面没有数据,代码会阻塞在这儿                       dataQueue.wait(timeout); // 接收到notify消息                     } catch(InterruptedException  e) {                       LOG.warn("Caught exception", e);                     }                     doSleep = false;                     now = Time.monotonicNow();               }               if (shouldStop()) {                     continue;               }               // get packet to be sent.               
if (dataQueue.isEmpty()) {                     one =createHeartbeatPacket();               } else {                     try {                       backOffIfNecessary();                     } catch(InterruptedException e) {                       LOG.warn("Caught exception", e);                     }                     //  队列不为空,从队列中取出packet                     one = dataQueue.getFirst(); // regular data packet                     SpanId[] parents =one.getTraceParents();                     if (parents.length > 0){                       scope = dfsClient.getTracer().                              newScope("dataStreamer",parents[0]);                       scope.getSpan().setParents(parents);                     }               }              }               // get new block from namenode.              if (LOG.isDebugEnabled()) {               LOG.debug("stage=" + stage + ", " + this);              }              if (stage ==BlockConstructionStage.PIPELINE_SETUP_CREATE) {               LOG.debug("Allocating new block: {}", this);               // 步骤一:向NameNode 申请block 并建立数据管道               setPipeline(nextBlockOutputStream());               // 步骤二:启动ResponseProcessor用来监听packet发送是否成功               initDataStreaming();              } else if (stage ==BlockConstructionStage.PIPELINE_SETUP_APPEND) {               LOG.debug("Append to block {}", block);               setupPipelineForAppendOrRecovery();               if (streamerClosed) {                     continue;               }               initDataStreaming();              }               long lastByteOffsetInBlock =one.getLastByteOffsetBlock();              if (lastByteOffsetInBlock >stat.getBlockSize()) {               throw new IOException("BlockSize " + stat.getBlockSize() +                       " < lastByteOffsetInBlock, " +this + ", " + one);              }               if (one.isLastPacketInBlock()) {               // wait for all data packets have been successfully acked               synchronized 
(dataQueue) {                     while (!shouldStop()&& ackQueue.size() != 0) {                       try {                            // wait for acks toarrive from datanodes                            dataQueue.wait(1000);                       } catch (InterruptedException  e) {                            LOG.warn("Caughtexception", e);                       }                     }               }               if (shouldStop()) {                     continue;               }               stage = BlockConstructionStage.PIPELINE_CLOSE;              }               // send the packet              SpanId spanId = SpanId.INVALID;              synchronized (dataQueue) {                // move packet from dataQueue to ackQueue               if (!one.isHeartbeatPacket()) {                     if (scope != null) {                       spanId = scope.getSpanId();                       scope.detach();                        one.setTraceScope(scope);                     }                     scope = null;                     // 步骤三:从dataQueue 把要发送的这个packet 移除出去                     dataQueue.removeFirst();                     // 步骤四:然后往ackQueue 里面添加这个packet                     ackQueue.addLast(one);                     packetSendTime.put(one.getSeqno(),Time.monotonicNow());                     dataQueue.notifyAll();               }              }               LOG.debug("{} sending{}", this, one);               // write out data to remotedatanode              try (TraceScope ignored =dfsClient.getTracer().                     
newScope("DataStreamer#writeTo",spanId)) {               //  将数据写出去               one.writeTo(blockStream);               blockStream.flush();              } catch (IOException e) {               errorState.markFirstNodeIfNotMarked();               throw e;              }              lastPacket = Time.monotonicNow();               // update bytesSent              long tmpBytesSent = one.getLastByteOffsetBlock();              if (bytesSent < tmpBytesSent) {               bytesSent = tmpBytesSent;              }               if (shouldStop()) {               continue;              }               // Is this block full?              if (one.isLastPacketInBlock()) {               // wait for the close packet has been acked               synchronized (dataQueue) {                     while (!shouldStop()&& ackQueue.size() != 0) {                       dataQueue.wait(1000);// wait for acks toarrive from datanodes                     }               }               if (shouldStop()) {                     continue;               }                endBlock();              }              if (progress != null) { progress.progress();}               // This is used by unit test totrigger race conditions.              if (artificialSlowdown != 0&& dfsClient.clientRunning) {               Thread.sleep(artificialSlowdown);              }         }catch (Throwable e) {              ... ...         }finally {              if (scope != null) {               scope.close();               scope = null;              }         }       }       closeInternal();} private void initDataStreaming() {  this.setName("DataStreamer for file" + src +      " block " + block);  ... ...  response = new ResponseProcessor(nodes);  response.start();  stage =BlockConstructionStage.DATA_STREAMING;}

    点击response再点击ResponseProcessor,ctrl + f 查找run方法

    // Excerpt of a run() method; the surrounding text indicates it belongs to
    // ResponseProcessor. The parts marked "... ..." are elided by the article.
    public void run(){
        ... ...
        // Remove the first packet from ackQueue.
        ackQueue.removeFirst();
        // Drop the send-time entry recorded under this sequence number.
        packetSendTime.remove(seqno);
        // Wake up any threads waiting on dataQueue.
        dataQueue.notifyAll();
        ... ...
    }



    当 DataNode 已上报的 block 数 / block 总数 >= 99.9%(dfs.namenode.safemode.threshold-pct,默认 0.999f)之后,再经过 30s 的缓冲时间(dfs.namenode.safemode.extension),安全模式才会退出

    二、eclipse 向hdfs 上传文件为空怎么解决

    而计算进度的原理,就是已传输大小与总大小的比值。这样就简单了:以最基本的 OutputStream 为例,它的基本写出方法为 void write(byte[] b),实际的传输过程就是通过 InputStream 循环读、再由 OutputStream 循环写。你只要事先通过 File 类取得文件的总大小,然后在读入或写出的循环里加一个简单的进度计算步骤:每读取或写出一次,就将已传输大小增加本次实际读写的字节数(即 read 的返回值;最后一次读取通常不足 b.length),求出比值,更新进度条。具体的计算间隔,可以根据循环次数或时间间隔来定。

