
A Hands-On Source-Code Walkthrough of the HDFS File Upload (write) Path


This post shares some knowledge about write: a source-level walkthrough of the HDFS file-upload (write) path, followed by shorter pieces on the HDFS startup flow, empty uploads from Eclipse, and upload progress bars.

Contents:

  • A hands-on source-code walkthrough of the HDFS write upload path
  • The HDFS startup flow
  • How to fix empty files when uploading to HDFS from Eclipse
  • How does a file-upload progress bar work?

    1. A hands-on source-code walkthrough of the HDFS write upload path

    The HDFS write data flow is illustrated in a figure in the original post (figure omitted in this copy).

    The HDFS upload source-code call graph is likewise shown in a figure (figure omitted in this copy).

    0) Add the following dependencies to pom.xml:

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs-client</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.30</version>
        </dependency>
    </dependencies>

    Part 1: Writing data into the DataStreamer's queue

    The user's own code:

    @Test
    public void testPut2() throws IOException {
        FSDataOutputStream fos = fs.create(new Path("/input"));
        fos.write("hello world".getBytes());
    }
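    For context, fs above is a FileSystem handle created in the test's setup. A minimal, self-contained version of the same client might look like the sketch below; the NameNode URI and user name are placeholders chosen for illustration, not values from this post.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsWriteDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Hypothetical NameNode address and user; replace with your own.
            FileSystem fs = FileSystem.get(
                URI.create("hdfs://namenode.example.com:8020"), conf, "hdfsuser");
            try (FSDataOutputStream fos = fs.create(new Path("/input"))) {
                fos.write("hello world".getBytes()); // triggers the write path traced below
            }
            fs.close();
        }
    }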

    Step into write.

    FilterOutputStream.java

    public void write(byte b[]) throws IOException {
        write(b, 0, b.length);
    }

    public void write(byte b[], int off, int len) throws IOException {
        if ((off | len | (b.length - (len + off)) | (off + len)) < 0)
            throw new IndexOutOfBoundsException();

        for (int i = 0; i < len; i++) {
            write(b[off + i]);
        }
    }

    public void write(int b) throws IOException {
        out.write(b);
    }

    Step into write.

    OutputStream.java

    public abstract void write(int b) throws IOException;

    Ctrl + H to list the implementations of write; choose FSOutputSummer.java and search for write in that class.

    FSOutputSummer.java

    public synchronized void write(int b) throws IOException {
      buf[count++] = (byte) b;
      if (count == buf.length) {
        flushBuffer();
      }
    }

    protected synchronized void flushBuffer() throws IOException {
      flushBuffer(false, true);
    }

    protected synchronized int flushBuffer(boolean keep,
        boolean flushPartial) throws IOException {
      int bufLen = count;
      int partialLen = bufLen % sum.getBytesPerChecksum();
      int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;

      if (lenToFlush != 0) {
        // Write the data into the queue.
        // Directory => File => Block (128 MB) => packet (64 KB)
        //   => chunk (512-byte chunk + 4-byte checksum)
        writeChecksumChunks(buf, 0, lenToFlush);

        if (!flushPartial || keep) {
          count = partialLen;
          System.arraycopy(buf, bufLen - count, buf, 0, count);
        } else {
          count = 0;
        }
      }

      // total bytes left minus unflushed bytes left
      return count - (bufLen - lenToFlush);
    }

    private void writeChecksumChunks(byte b[], int off, int len)
        throws IOException {
      // Compute the chunk checksums.
      sum.calculateChunkedSums(b, off, len, checksum, 0);
      TraceScope scope = createWriteTraceScope();

      // Walk the data one chunk at a time.
      try {
        for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
          int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
          int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();

          // Write the data into the queue chunk by chunk.
          writeChunk(b, off + i, chunkLen, checksum, ckOffset,
              getChecksumSize());
        }
      } finally {
        if (scope != null) {
          scope.close();
        }
      }
    }

    protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
        byte[] checksum, int checksumOffset, int checksumLen) throws IOException;
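    The comment above names the size hierarchy: a 128 MB block is shipped as roughly 64 KB packets, each packet holding 512-byte chunks with a 4-byte checksum apiece. A quick back-of-the-envelope check of why a packet holds 127 chunks (my own illustration, not Hadoop code):

    public class ChunkMath {
        public static void main(String[] args) {
            int bytesPerChecksum = 512;   // one chunk of user data
            int checksumSize = 4;         // CRC per chunk
            int chunkOnWire = bytesPerChecksum + checksumSize; // 516 bytes
            int packetSize = 64 * 1024;   // ~64 KB per packet

            int chunksPerPacket = packetSize / chunkOnWire;
            System.out.println(chunksPerPacket);            // 65536 / 516 = 127
            System.out.println(chunksPerPacket * chunkOnWire); // 65532 bytes used
        }
    }

    This matches the "127 chunks and the packet is full" bookkeeping in writeChunk below.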

    Ctrl + H to find the writeChunk implementation, DFSOutputStream.java:

    protected synchronized void writeChunk(byte[] b, int offset, int len,
        byte[] checksum, int ckoff, int cklen) throws IOException {

      writeChunkPrepare(len, ckoff, cklen);

      // Write the chunk's 4-byte checksum into the packet.
      currentPacket.writeChecksum(checksum, ckoff, cklen);

      // Write one 512-byte chunk into the packet.
      currentPacket.writeData(b, offset, len);

      // Track how many chunks are in the packet; at 127 chunks the packet is full.
      currentPacket.incNumChunks();
      getStreamer().incBytesCurBlock(len);

      // If packet is full, enqueue it for transmission
      if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
          getStreamer().getBytesCurBlock() == blockSize) {
        enqueueCurrentPacketFull();
      }
    }

    synchronized void enqueueCurrentPacketFull() throws IOException {
      LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
              + " appendChunk={}, {}", currentPacket, src, getStreamer()
              .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
          getStreamer());

      enqueueCurrentPacket();
      adjustChunkBoundary();
      endBlock();
    }

    void enqueueCurrentPacket() throws IOException {
      getStreamer().waitAndQueuePacket(currentPacket);
      currentPacket = null;
    }

    void waitAndQueuePacket(DFSPacket packet) throws IOException {
      synchronized (dataQueue) {
        try {
          // If the queue is full, wait.
          // If queue is full, then wait till we have enough space
          boolean firstWait = true;
          try {
            while (!streamerClosed && dataQueue.size() + ackQueue.size() >
                dfsClient.getConf().getWriteMaxPackets()) {
              if (firstWait) {
                Span span = Tracer.getCurrentSpan();
                if (span != null) {
                  span.addTimelineAnnotation("dataQueue.wait");
                }
                firstWait = false;
              }
              try {
                dataQueue.wait();
              } catch (InterruptedException e) {
                ... ...
              }
            }
          } finally {
            Span span = Tracer.getCurrentSpan();
            if ((span != null) && (!firstWait)) {
              span.addTimelineAnnotation("end.wait");
            }
          }
          checkClosed();

          // If the queue has room, enqueue the packet.
          queuePacket(packet);
        } catch (ClosedChannelException ignored) {
        }
      }
    }

    DataStreamer.java

    void queuePacket(DFSPacket packet) {
      synchronized (dataQueue) {
        if (packet == null) return;

        packet.addTraceParent(Tracer.getCurrentSpanId());

        // Add the packet to the queue.
        dataQueue.addLast(packet);

        lastQueuedSeqno = packet.getSeqno();
        LOG.debug("Queued {}, {}", packet, this);

        // Notify waiters that a packet has been enqueued.
        dataQueue.notifyAll();
      }
    }
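    dataQueue and ackQueue form a classic bounded producer-consumer handoff on an intrinsic lock: waitAndQueuePacket is the producer side, DataStreamer.run (next section) is the consumer. A stripped-down sketch of the same wait/notifyAll pattern, as an illustration only, not HDFS code:

    import java.util.ArrayDeque;
    import java.util.Deque;

    // Minimal bounded queue using the same wait/notifyAll shape as
    // DataStreamer's dataQueue.
    public class BoundedQueue<T> {
        private final Deque<T> queue = new ArrayDeque<>();
        private final int maxPackets;

        public BoundedQueue(int maxPackets) { this.maxPackets = maxPackets; }

        // Producer side: block while the queue is full, then enqueue and notify.
        public void put(T item) throws InterruptedException {
            synchronized (queue) {
                while (queue.size() >= maxPackets) {
                    queue.wait();        // waitAndQueuePacket does this
                }
                queue.addLast(item);     // queuePacket: dataQueue.addLast
                queue.notifyAll();       // wake the consumer (DataStreamer.run)
            }
        }

        // Consumer side: block while the queue is empty, then dequeue and notify.
        public T take() throws InterruptedException {
            synchronized (queue) {
                while (queue.isEmpty()) {
                    queue.wait();        // DataStreamer.run: dataQueue.wait(timeout)
                }
                T item = queue.removeFirst();
                queue.notifyAll();       // unblock a producer waiting on a full queue
                return item;
            }
        }
    }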

    Part 2: Building the pipeline, rack awareness (block placement)

    1) Step into create.

    Ctrl + N to find DataStreamer globally, then search for its run method.

    DataStreamer.java

    @Override
    public void run() {
      long lastPacket = Time.monotonicNow();
      TraceScope scope = null;
      while (!streamerClosed && dfsClient.clientRunning) {
        // if the Responder encountered an error, shutdown Responder
        if (errorState.hasError()) {
          closeResponder();
        }

        DFSPacket one;
        try {
          // process datanode IO errors if any
          boolean doSleep = processDatanodeOrExternalError();

          final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout() / 2;
          synchronized (dataQueue) {
            // wait for a packet to be sent.
            long now = Time.monotonicNow();
            while ((!shouldStop() && dataQueue.size() == 0 &&
                (stage != BlockConstructionStage.DATA_STREAMING ||
                    now - lastPacket < halfSocketTimeout)) || doSleep) {
              long timeout = halfSocketTimeout - (now - lastPacket);
              timeout = timeout <= 0 ? 1000 : timeout;
              timeout = (stage == BlockConstructionStage.DATA_STREAMING) ?
                  timeout : 1000;
              try {
                // If dataQueue is empty, the thread blocks here...
                dataQueue.wait(timeout); // ...until a notify arrives.
              } catch (InterruptedException e) {
                LOG.warn("Caught exception", e);
              }
              doSleep = false;
              now = Time.monotonicNow();
            }
            if (shouldStop()) {
              continue;
            }
            // get packet to be sent.
            if (dataQueue.isEmpty()) {
              one = createHeartbeatPacket();
            } else {
              try {
                backOffIfNecessary();
              } catch (InterruptedException e) {
                LOG.warn("Caught exception", e);
              }
              // The queue is not empty: take a packet from it.
              one = dataQueue.getFirst(); // regular data packet
              SpanId[] parents = one.getTraceParents();
              if (parents.length > 0) {
                scope = dfsClient.getTracer().
                    newScope("dataStreamer", parents[0]);
                scope.getSpan().setParents(parents);
              }
            }
          }

          // get new block from namenode.
          if (LOG.isDebugEnabled()) {
            LOG.debug("stage=" + stage + ", " + this);
          }
          if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
            LOG.debug("Allocating new block: {}", this);
            // Step 1: ask the NameNode for a block and set up the data pipeline.
            setPipeline(nextBlockOutputStream());
            // Step 2: start the ResponseProcessor that watches whether packets are acked.
            initDataStreaming();
          } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
            setupPipelineForAppendOrRecovery();
            if (streamerClosed) {
              continue;
            }
            initDataStreaming();
          }

          long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
          if (lastByteOffsetInBlock > stat.getBlockSize()) {
            throw new IOException("BlockSize " + stat.getBlockSize() +
                " < lastByteOffsetInBlock, " + this + ", " + one);
          }
          ... ...
          // send the packet
          SpanId spanId = SpanId.INVALID;
          synchronized (dataQueue) {
            // move packet from dataQueue to ackQueue
            if (!one.isHeartbeatPacket()) {
              if (scope != null) {
                spanId = scope.getSpanId();
                scope.detach();
                one.setTraceScope(scope);
              }
              scope = null;
              // Step 3: remove the packet about to be sent from dataQueue.
              dataQueue.removeFirst();
              // Step 4: add that packet to ackQueue.
              ackQueue.addLast(one);
              packetSendTime.put(one.getSeqno(), Time.monotonicNow());
              dataQueue.notifyAll();
            }
          }

          LOG.debug("{} sending {}", this, one);

          // write out data to remote datanode
          try (TraceScope ignored = dfsClient.getTracer().
              newScope("DataStreamer#writeTo", spanId)) {
            // Ship the packet out over the socket.
            one.writeTo(blockStream);
            blockStream.flush();
          } catch (IOException e) {
            errorState.markFirstNodeIfNotMarked();
            throw e;
          }
          ... ...
    }

    Step into nextBlockOutputStream.

    protected LocatedBlock nextBlockOutputStream() throws IOException {
      LocatedBlock lb;
      DatanodeInfo[] nodes;
      StorageType[] nextStorageTypes;
      String[] nextStorageIDs;
      int count = dfsClient.getConf().getNumBlockWriteRetry();
      boolean success;
      final ExtendedBlock oldBlock = block.getCurrentBlock();
      do {
        errorState.resetInternalError();
        lastException.clear();

        DatanodeInfo[] excluded = getExcludedNodes();

        // Ask the NN which DNs this block should be written to.
        lb = locateFollowingBlock(
            excluded.length > 0 ? excluded : null, oldBlock);

        // Build the pipeline.
        success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
            0L, false);
        ... ...
      } while (!success && --count >= 0);

      if (!success) {
        throw new IOException("Unable to create new block.");
      }
      return lb;
    }

    private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
        ExtendedBlock oldBlock) throws IOException {
      return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
          stat.getFileId(), favoredNodes, addBlockFlags);
    }

    static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
        DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
        String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
        throws IOException {
      ... ...
      // RPC to the NN: which DNs should receive this block?
      return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
          excludedNodes, fileId, favoredNodes, allocFlags);
      ... ...
    }

    LocatedBlock addBlock(String src, String clientName,
        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
        String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
        throws IOException;

    Ctrl + H, open NameNodeRpcServer, and search for addBlock in that class.

    NameNodeRpcServer.java

    public LocatedBlock addBlock(String src, String clientName,
        ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
        String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
        throws IOException {
      checkNNStartup();
      LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
          clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
      if (locatedBlock != null) {
        metrics.incrAddBlockOps();
      }
      return locatedBlock;
    }

    FSNamesystem.java

    LocatedBlock getAdditionalBlock(
        String src, long fileId, String clientName, ExtendedBlock previous,
        DatanodeInfo[] excludedNodes, String[] favoredNodes,
        EnumSet<AddBlockFlag> flags) throws IOException {
      final String operationName = "getAdditionalBlock";
      NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {}  inodeId {}" +
          " for {}", src, fileId, clientName);

      ... ...
      // Choose where the block's replicas will be stored.
      DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
          blockManager, src, excludedNodes, favoredNodes, flags, r);

      ... ...
      return lb;
    }

    static DatanodeStorageInfo[] chooseTargetForNewBlock(
        BlockManager bm, String src, DatanodeInfo[] excludedNodes,
        String[] favoredNodes, EnumSet<AddBlockFlag> flags,
        ValidateAddBlockResult r) throws IOException {
      ... ...
      return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
          excludedNodesSet, r.blockSize,
          favoredNodesList, r.storagePolicyID,
          r.blockType, r.ecPolicy, flags);
    }

    public DatanodeStorageInfo[] chooseTarget4NewBlock(... ...) throws IOException {
      ... ...
      final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
          numOfReplicas, client, excludedNodes, blocksize,
          favoredDatanodeDescriptors, storagePolicy, flags);

      ... ...
      return targets;
    }

    DatanodeStorageInfo[] chooseTarget(String src,
        int numOfReplicas, Node writer,
        Set<Node> excludedNodes,
        long blocksize,
        List<DatanodeDescriptor> favoredNodes,
        BlockStoragePolicy storagePolicy,
        EnumSet<AddBlockFlag> flags) {

      return chooseTarget(src, numOfReplicas, writer,
          new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
          excludedNodes, blocksize, storagePolicy, flags);
    }

    public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
        int numOfReplicas,
        Node writer,
        List<DatanodeStorageInfo> chosen,
        boolean returnChosenNodes,
        Set<Node> excludedNodes,
        long blocksize,
        BlockStoragePolicy storagePolicy,
        EnumSet<AddBlockFlag> flags);

    Ctrl + H to find the chooseTarget implementation, BlockPlacementPolicyDefault.java:

    public DatanodeStorageInfo[] chooseTarget(String srcPath,
        int numOfReplicas,
        Node writer,
        List<DatanodeStorageInfo> chosenNodes,
        boolean returnChosenNodes,
        Set<Node> excludedNodes,
        long blocksize,
        final BlockStoragePolicy storagePolicy,
        EnumSet<AddBlockFlag> flags) {

      return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
          excludedNodes, blocksize, storagePolicy, flags, null);
    }

    private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
        Node writer,
        List<DatanodeStorageInfo> chosenStorage,
        boolean returnChosenNodes,
        Set<Node> excludedNodes,
        long blocksize,
        final BlockStoragePolicy storagePolicy,
        EnumSet<AddBlockFlag> addBlockFlags,
        EnumMap<StorageType, Integer> sTypes) {
      ... ...
      int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
      numOfReplicas = result[0];
      int maxNodesPerRack = result[1];

      for (DatanodeStorageInfo storage : chosenStorage) {
        // add localMachine and related nodes to excludedNodes
        // (collect the DNs that must not be chosen)
        addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
      }

      List<DatanodeStorageInfo> results = null;
      Node localNode = null;
      boolean avoidStaleNodes = (stats != null
          && stats.isAvoidingStaleDataNodesForWrite());

      boolean avoidLocalNode = (addBlockFlags != null
          && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
          && writer != null
          && !excludedNodes.contains(writer));
      // Attempt to exclude local node if the client suggests so. If no enough
      // nodes can be obtained, it falls back to the default block placement
      // policy.

      // The writer is busy writing; avoid putting everything on the local node.
      if (avoidLocalNode) {
        results = new ArrayList<>(chosenStorage);
        Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
        if (writer != null) {
          excludedNodeCopy.add(writer);
        }
        localNode = chooseTarget(numOfReplicas, writer,
            excludedNodeCopy, blocksize, maxNodesPerRack, results,
            avoidStaleNodes, storagePolicy,
            EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
        if (results.size() < numOfReplicas) {
          // not enough nodes; discard results and fall back
          results = null;
        }
      }
      if (results == null) {
        results = new ArrayList<>(chosenStorage);
        // This is where the DN nodes are actually chosen.
        localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
            blocksize, maxNodesPerRack, results, avoidStaleNodes,
            storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
            sTypes);
      }

      if (!returnChosenNodes) {
        results.removeAll(chosenStorage);
      }

      // sorting nodes to form a pipeline
      return getPipeline(
          (writer != null && writer instanceof DatanodeDescriptor) ? writer
              : localNode,
          results.toArray(new DatanodeStorageInfo[results.size()]));
    }

    private Node chooseTarget(int numOfReplicas, ... ...) {
      writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
      ... ...
    }

    protected Node chooseTargetInOrder(int numOfReplicas,
        Node writer,
        final Set<Node> excludedNodes,
        final long blocksize,
        final int maxNodesPerRack,
        final List<DatanodeStorageInfo> results,
        final boolean avoidStaleNodes,
        final boolean newBlock,
        EnumMap<StorageType, Integer> storageTypes)
        throws NotEnoughReplicasException {
      final int numOfResults = results.size();
      if (numOfResults == 0) {
        // Replica 1 is stored on the local (writer's) node.
        DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
            excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
            storageTypes, true);

        writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                       : null;

        if (--numOfReplicas == 0) {
          return writer;
        }
      }
      final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
      // Replica 2 is stored on a different rack.
      if (numOfResults <= 1) {
        chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
            results, avoidStaleNodes, storageTypes);
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
      if (numOfResults <= 2) {
        final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
        // If replicas 1 and 2 share a rack, put replica 3 on another rack.
        if (clusterMap.isOnSameRack(dn0, dn1)) {
          chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
              results, avoidStaleNodes, storageTypes);
        } else if (newBlock) {
          // For a new block, put replica 3 on the same rack as replica 2.
          chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
              results, avoidStaleNodes, storageTypes);
        } else {
          // Otherwise, put it on the writer's rack.
          chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
              results, avoidStaleNodes, storageTypes);
        }
        if (--numOfReplicas == 0) {
          return writer;
        }
      }
      chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, storageTypes);
      return writer;
    }
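    Summarizing chooseTargetInOrder for a new block with three replicas: replica 1 goes on the writer's node, replica 2 on a different rack, replica 3 on another node of replica 2's rack. A toy model of just that decision (my own simplification, not Hadoop code; it assumes the cluster spans at least two racks):

    import java.util.ArrayList;
    import java.util.List;

    public class PlacementSketch {

        static class Node {
            final String name, rack;
            Node(String name, String rack) { this.name = name; this.rack = rack; }
            public String toString() { return name + "(" + rack + ")"; }
        }

        static List<Node> chooseThree(Node writer, List<Node> cluster) {
            List<Node> results = new ArrayList<>();
            results.add(writer);                         // replica 1: the writer's own node
            Node remote = null;
            for (Node n : cluster) {                     // replica 2: first node on a different rack
                if (!n.rack.equals(writer.rack)) { remote = n; break; }
            }
            results.add(remote);
            for (Node n : cluster) {                     // replica 3: another node on replica 2's rack
                if (n != remote && n.rack.equals(remote.rack)) { results.add(n); break; }
            }
            return results;
        }

        public static void main(String[] args) {
            List<Node> cluster = new ArrayList<>();
            cluster.add(new Node("dn1", "/rack1"));
            cluster.add(new Node("dn2", "/rack1"));
            cluster.add(new Node("dn3", "/rack2"));
            cluster.add(new Node("dn4", "/rack2"));
            System.out.println(chooseThree(cluster.get(0), cluster));
            // [dn1(/rack1), dn3(/rack2), dn4(/rack2)]
        }
    }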

    Part 3: Building the pipeline, sending over a socket

    After the NameNode has picked the target DataNodes, control returns to the client, which now opens a socket to the first DataNode in the pipeline.

    Step into nextBlockOutputStream.

    protected LocatedBlock nextBlockOutputStream() throws IOException {
      LocatedBlock lb;
      DatanodeInfo[] nodes;
      StorageType[] nextStorageTypes;
      String[] nextStorageIDs;
      int count = dfsClient.getConf().getNumBlockWriteRetry();
      boolean success;
      final ExtendedBlock oldBlock = block.getCurrentBlock();
      do {
        errorState.resetInternalError();
        lastException.clear();

        DatanodeInfo[] excluded = getExcludedNodes();

        // Ask the NN which DNs this block should be written to.
        lb = locateFollowingBlock(
            excluded.length > 0 ? excluded : null, oldBlock);

        // Build the pipeline.
        success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
            0L, false);
        ... ...
      } while (!success && --count >= 0);

      if (!success) {
        throw new IOException("Unable to create new block.");
      }
      return lb;
    }

    boolean createBlockOutputStream(DatanodeInfo[] nodes,
        StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
        long newGS, boolean recoveryFlag) {
      ... ...
      // Create a socket to the first DN.
      s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);

      // Output stream, used to write data to the DN.
      OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
      // Input stream, used to read the DN's reply to the write.
      InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);

      IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
          unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
      unbufOut = saslStreams.out;
      unbufIn = saslStreams.in;
      out = new DataOutputStream(new BufferedOutputStream(unbufOut,
          DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
      blockReplyStream = new DataInputStream(unbufIn);

      // Send the write-block request.
      new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
          dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
          nodes.length, block.getNumBytes(), bytesSent, newGS,
          checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
          (targetPinnings != null && targetPinnings[0]), targetPinnings,
          nodeStorageIDs[0], nodeStorageIDs);
      ... ...
    }

    public void writeBlock(... ...) throws IOException {
      ... ...
      send(out, Op.WRITE_BLOCK, proto.build());
    }
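    Under the SASL and protobuf layers this is ordinary java.net socket plumbing: connect to the first DataNode, wrap a buffered DataOutputStream for requests and a DataInputStream for replies. A bare sketch of that wrapping; the host name is a placeholder, 9866 is the usual Hadoop 3 data-transfer port, and the "WRITE_BLOCK" line merely stands in for Sender#writeBlock's real protobuf message:

    import java.io.BufferedOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.Socket;

    public class PipelineSocketSketch {
        public static void main(String[] args) throws IOException {
            Socket s = new Socket();
            s.connect(new InetSocketAddress("dn1.example.com", 9866), 60_000);
            s.setSoTimeout(60_000); // read timeout while waiting for the DN's reply
            try (DataOutputStream out = new DataOutputStream(
                     new BufferedOutputStream(s.getOutputStream()));
                 DataInputStream in = new DataInputStream(s.getInputStream())) {
                out.writeBytes("WRITE_BLOCK\n"); // stand-in for the protobuf op
                out.flush();
                // in.read(...) would block here until the DataNode answers.
            }
        }
    }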

    Part 4: Building the pipeline, receiving over a socket

    1) Step into create.

    Ctrl + N to find DataXceiverServer.java globally, then find its run method.

    public void run() {
      Peer peer = null;
      while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
        try {
          // Accept an incoming socket connection.
          peer = peerServer.accept();

          // Make sure the xceiver count is not exceeded
          int curXceiverCount = datanode.getXceiverCount();
          if (curXceiverCount > maxXceiverCount) {
            throw new IOException("Xceiver count " + curXceiverCount
                + " exceeds the limit of concurrent xcievers: "
                + maxXceiverCount);
          }

          // For each block the client sends, start a DataXceiver to handle it.
          new Daemon(datanode.threadGroup,
              DataXceiver.create(peer, datanode, this))
              .start();
        } catch (SocketTimeoutException ignored) {
          ... ...
        }
      }
      ... ...
    }
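    This is the classic thread-per-connection accept loop. A minimal standalone version of the same shape, as an illustration only (the port and the cap are arbitrary values I picked):

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class AcceptLoopSketch {
        private static final int MAX_XCEIVERS = 4096;
        private static int active = 0;

        public static void main(String[] args) throws IOException {
            try (ServerSocket server = new ServerSocket(9866)) {
                while (true) {
                    Socket peer = server.accept();     // blocks for the next client
                    synchronized (AcceptLoopSketch.class) {
                        if (active >= MAX_XCEIVERS) {  // enforce the concurrency cap
                            peer.close();
                            continue;
                        }
                        active++;
                    }
                    Thread t = new Thread(() -> handle(peer)); // one handler per connection
                    t.setDaemon(true);
                    t.start();
                }
            }
        }

        private static void handle(Socket peer) {
            try (Socket p = peer) {
                // read the op code and dispatch (see DataXceiver.run below)
            } catch (IOException ignored) {
            } finally {
                synchronized (AcceptLoopSketch.class) { active--; }
            }
        }
    }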

    Open DataXceiver (a thread) and find its run method.

    public void run() {
      int opsProcessed = 0;
      Op op = null;

      try {
        synchronized (this) {
          xceiver = Thread.currentThread();
        }
        dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
        peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
        InputStream input = socketIn;
        try {
          IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
              socketIn, datanode.getXferAddress().getPort(), ... ...);
          ... ...
        } catch (... ...) {
          ... ...
          return;
        }

        super.initialize(new DataInputStream(input));

        do {
          updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

          try {
            if (opsProcessed != 0) {
              assert dnConf.socketKeepaliveTimeout > 0;
              peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
            } else {
              peer.setReadTimeout(dnConf.socketTimeout);
            }
            // Read the operation type of this request.
            op = readOp();
          } catch (InterruptedIOException ignored) {
            // Time out while we wait for client rpc
            break;
          } catch (EOFException | ClosedChannelException e) {
            // Since we optimistically expect the next op, it's quite normal to
            // get EOF here.
            LOG.debug("Cached {} closing after {} ops.  " +
                "This message is usually benign.", peer, opsProcessed);
            break;
          } catch (IOException err) {
            incrDatanodeNetworkErrors();
            throw err;
          }

          // restore normal timeout
          if (opsProcessed != 0) {
            peer.setReadTimeout(dnConf.socketTimeout);
          }

          opStartTime = monotonicNow();
          // Dispatch on the operation type.
          processOp(op);
          ++opsProcessed;
        } while ((peer != null) &&
            (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
      } catch (Throwable t) {
        ... ...
      }
    }

    protected final void processOp(Op op) throws IOException {
      switch (op) {
      ... ...
      case WRITE_BLOCK:
        opWriteBlock(in);
        break;
      ... ...
      default:
        throw new IOException("Unknown op " + op + " in data stream");
      }
    }

    private void opWriteBlock(DataInputStream in) throws IOException {
      final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
      final DatanodeInfo[] targets = PBHelperClient.convert(proto.getTargetsList());
      TraceScope traceScope = continueTraceSpan(proto.getHeader(),
          proto.getClass().getSimpleName());
      try {
        writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
            PBHelperClient.convertStorageType(proto.getStorageType()),
            PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
            proto.getHeader().getClientName(),
            targets,
            PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
            PBHelperClient.convert(proto.getSource()),
            fromProto(proto.getStage()),
            proto.getPipelineSize(),
            proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
            proto.getLatestGenerationStamp(),
            fromProto(proto.getRequestedChecksum()),
            (proto.hasCachingStrategy() ?
                getCachingStrategy(proto.getCachingStrategy()) :
                CachingStrategy.newDefaultStrategy()),
            (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
            (proto.hasPinning() ? proto.getPinning() : false),
            (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
            proto.getStorageId(),
            proto.getTargetStorageIdsList().toArray(new String[0]));
      } finally {
        if (traceScope != null) traceScope.close();
      }
    }

    Ctrl + Alt + B to find the writeBlock implementation, DataXceiver.java:

    public void writeBlock(... ...) throws IOException {
      ... ...
      try {
        final Replica replica;
        if (isDatanode ||
            stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
          // open a block receiver
          // Create a BlockReceiver for this block.
          setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
              peer.getRemoteAddressString(),
              peer.getLocalAddressString(),
              stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
              clientname, srcDataNode, datanode, requestedChecksum,
              cachingStrategy, allowLazyPersist, pinning, storageId));
          replica = blockReceiver.getReplica();
        } else {
          replica = datanode.data.recoverClose(
              block, latestGenerationStamp, minBytesRcvd);
        }
        storageUuid = replica.getStorageUuid();
        isOnTransientStorage = replica.isOnTransientStorage();

        //
        // Connect to downstream machine, if appropriate
        // Keep building the pipeline toward the downstream DN.
        if (targets.length > 0) {
          InetSocketAddress mirrorTarget = null;
          // Connect to backup machine
          mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
          LOG.debug("Connecting to datanode {}", mirrorNode);
          mirrorTarget = NetUtils.createSocketAddr(mirrorNode);

          // Open a socket toward the next replica's DN.
          mirrorSock = datanode.newSocket();
          try {
            ... ...
            if (targetPinnings != null && targetPinnings.length > 0) {
              // Forward the write-block request down the pipeline.
              new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                  blockToken, clientname, targets, targetStorageTypes,
                  srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                  latestGenerationStamp, requestedChecksum, cachingStrategy,
                  allowLazyPersist, targetPinnings[0], targetPinnings,
                  targetStorageId, targetStorageIds);
            } else {
              new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                  blockToken, clientname, targets, targetStorageTypes,
                  srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                  latestGenerationStamp, requestedChecksum, cachingStrategy,
                  allowLazyPersist, false, targetPinnings,
                  targetStorageId, targetStorageIds);
            }

            mirrorOut.flush();
            DataNodeFaultInjector.get().writeBlockAfterFlush();

            // read connect ack (only for clients, not for replication req)
            if (isClient) {
              BlockOpResponseProto connectAck =
                  BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
              mirrorInStatus = connectAck.getStatus();
              firstBadLink = connectAck.getFirstBadLink();
              if (mirrorInStatus != SUCCESS) {
                LOG.debug("Datanode {} got response for connect" +
                    "ack  from downstream datanode with firstbadlink as {}",
                    targets.length, firstBadLink);
              }
            }
      ... ...

      // update metrics
      datanode.getMetrics().addWriteBlockOp(elapsed());
      datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
    }

    BlockReceiver getBlockReceiver(
        final ExtendedBlock block, final StorageType storageType,
        final DataInputStream in,
        final String inAddr, final String myAddr,
        final BlockConstructionStage stage,
        final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
        final String clientname, final DatanodeInfo srcDataNode,
        final DataNode dn, DataChecksum requestedChecksum,
        CachingStrategy cachingStrategy,
        final boolean allowLazyPersist,
        final boolean pinning,
        final String storageId) throws IOException {
      return new BlockReceiver(block, storageType, in,
          inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, dn, requestedChecksum,
          cachingStrategy, allowLazyPersist, pinning, storageId);
    }

    BlockReceiver(final ExtendedBlock block, final StorageType storageType,
        final DataInputStream in,
        final String inAddr, final String myAddr,
        final BlockConstructionStage stage,
        final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
        final String clientname, final DatanodeInfo srcDataNode,
        final DataNode datanode, DataChecksum requestedChecksum,
        CachingStrategy cachingStrategy,
        final boolean allowLazyPersist,
        final boolean pinning,
        final String storageId) throws IOException {
      ... ...
      if (isDatanode) { // replication or move
        replicaHandler =
            datanode.data.createTemporary(storageType, storageId, block, false);
      } else {
        switch (stage) {
        case PIPELINE_SETUP_CREATE:
          // Set up the pipeline: create an rbw replica.
          replicaHandler = datanode.data.createRbw(storageType, storageId,
              block, allowLazyPersist);
          datanode.notifyNamenodeReceivingBlock(
              block, replicaHandler.getReplica().getStorageUuid());
          break;
        ... ...
        default: throw new IOException("Unsupported stage " + stage +
            " while receiving block " + block + " from " + inAddr);
        }
      }
      ... ...
    }

    public ReplicaHandler createRbw(
        StorageType storageType, String storageId, ExtendedBlock b,
        boolean allowLazyPersist) throws IOException {
      try (AutoCloseableLock lock = datasetLock.acquire()) {
        ... ...

        if (ref == null) {
          // There may be several volumes to choose from for the temporary file.
          ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
        }

        FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
        // create an rbw file to hold block in the designated volume

        if (allowLazyPersist && !v.isTransientStorage()) {
          datanode.getMetrics().incrRamDiskBlocksWriteFallback();
        }

        ReplicaInPipeline newReplicaInfo;
        try {
          // Create the temporary file that the output stream writes into.
          newReplicaInfo = v.createRbw(b);
          if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
            throw new IOException("CreateRBW returned a replica of state "
                + newReplicaInfo.getReplicaInfo().getState()
                + " for block " + b.getBlockId());
          }
        } catch (IOException e) {
          IOUtils.cleanup(null, ref);
          throw e;
        }

        volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
        return new ReplicaHandler(newReplicaInfo, ref);
      }
    }

    public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {
      File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
      LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
          .setBlockId(b.getBlockId())
          .setGenerationStamp(b.getGenerationStamp())
          .setFsVolume(this)
          .setDirectoryToUse(f.getParentFile())
          .setBytesToReserve(b.getNumBytes())
          .buildLocalReplicaInPipeline();
      return newReplicaInfo;
    }

    Part 5: The client receives the DataNodes' write acknowledgement (Response)

    Once the pipeline is up, we return to the client's DataStreamer, which starts the thread that listens for acks.

    Ctrl + N to find DataStreamer globally, then search for its run method.

    DataStreamer.java

    This is the same run method we stepped through in Part 2; the parts that matter here are the ack waits around isLastPacketInBlock() and initDataStreaming():

    @Override
    public void run() {
      ... ...
      if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
        LOG.debug("Allocating new block: {}", this);
        // Step 1: ask the NameNode for a block and set up the data pipeline.
        setPipeline(nextBlockOutputStream());
        // Step 2: start the ResponseProcessor that watches whether packets are acked.
        initDataStreaming();
      } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
        LOG.debug("Append to block {}", block);
        setupPipelineForAppendOrRecovery();
        if (streamerClosed) {
          continue;
        }
        initDataStreaming();
      }
      ... ...
      if (one.isLastPacketInBlock()) {
        // wait for all data packets have been successfully acked
        synchronized (dataQueue) {
          while (!shouldStop() && ackQueue.size() != 0) {
            try {
              // wait for acks to arrive from datanodes
              dataQueue.wait(1000);
            } catch (InterruptedException e) {
              LOG.warn("Caught exception", e);
            }
          }
        }
        if (shouldStop()) {
          continue;
        }
        stage = BlockConstructionStage.PIPELINE_CLOSE;
      }

      // send the packet
      SpanId spanId = SpanId.INVALID;
      synchronized (dataQueue) {
        // move packet from dataQueue to ackQueue
        if (!one.isHeartbeatPacket()) {
          ... ...
          // Step 3: remove the packet being sent from dataQueue.
          dataQueue.removeFirst();
          // Step 4: add that packet to ackQueue.
          ackQueue.addLast(one);
          packetSendTime.put(one.getSeqno(), Time.monotonicNow());
          dataQueue.notifyAll();
        }
      }

      LOG.debug("{} sending {}", this, one);

      // write out data to remote datanode
      try (TraceScope ignored = dfsClient.getTracer().
          newScope("DataStreamer#writeTo", spanId)) {
        one.writeTo(blockStream);
        blockStream.flush();
      } catch (IOException e) {
        errorState.markFirstNodeIfNotMarked();
        throw e;
      }
      lastPacket = Time.monotonicNow();

      // update bytesSent
      long tmpBytesSent = one.getLastByteOffsetBlock();
      if (bytesSent < tmpBytesSent) {
        bytesSent = tmpBytesSent;
      }

      if (shouldStop()) {
        continue;
      }

      // Is this block full?
      if (one.isLastPacketInBlock()) {
        // wait for the close packet has been acked
        synchronized (dataQueue) {
          while (!shouldStop() && ackQueue.size() != 0) {
            dataQueue.wait(1000); // wait for acks to arrive from datanodes
          }
        }
        if (shouldStop()) {
          continue;
        }

        endBlock();
      }
      if (progress != null) { progress.progress(); }

      // This is used by unit test to trigger race conditions.
      if (artificialSlowdown != 0 && dfsClient.clientRunning) {
        Thread.sleep(artificialSlowdown);
      }
      ... ...
      closeInternal();
    }

    private void initDataStreaming() {
      this.setName("DataStreamer for file " + src +
          " block " + block);
      ... ...
      response = new ResponseProcessor(nodes);
      response.start();
      stage = BlockConstructionStage.DATA_STREAMING;
    }

    Click into response, then into ResponseProcessor, and Ctrl + F to find its run method.

    public void run() {
      ... ...
      ackQueue.removeFirst();
      packetSendTime.remove(seqno);
      dataQueue.notifyAll();
      ... ...
    }
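    The elided body reads a pipeline ack for each sequence number; on success the packet at the head of ackQueue (the one sent earliest) is retired and anyone blocked on dataQueue is woken. A schematic sketch of that bookkeeping, as an illustration only, not the elided source:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class AckLoopSketch {
        static final Deque<Long> ackQueue = new ArrayDeque<>();
        static final Object dataQueue = new Object();

        static void onAck(long seqno) {
            synchronized (dataQueue) {
                Long head = ackQueue.peekFirst();
                if (head != null && head == seqno) {
                    ackQueue.removeFirst();  // packet fully acknowledged by all DNs
                    dataQueue.notifyAll();   // unblock writers waiting for queue space
                }
            }
        }

        public static void main(String[] args) {
            ackQueue.addLast(1L);
            onAck(1L);
            System.out.println("pending acks: " + ackQueue.size()); // 0
        }
    }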


    2. The HDFS startup flow

    A walkthrough of the complete HDFS startup process.

    The NameNode keeps an image of the file-system metadata both in memory and on disk (as fsimage plus the edit log). The in-memory image is what makes HDFS metadata access fast; the on-disk image is what keeps the file system safe across restarts.

    On disk, the NameNode's metadata consists of two kinds of files:

    fsimage: the directory and file metadata of the file system as of the last checkpoint.

    edits: the log of every operation applied to HDFS since the last checkpoint.

    Both fsimage and the edit log are visible in the local file system.
    The main effect of formatting on first installation (hdfs namenode -format) is to generate the initial fsimage file in the local file system.

    1. Starting HDFS for the first time:

    Starting the NameNode:

    Read fsimage and build the in-memory metadata image.

    Starting the DataNodes:

    Register with the NameNode;

    Send a block report to the NameNode.

    Once startup succeeds, clients can create directories and upload, download, list, and rename files on HDFS; every operation that changes the namespace is recorded in the edits file.

    2. Starting HDFS on subsequent boots:

    Starting the NameNode:

    Read the fsimage metadata image file and load it into memory.

    Replay the edit log into memory so the in-memory metadata matches the state at the last shutdown; then write a fresh fsimage to disk identical to the in-memory image, and create a new, empty edit log to record subsequent changes to the file system.

    Starting the DataNodes:

    Register with the NameNode;

    Send a block report to the NameNode.

    Once startup succeeds, clients can create directories and upload, download, list, and rename files on HDFS; every operation that changes the namespace is recorded in the edit log.

    3. SecondaryNameNode

    It assists the NameNode; it cannot replace it.

    Its main job is merging fsimage with the edit log. Without the SecondaryNameNode daemon, every HDFS change from one NameNode start to the next shutdown piles up in a single edit log, which grows enormous; the direct cost is that the next NameNode startup, which must replay that log, becomes very slow.

    With the SecondaryNameNode daemon running, whenever a trigger condition is met (by default every 3,600 s, or after roughly 1,000,000 edit-log transactions), the SecondaryNameNode copies the NameNode's fsimage and edit log into its own directory, loads the fsimage into memory, replays the edit log on top of it to merge the two into a new fsimage, copies the new fsimage back to the NameNode's directory, and generates a new edit log to record subsequent changes.
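    Both trigger conditions are configurable. A hedged sketch of reading them through the Configuration API; the property names are the standard HDFS ones and the defaults shown are the usual values, but verify them against your Hadoop version:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class CheckpointKnobs {
        public static void main(String[] args) {
            Configuration conf = new HdfsConfiguration();
            // Checkpoint at least every hour...
            long periodSec = conf.getLong("dfs.namenode.checkpoint.period", 3600);
            // ...or once a million edit-log transactions have accumulated.
            long txns = conf.getLong("dfs.namenode.checkpoint.txns", 1000000);
            System.out.println("period=" + periodSec + "s, txns=" + txns);
        }
    }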

    4. Safe mode

    The period from NameNode startup until all the DataNodes have finished starting and reporting is called safe mode. In safe mode, clients can only read HDFS file information; no operation that changes HDFS is allowed, such as creating directories, uploading files, deleting files, or renaming files.

    The NameNode exits safe mode only once the following condition is met:

    reported blocks / total blocks >= the configured threshold (0.999 by default, i.e. 99.9% of blocks reported), sustained for an extension period (30 s of buffer time by default); only then does safe mode end.
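    Both numbers come from configuration. A small sketch reading them (again, standard property names with their usual defaults; check your version):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class SafeModeKnobs {
        public static void main(String[] args) {
            Configuration conf = new HdfsConfiguration();
            // Fraction of blocks that must have reported before leaving safe mode.
            float threshold = conf.getFloat(
                "dfs.namenode.safemode.threshold-pct", 0.999f);
            // Extra wait after the threshold is met, in milliseconds.
            int extensionMs = conf.getInt(
                "dfs.namenode.safemode.extension", 30000);
            System.out.println("threshold=" + threshold
                + ", extension=" + extensionMs + "ms");
        }
    }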

    The SecondaryNameNode checkpoint workflow:

    1) The SecondaryNameNode asks the NameNode to roll its edits file (start writing new edits to a fresh file).

    2) The SecondaryNameNode fetches fsimage and edits from the NameNode over HTTP.

    3) The SecondaryNameNode loads fsimage into memory, then replays edits on top of it to merge them.

    4) The SecondaryNameNode sends the new fsimage back to the NameNode.

    5) The NameNode replaces the old fsimage with the new one.

    3. How to fix empty files when uploading to HDFS from Eclipse

    Q: How do I fix files that end up empty when uploading to HDFS from Eclipse?
    A: First, tweak the code slightly: 【】 ----> 【】. Second, this approach cannot upload a file at all; it merely writes all of the form's data into one file. You must be able to tell, inside the 【request.getInputStream();】 stream, which parts are file streams and which are text fields or other information. I wrote one for you, using the component 【】. two.jsp ("MyJSP 'UpLoadProcess.jsp' starting page"), fragment:

    items = upload.parseRequest(request);
    Iterator it = items.iterator();
    System.out.println(it.hasNext());
    while (it.hasNext()) {
        FileItem item = (FileItem) it.next();
        if (!item.isFormField()) {
            File fullFile = new File(item.getName());
            File savedFile = new File(uploadPath, fullFile.getName());
            item.write(savedFile);
        }
    }
    %>

    4. How does a file-upload progress bar work?

    Computing progress is nothing more than the ratio of bytes transferred so far to the total size. Take the basic OutputStream: its core method is void write(byte[] b), and a transfer is just an InputStream read loop feeding an OutputStream write loop. Obtain the total file size up front via the File class, then add a simple progress step inside the copy loop: each time you read or write, add the number of bytes just transferred to a running total, compute the ratio, and update the progress bar. How often you refresh can be tuned by loop count or by elapsed time.
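    A minimal copy loop implementing exactly that idea (my own sketch; it prints progress once per buffer, and counts the bytes actually read rather than blindly adding b.length, so the final partial buffer is handled correctly):

    import java.io.*;

    public class ProgressCopy {
        public static void copy(File src, OutputStream out) throws IOException {
            long total = src.length();  // total size known up front via File
            long done = 0;
            byte[] b = new byte[8192];
            try (InputStream in = new FileInputStream(src)) {
                int n;
                while ((n = in.read(b)) != -1) {
                    out.write(b, 0, n);
                    done += n;          // bytes transferred so far
                    System.out.printf("%.1f%%%n", 100.0 * done / total);
                }
            }
        }

        public static void main(String[] args) throws IOException {
            copy(new File(args[0]), OutputStream.nullOutputStream());
        }
    }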


