Hadoop HDFS File Creation / Write Data Flow: A Detailed Source Code Analysis


HDFS Create File / Write Data: Source Code Analysis
  • HDFS
  • HDFS Write Flow
  • File Creation Source Code
    • Client
      • DistributedFileSystem
      • DFSClient
        • DFSOutputStream
          • Client/NameNode Protocol
          • DataStreamer
        • Lease: LeaseRenewer
    • NameNode
      • FSNamesystem
      • FSDirWriteFileOp
  • Writing Data
    • Client
      • FSOutputSummer
      • DFSOutputStream
      • DataStreamer
        • Creating a Block
        • Building the Data Pipeline
        • initDataStreaming();
        • response.start();
    • NameNode
      • NameNodeRpcServer
        • Calling the addBlock Method
      • FSNamesystem
      • FSDirWriteFileOp
    • DataNode
      • DataXceiver

HDFS

A quick introduction to HDFS.
HDFS has two core components: the NameNode (NN) and the DataNode (DN).
The HDFS file system is really the result of many services (threads) interacting: the client, the NN, and the DNs.
Hadoop uses several kinds of network communication:
1) Method calls between services, registration, and heartbeats use Hadoop RPC.
2) Metadata synchronization uses HTTP.
3) Writing data uses sockets.

The client sends the write request by calling DistributedFileSystem#create (the client-to-NameNode protocol, an RPC dynamic proxy). After verifying user permissions, checking whether the file already exists, and so on, the NN creates the file, adds a lease, and performs related operations.

Through its DataStreamer thread, the client asks the NN for a block. Using rack awareness, the NN returns the list of DNs that will store it. The client connects to the first DN in the list and establishes the data pipeline, which holds the input and output streams used to transfer data and return acks (over sockets).

The client triggers a write-block request by calling Sender.writeBlock(). The request is passed to every DataNode in the pipeline; the last DataNode replies with a confirmation, which travels back up the pipeline to the client.

Once the confirmation arrives, the client uses FSDataOutputStream to split the data into packets. A packet (64 KB) is filled chunk by chunk, where a chunk is 512 B of data plus a 4 B checksum (516 B total); about 127 chunks fill one packet, and when a block is full the client waits for a new pipeline to be set up. Each full packet is appended to the data queue and sent down the pipeline (in practice it is sent only to dn1; dn1's DataXceiver forwards it to dn2). A packet first travels from the DFSClient to the first DataNode: Datanode1 writes it to disk and forwards it to the second node (Datanode2), and so on. When the packet reaches the last node (Datanode3), Datanode3 verifies it and, if the check passes, sends an acknowledgment back up the pipeline to the DFSClient. Once every packet of a block has been sent and acknowledged, the DFSClient sends an empty packet to mark the end of the block, and the block transfer is complete.
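
Before diving into the source, here is a minimal client-side sketch that exercises exactly this create-and-write path (a sketch only: the path /tmp/demo.txt and the payload are made up, and fs.defaultFS is assumed to point at a reachable HDFS):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.nio.charset.StandardCharsets;

public class HdfsWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration(); // picks up core-site.xml/hdfs-site.xml
    try (FileSystem fs = FileSystem.get(conf);
         // create() triggers the DistributedFileSystem#create path traced below
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) {
      // write() lands in FSOutputSummer's buffer; chunks and packets are
      // formed and shipped by the background DataStreamer thread
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
    } // close() flushes the last packet and completes the file on the NN
  }
}

Everything the rest of this article traces happens behind these few calls.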

HDFS Write Flow

Note: write-flow diagrams have already been drawn by others online, so this article does not reproduce one.

File Creation Source Code / Client / DistributedFileSystem

The client initiates the write request by calling DistributedFileSystem.create:

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException {
      // create a DFSOutputStream
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}

Step into dfs.create().

DFSClient

The call chains through this class's other create overloads:

public DFSOutputStream create(String src, FsPermission permission,
                              EnumSet<CreateFlag> flag, short replication, long blockSize,
                              Progressable progress, int buffersize, ChecksumOpt checksumOpt)
    throws IOException {
    return create(src, permission, flag, true,
                  replication, blockSize, progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
                              EnumSet<CreateFlag> flag, boolean createParent, short replication,
                              long blockSize, Progressable progress, int buffersize,
                              ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
    throws IOException {
    return create(src, permission, flag, createParent, replication, blockSize,
                  progress, buffersize, checksumOpt, favoredNodes, null);
}

public DFSOutputStream create(String src, FsPermission permission,
                              EnumSet<CreateFlag> flag, boolean createParent, short replication,
                              long blockSize, Progressable progress, int buffersize,
                              ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
                              String ecPolicyName) throws IOException {
    checkOpen();
    // apply the user's umask to the requested permission
    final FsPermission masked = applyUMask(permission);
    LOG.debug("{}: masked={}", src, masked);
    // newStreamForCreate does three main things:
    // 1. adds an INodeFile to the INodeDirectory file tree
    // 2. adds the file lease
    // 3. starts the DataStreamer, which talks to NameNodeRpcServer
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
                                                                      src, masked, flag, createParent, replication, blockSize, progress,
                                                                      dfsClientConf.createChecksum(checksumOpt),
                                                                      getFavoredNodesStr(favoredNodes), ecPolicyName);
    // begin the file lease and renew it automatically
    beginFileLease(result.getFileId(), result);
    return result;
}

Step into DFSOutputStream.newStreamForCreate().

DFSOutputStream
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
      FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize, Progressable progress,
      DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
      throws IOException {
    try (TraceScope ignored =
             dfsClient.newPathTraceScope("newStreamForCreate", src)) {
      HdfsFileStatus stat = null;

      // Retry the create if we get a RetryStartFileException up to a maximum
      // number of times
      boolean shouldRetry = true;
      int retryCount = CREATE_RETRY_COUNT;
      // keep retrying until the file is created in the namespace
      while (shouldRetry) {
        shouldRetry = false;
        try {
          // RPC to the NameNode to create the file
          stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
              new EnumSetWritable<>(flag), createParent, replication,
              blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
          break;
        } ...
      final DFSOutputStream out;
      if(stat.getErasureCodingPolicy() != null) {
        out = new DFSStripedOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes);
      } else {
        // the constructor initializes the DataStreamer
        out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
      }
      // start the DataStreamer thread
      out.start();
      return out;
    }
  }
    
# The DFSOutputStream constructor

  protected DFSOutputStream(DFSClient dfsClient, String src,
      HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
      DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
    this(dfsClient, src, flag, progress, stat, checksum);
    this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

    
    // compute the packet/chunk sizes
    computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
        bytesPerChecksum);

    if (createStreamer) {
      // create the DataStreamer
      streamer = new DataStreamer(stat, null, dfsClient, src, progress,
          checksum, cachingStrategy, byteArrayManager, favoredNodes,
          addBlockFlags);
    }
  }
    
# Step into out.start();
protected synchronized void start() {
    // this actually starts the DataStreamer thread
    getStreamer().start();
  }
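
As a back-of-the-envelope check on computePacketChunkSize, the sketch below reproduces the packet arithmetic quoted earlier (assuming the defaults of 512 B of data per checksum chunk, a 4 B CRC, and a 64 KB write packet; the real method also subtracts the packet header length, which this sketch ignores):

public class PacketMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;            // data bytes per chunk
    int checksumSize = 4;                  // 4-byte checksum per chunk
    int chunkSize = bytesPerChecksum + checksumSize;  // 516 B on the wire
    int writePacketSize = 64 * 1024;       // dfs.client-write-packet-size
    int chunksPerPacket = Math.max(writePacketSize / chunkSize, 1);
    long blockSize = 128L * 1024 * 1024;   // dfs.blocksize
    long packetsPerBlock = blockSize / ((long) chunksPerPacket * bytesPerChecksum);
    System.out.println("chunks per packet ~ " + chunksPerPacket);  // ~127
    System.out.println("packets per block ~ " + packetsPerBlock);  // ~2k
  }
}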
Client/NameNode Protocol

Digging further into create: through a dynamic proxy, NameNodeRpcServer receives the RPC request and creates the file.

# Step into ClientProtocol#create
@AtMostOnce
  HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize,
      CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
      throws IOException;
#ClientNamenodeProtocolTranslatorPB#create
@Override
public HdfsFileStatus create(String src, FsPermission masked,
                             String clientName, EnumSetWritable<CreateFlag> flag,
                             boolean createParent, short replication, long blockSize,
                             CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException {
    CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
        .setSrc(src)
        .setMasked(PBHelperClient.convert(masked))
        .setClientName(clientName)
        .setCreateFlag(PBHelperClient.convertCreateFlag(flag))
        .setCreateParent(createParent)
        .setReplication(replication)
        .setBlockSize(blockSize);
    if (ecPolicyName != null) {
        builder.setEcPolicyName(ecPolicyName);
    }
    FsPermission unmasked = masked.getUnmasked();
    if (unmasked != null) {
        builder.setUnmasked(PBHelperClient.convert(unmasked));
    }
    builder.addAllCryptoProtocolVersion(
        PBHelperClient.convert(supportedVersions));
    CreateRequestProto req = builder.build();
    try {
        CreateResponseProto res = rpcProxy.create(null, req);
        return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
    } catch (ServiceException e) {
        throw ProtobufHelper.getRemoteException(e);
    }

}
# Dynamic proxy: ClientNamenodeProtocolProtos.java
public org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto create(
    com.google.protobuf.RpcController controller,
    org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto request)
    throws com.google.protobuf.ServiceException;

#ClientNamenodeProtocolServerSideTranslatorPB.java
  @Override
  public CreateResponseProto create(RpcController controller,
      CreateRequestProto req) throws ServiceException {
    try {
      FsPermission masked = req.hasUnmasked() ?
          FsCreateModes.create(PBHelperClient.convert(req.getMasked()),
              PBHelperClient.convert(req.getUnmasked())) :
          PBHelperClient.convert(req.getMasked());
      HdfsFileStatus result = server.create(req.getSrc(),
          masked, req.getClientName(),
          PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
          (short) req.getReplication(), req.getBlockSize(),
          PBHelperClient.convertCryptoProtocolVersions(
              req.getCryptoProtocolVersionList()),
          req.getEcPolicyName());

      if (result != null) {
        return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result))
            .build();
      }
      return VOID_CREATE_RESPONSE;
    } catch (IOException e) {
      throw new ServiceException(e);
    }
  }
DataStreamer

DataStreamer is a thread; once started, it runs the logic in its run() method, which is covered in the write-data section below.

Back in DFSClient#create, the next call is beginFileLease, which opens the file lease.

Lease: LeaseRenewer
private void beginFileLease(final long inodeId, final DFSOutputStream out)
    throws IOException {
    synchronized (filesBeingWritten) {
        putFileBeingWritten(inodeId, out);
        // the renewer thread is not started until the first output stream exists
        getLeaseRenewer().put(this);
    }
}

// synchronized: clients register with the renewer one at a time, and the
// renewer daemon is started at most once
public synchronized void put(final DFSClient dfsc) {
    if (dfsc.isClientRunning()) {
        if (!isRunning() || isRenewerExpired()) {
            //start a new daemon with a new id.
            final int id = ++currentId;
            daemon = new Daemon(new Runnable() {
                @Override
                public void run() {
                    LeaseRenewer.this.run(id);
                }
            });
            daemon.start();
        }
    }
}


private void run(final int id) throws InterruptedException {
    // loop: roughly every second, check whether a renewal is due
    for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
        Thread.sleep(getSleepPeriod())) {
        // time elapsed since the last renewal
        final long elapsed = Time.monotonicNow() - lastRenewed;
        // if no renewal has happened for 30 seconds
        if (elapsed >= getRenewalTime()) {
            // renew now
            renew();
            lastRenewed = Time.monotonicNow();
        }
    }
}

// renew the leases of all registered clients
private void renew() throws IOException {
    final List<DFSClient> copies;
    for (final DFSClient c : copies) {
        //skip if current client name is the same as the previous name.
        // key code
        if (!c.getClientName().equals(previousName)) {
            if (!c.renewLease()) {
                LOG.debug("Did not renew lease for client {}", c);
                continue;
            }
            previousName = c.getClientName();
            LOG.debug("Lease renewed for client {}", previousName);
        }
    }
}


public boolean renewLease() throws IOException {
    if (clientRunning && !isFilesBeingWrittenEmpty()) {
        try {
            // renew via the NameNode proxy
            namenode.renewLease(clientName);
            // record the time of this renewal
            updateLastLeaseRenewal();
            return true;
        } 
    }
    return false;
}
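
The 30-second figure in run() follows from HDFS's lease limits. A small sketch of the timing relationship (assuming the long-standing defaults of a 60 s soft limit and a 1 h hard limit; the renewer fires at half the soft limit):

public class LeaseTiming {
  public static void main(String[] args) {
    long softLimitMs = 60_000;     // after this, another client may claim the file
    long hardLimitMs = 3_600_000;  // after this, the NN force-recovers the lease
    long renewalMs = softLimitMs / 2;  // hence the 30s check in LeaseRenewer.run()
    System.out.println("renew every " + renewalMs + " ms (soft=" + softLimitMs
        + " ms, hard=" + hardLimitMs + " ms)");
  }
}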
NameNode

NameNodeRpcServer creates the file:

@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
                             String clientName, EnumSetWritable<CreateFlag> flag,
                             boolean createParent, short replication, long blockSize,
                             CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException {
    checkNNStartup();
    String clientMachine = getClientMachine();
    // verify that the write operation is allowed
    namesystem.checkOperation(OperationCategory.WRITE);
    HdfsFileStatus status = null;
    try {
        // permission
        PermissionStatus perm = new PermissionStatus(getRemoteUser()
                                                     .getShortUserName(), null, masked);
        // create the file
        status = namesystem.startFile(src, perm, clientName, clientMachine,
                                      flag.get(), createParent, replication, blockSize, supportedVersions,
                                      ecPolicyName, cacheEntry != null);
    } finally {
        RetryCache.setState(cacheEntry, status != null, status);
    }

    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return status;
}
FSNamesystem

Create the file entry in the namespace:

HdfsFileStatus startFile(String src, PermissionStatus permissions,
                         String holder, String clientMachine, EnumSet<CreateFlag> flag,
                         boolean createParent, short replication, long blockSize,
                         CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
                         boolean logRetryCache) throws IOException {

    HdfsFileStatus status;

    // delegate to startFileInt in this class
    status = startFileInt(src, permissions, holder, clientMachine, flag,
                          createParent, replication, blockSize, supportedVersions, ecPolicyName,
                          logRetryCache);
    logAuditEvent(true, "create", src, status);
    return status;
}

private HdfsFileStatus startFileInt(String src,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent, short replication,
      long blockSize, CryptoProtocolVersion[] supportedVersions,
      String ecPolicyName, boolean logRetryCache) throws IOException {
    // validate the path
    if (!DFSUtil.isValidName(src) ||
        FSDirectory.isExactReservedName(src) ||
        (FSDirectory.isReservedName(src)
            && !FSDirectory.isReservedRawName(src)
            && !FSDirectory.isReservedInodesName(src))) {
      throw new InvalidPathException(src);
    }

    INodesInPath iip = null;
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);
      // check safe mode
      checkNameNodeSafeMode("Cannot create file" + src);

      // resolve the path: directory vs. file, overwrite allowed, and whether
      // the current user may access it
      iip = FSDirWriteFileOp.resolvePathForStartFile(
          dir, pc, src, flag, createParent);
      // replication policy
      if (shouldReplicate) {
        blockManager.verifyReplication(src, replication, clientMachine);
      } else {
        final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
            .getErasureCodingPolicy(this, ecPolicyName, iip);
        if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
          checkErasureCodingSupported("createWithEC");
          if (blockSize < ecPolicy.getCellSize()) {
            throw new IOException("Specified block size (" + blockSize
                + ") is less than the cell size (" + ecPolicy.getCellSize()
                +") of the erasure coding policy (" + ecPolicy + ").");
          }
        } else {
          blockManager.verifyReplication(src, replication, clientMachine);
        }
      }
      skipSync = false; // following might generate edits
      toRemoveBlocks = new BlocksMapUpdateInfo();
      dir.writeLock();
      try {
        stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
            clientMachine, flag, createParent, replication, blockSize, feInfo,
            toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
      } 

    return stat;
  }
FSDirWriteFileOp

Step into FSDirWriteFileOp#startFile:

  static HdfsFileStatus startFile(
      FSNamesystem fsn, INodesInPath iip,
      PermissionStatus permissions, String holder, String clientMachine,
      EnumSet<CreateFlag> flag, boolean createParent,
      short replication, long blockSize,
      FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
      boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
      throws IOException {
    // add the inode to the directory tree
      iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
          replication, blockSize, holder, clientMachine, shouldReplicate,
          ecPolicyName);
      newNode = iip != null ? iip.getLastINode().asFile() : null;
    // add the lease (renewed automatically)
    fsn.leaseManager.addLease(
        newNode.getFileUnderConstructionFeature().getClientName(),
        newNode.getId());
    // write to the edit log
    fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
    return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
  }

At this point the HDFS file has been created, and control returns to DFSOutputStream#newStreamForCreate().

Writing Data / Client / FSOutputSummer

DFSOutputStream has no write method of its own, so look in its parent class, FSOutputSummer:

@Override
public synchronized void write(int b) throws IOException {
    buf[count++] = (byte)b;
    if(count == buf.length) {
        // buffer full: flush it as checksum chunks
        flushBuffer();
    }
}


protected synchronized void flushBuffer() throws IOException {
    flushBuffer(false, true);
}


protected synchronized int flushBuffer(boolean keep,
                                       boolean flushPartial) throws IOException {
    int bufLen = count;
    int partialLen = bufLen % sum.getBytesPerChecksum();
    int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
    if (lenToFlush != 0) {
        // Core code. Data layout:
        // HDFS file -> block (default 128 MB) -> packet (64 KB, ~127 chunks)
        //   -> chunk (512 B data + 4 B checksum = 516 B on the wire)
        writeChecksumChunks(buf, 0, lenToFlush);
        if (!flushPartial || keep) {
            count = partialLen;
            System.arraycopy(buf, bufLen - count, buf, 0, count);
        } else {
            count = 0;
        }
    }

    // total bytes left minus unflushed bytes left
    return count - (bufLen - lenToFlush);
}

private void writeChecksumChunks(byte b[], int off, int len)
  throws IOException {
    // compute the chunk checksums
    sum.calculateChunkedSums(b, off, len, checksum, 0);
    TraceScope scope = createWriteTraceScope();
    try {
      // walk the data one chunk at a time
      for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
        int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
        int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
        writeChunk(b, off + i, chunkLen, checksum, ckOffset,
            getChecksumSize());
      }
    } finally {
      if (scope != null) {
        scope.close();
      }
    }
  }
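
calculateChunkedSums produces one 4-byte checksum per 512-byte chunk. The standalone sketch below shows the same idea with java.util.zip.CRC32 (HDFS defaults to CRC32C, selected by dfs.checksum.type, so the exact values differ; this only illustrates the chunking):

import java.util.zip.CRC32;

public class ChunkedChecksum {
  // one 4-byte CRC per 512-byte chunk, mirroring writeChecksumChunks
  static int[] chunkedSums(byte[] data, int bytesPerChecksum) {
    int nChunks = (data.length + bytesPerChecksum - 1) / bytesPerChecksum;
    int[] sums = new int[nChunks];
    CRC32 crc = new CRC32();
    for (int i = 0; i < nChunks; i++) {
      int off = i * bytesPerChecksum;
      int len = Math.min(bytesPerChecksum, data.length - off);
      crc.reset();
      crc.update(data, off, len);
      sums[i] = (int) crc.getValue();   // low 32 bits = the 4-byte checksum
    }
    return sums;
  }

  public static void main(String[] args) {
    byte[] data = new byte[1300];       // about 2.5 chunks at 512 B
    System.out.println(chunkedSums(data, 512).length + " checksums"); // 3
  }
}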
DFSOutputStream

FSOutputSummer#writeChunk actually dispatches to DFSOutputStream#writeChunk:

// @see FSOutputSummer#writeChunk()
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
                                       byte[] checksum, int ckoff, int cklen) throws IOException {
    // create the packet if one is not in progress
    writeChunkPrepare(len, ckoff, cklen);

    // write the chunk's checksum into the packet (4 bytes)
    currentPacket.writeChecksum(checksum, ckoff, cklen);
    // write one chunk into the packet (512 bytes)
    currentPacket.writeData(b, offset, len);
    // count chunks: about 127 full chunks make a complete packet
    currentPacket.incNumChunks();
    // track bytes written to the current block; at 128 MB the block is full
    getStreamer().incBytesCurBlock(len);

    // If packet is full, enqueue it for transmission
    // Two conditions:
    // 1) the packet is full (about 127 chunks), or
    // 2) the current block is full (128 MB, roughly 2k packets)
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
        enqueueCurrentPacketFull();
    }
}

synchronized void enqueueCurrentPacketFull() throws IOException {
    LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
              + " appendChunk={}, {}", currentPacket, src, getStreamer()
              .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
              getStreamer());
    // enqueue the full packet and wake the DataStreamer waiting on the queue
    enqueueCurrentPacket();
    adjustChunkBoundary();
    // if the block is now full, end it (an empty last packet marks the boundary)
    endBlock();
}
DataStreamer

DataStreamer is a thread; when started, it executes the logic in its run() method:

  @Override
  public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder
      // if an error occurred, shut down the responder
      if (errorState.hasError()) {
        closeResponder();
      }

      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();

        final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
        synchronized (dataQueue) {
          // wait for a packet to be sent.
          long now = Time.monotonicNow();
          // right after the file is created, dataQueue.size() == 0
          while ((!shouldStop() && dataQueue.size() == 0 &&
              (stage != BlockConstructionStage.DATA_STREAMING ||
                  now - lastPacket < halfSocketTimeout)) || doSleep) {
            long timeout = halfSocketTimeout - (now-lastPacket);
            timeout = timeout <= 0 ? 1000 : timeout;
            timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
                timeout : 1000;
            try {
              // if dataQueue is empty, block here until woken up
              dataQueue.wait(timeout);
            } catch (InterruptedException  e) {
              LOG.debug("Thread interrupted", e);
            }
            doSleep = false;
            now = Time.monotonicNow();
          }
          if (shouldStop()) {
            continue;
          }
          // get packet to be sent.
          if (dataQueue.isEmpty()) {
            one = createHeartbeatPacket();
          } else {
            try {
              backOffIfNecessary();
            } catch (InterruptedException e) {
              LOG.debug("Thread interrupted", e);
            }
            // queue not empty: take the first packet
            one = dataQueue.getFirst(); // regular data packet
            SpanId[] parents = one.getTraceParents();
            if (parents.length > 0) {
              scope = dfsClient.getTracer().
                  newScope("dataStreamer", parents[0]);
              scope.getSpan().setParents(parents);
            }
          }
        }

        // get new block from namenode.
        LOG.debug("stage={}, {}", stage, this);

        // set up the data pipeline: ask the NameNode for a block
        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          LOG.debug("Allocating new block: {}", this);
          // nextBlockOutputStream() is the key call; it does two things:
          // 1) asks the NameNode for a block to write to (the strategy that
          //    picks the DataNodes to hold the block lives inside this call)
          // 2) establishes the data pipeline, recording each replica's
          //    storage location, type and ID
          setPipeline(nextBlockOutputStream());
          // initDataStreaming starts the ResponseProcessor, which tracks the
          // status of every packet sent
          initDataStreaming();
        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          LOG.debug("Append to block {}", block);
          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          initDataStreaming();
        }

        long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
        if (lastByteOffsetInBlock > stat.getBlockSize()) {
          throw new IOException("BlockSize " + stat.getBlockSize() +
              " < lastByteOffsetInBlock, " + this + ", " + one);
        }

        // for the last packet of a block, wait for all outstanding acks
        if (one.isLastPacketInBlock()) {
          // wait for all data packets have been successfully acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              try {
                // wait for acks to arrive from datanodes
                dataQueue.wait(1000);
              } catch (InterruptedException  e) {
                LOG.debug("Thread interrupted", e);
              }
            }
          }
          if (shouldStop()) {
            continue;
          }
          stage = BlockConstructionStage.PIPELINE_CLOSE;
        }

        // send the packet
        SpanId spanId = SpanId.INVALID;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              spanId = scope.getSpanId();
              scope.detach();
              one.setTraceScope(scope);
            }
            scope = null;
            dataQueue.removeFirst();
            ackQueue.addLast(one);
            packetSendTime.put(one.getSeqno(), Time.monotonicNow());
            dataQueue.notifyAll();
          }
        }

        LOG.debug("{} sending {}", this, one);

        // write out data to remote datanode
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanId)) {
          // this is where the packet is actually written to the pipeline
          one.writeTo(blockStream);
          blockStream.flush();
        } catch (IOException e) {
          // HDFS-3398 treat primary DN is down since client is unable to
          // write to primary DN. If a failed or restarting node has already
          // been recorded by the responder, the following call will have no
          // effect. Pipeline recovery can handle only one node error at a
          // time. If the primary node fails again during the recovery, it
          // will be taken out then.
          // the primary datanode is the first datanode in the pipeline
          errorState.markFirstNodeIfNotMarked();
          throw e;
        }
        lastPacket = Time.monotonicNow();

        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();
        if (bytesSent < tmpBytesSent) {
          bytesSent = tmpBytesSent;
        }

        if (shouldStop()) {
          continue;
        }

        // Is this block full?
        if (one.isLastPacketInBlock()) {
          // wait for the close packet has been acked
          synchronized (dataQueue) {
            while (!shouldStop() && ackQueue.size() != 0) {
              dataQueue.wait(1000);// wait for acks to arrive from datanodes
            }
          }
          if (shouldStop()) {
            continue;
          }

          endBlock();
        }
        if (progress != null) { progress.progress(); }

        // This is used by unit test to trigger race conditions.
        if (artificialSlowdown != 0 && dfsClient.clientRunning) {
          Thread.sleep(artificialSlowdown);
        }
      } catch (Throwable e) {
        // Log warning if there was a real error.
        if (!errorState.isRestartingNode()) {
          // Since their messages are descriptive enough, do not always
          // log a verbose stack-trace WARN for quota exceptions.
          if (e instanceof QuotaExceededException) {
            LOG.debug("DataStreamer Quota Exception", e);
          } else {
            LOG.warn("DataStreamer Exception", e);
          }
        }
        lastException.set(e);
        assert !(e instanceof NullPointerException);
        errorState.setInternalError();
        if (!errorState.isNodeMarked()) {
          // Not a datanode issue
          streamerClosed = true;
        }
      } finally {
        if (scope != null) {
          scope.close();
          scope = null;
        }
      }
    }
    closeInternal();
  }
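
Stripped of tracing and error handling, the core of run() is a wait/notify handoff between the writer thread (producer, via enqueueCurrentPacket) and the DataStreamer/ResponseProcessor (consumers). A minimal standalone sketch of that pattern (names mirror the source; the real code also times out so it can send heartbeat packets):

import java.util.ArrayDeque;

public class QueueHandoff {
  private final ArrayDeque<String> dataQueue = new ArrayDeque<>();
  private final ArrayDeque<String> ackQueue = new ArrayDeque<>();

  // writer thread: what enqueueCurrentPacket amounts to
  void enqueue(String packet) {
    synchronized (dataQueue) {
      dataQueue.addLast(packet);
      dataQueue.notifyAll();         // wake the streamer
    }
  }

  // streamer thread: take a packet and park it on ackQueue until acked
  String dequeueForSend() throws InterruptedException {
    synchronized (dataQueue) {
      while (dataQueue.isEmpty()) {
        dataQueue.wait(1000);        // the real code waits ~socketTimeout/2
      }
      String one = dataQueue.removeFirst();
      ackQueue.addLast(one);         // kept for retransmission on failure
      return one;
    }
  }

  // responder thread: a successful ack retires the packet for good
  void ack() {
    synchronized (dataQueue) {       // ackQueue is guarded by dataQueue's lock
      ackQueue.removeFirst();
      dataQueue.notifyAll();
    }
  }
}

Keeping the packet on ackQueue until its ack arrives is what makes pipeline recovery possible: on failure, the un-acked packets are moved back to dataQueue and resent.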
Creating a Block
// build the pipeline
setPipeline(nextBlockOutputStream());


// ask the NN for a block; it returns the list of target DNs
protected LocatedBlock nextBlockOutputStream() throws IOException {
    LocatedBlock lb;
    DatanodeInfo[] nodes;
    StorageType[] nextStorageTypes;
    String[] nextStorageIDs;
    int count = dfsClient.getConf().getNumBlockWriteRetry();
    boolean success;
    final ExtendedBlock oldBlock = block.getCurrentBlock();
    
    do {
        errorState.resetInternalError();
        lastException.clear();

        DatanodeInfo[] excluded = getExcludedNodes();
        // TODO request a block from the NameNode
        
        lb = locateFollowingBlock(
            excluded.length > 0 ? excluded : null, oldBlock);
        block.setCurrentBlock(lb.getBlock());
        block.setNumBytes(0);
        bytesSent = 0;
        accessToken = lb.getBlockToken();
        nodes = lb.getLocations();
        nextStorageTypes = lb.getStorageTypes();
        nextStorageIDs = lb.getStorageIDs();

        // Connect to first DataNode in the list.
        // TODO this call is what actually establishes the HDFS pipeline
        success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
                                          0L, false);

        if (!success) {
            LOG.warn("Abandoning " + block);
            // TODO if the pipeline could not be built, abandon this block
            dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
                                            stat.getFileId(), src, dfsClient.clientName);
            block.setCurrentBlock(null);
            final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
            LOG.warn("Excluding datanode " + badNode);
            excludedNodes.put(badNode, badNode);
        }
    } while (!success && --count >= 0);

    if (!success) {
        throw new IOException("Unable to create new block.");
    }
    return lb;
}

// allocate the block; this actually goes through the RPC proxy
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
                                          ExtendedBlock oldBlock) throws IOException {
    return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
                                    stat.getFileId(), favoredNodes, addBlockFlags);
}

// add a block
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
                             DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
                             String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
    throws IOException {
    final DfsClientConf conf = dfsClient.getConf();
    int retries = conf.getNumBlockWriteLocateFollowingRetry();
    long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
    long localstart = Time.monotonicNow();
    while (true) {
        // TODO call the NameNode server side via RPC
        return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
                                           excludedNodes, fileId, favoredNodes, allocFlags);
        ...
        }
}
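
The elided "..." above handles the NN replying that the previous block is not yet minimally replicated (NotReplicatedYetException): the client sleeps and retries, roughly doubling the delay each time. A hedged sketch of that retry shape (constants and exception handling simplified; the real code unwraps RemoteException and retries only specific causes):

import java.util.concurrent.Callable;

public class Backoff {
  static <T> T retryWithBackoff(Callable<T> rpc, int retries, long delayMs)
      throws Exception {
    long sleep = delayMs;
    while (true) {
      try {
        return rpc.call();           // e.g. namenode.addBlock(...)
      } catch (Exception e) {
        if (--retries < 0) throw e;  // out of retries: surface the error
        Thread.sleep(sleep);
        sleep *= 2;                  // exponential backoff between attempts
      }
    }
  }
}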
Building the Data Pipeline
boolean createBlockOutputStream(DatanodeInfo[] nodes,
      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
      long newGS, boolean recoveryFlag) {
    if (nodes.length == 0) {
      LOG.info("nodes are empty for write pipeline of " + block);
      return false;
    }
    String firstBadlink = "";
    boolean checkRestart = false;
    if (LOG.isDebugEnabled()) {
      LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
    }

    // persist blocks on namenode on next flush
    persistBlocks.set(true);

    int refetchEncryptionKey = 1;
    while (true) {
      boolean result = false;
      DataOutputStream out = null;
      try {
        assert null == s : "Previous socket unclosed";
        assert null == blockReplyStream : "Previous blockReplyStream unclosed";
        // open a socket to the first DN in the pipeline
        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
        long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
        long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);

        // output stream
        OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
        // input stream
        InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
        // SASL-wrap the socket streams
        IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
            unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
        unbufOut = saslStreams.out;
        unbufIn = saslStreams.in;
        // this output stream carries the client's data to the DataNode
        out = new DataOutputStream(new BufferedOutputStream(unbufOut,
            DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
        // the client reads the DataNode's replies through this input stream
        blockReplyStream = new DataInputStream(unbufIn);

        //
        // Xmit header info to datanode
        //

        BlockConstructionStage bcs = recoveryFlag ?
            stage.getRecoveryStage() : stage;

        // We cannot change the block length in 'block' as it counts the number
        // of bytes ack'ed.
        ExtendedBlock blockCopy = block.getCurrentBlock();
        blockCopy.setNumBytes(stat.getBlockSize());

        boolean[] targetPinnings = getPinnings(nodes);
        // send the write-block request; on the DataNode side, a DataXceiver
        // service accepts this socket request
        new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings != null && targetPinnings[0]), targetPinnings,
            nodeStorageIDs[0], nodeStorageIDs);

        // receive ack for connect
        // receive the ack for the connection
        BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
            PBHelperClient.vintPrefixed(blockReplyStream));
        Status pipelineStatus = resp.getStatus();
        firstBadlink = resp.getFirstBadlink();

        // Got an restart OOB ack.
        // If a node is already restarting, this status is not likely from
        // the same node. If it is from a different node, it is not
        // from the local datanode. Thus it is safe to treat this as a
        // regular node error.
        if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
            !errorState.isRestartingNode()) {
          checkRestart = true;
          throw new IOException("A datanode is restarting.");
        }

        String logInfo = "ack with firstBadlink as " + firstBadlink;
        DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);

        assert null == blockStream : "Previous blockStream unclosed";
        blockStream = out;
        result =  true; // success
        errorState.resetInternalError();
        lastException.clear();
        // remove all restarting nodes from failed nodes list
        failed.removeAll(restartingNodes);
        restartingNodes.clear();
      } catch (IOException ie) {
        if (!errorState.isRestartingNode()) {
          LOG.info("Exception in createBlockOutputStream " + this, ie);
        }
        if (ie instanceof InvalidEncryptionKeyException &&
            refetchEncryptionKey > 0) {
          LOG.info("Will fetch a new encryption key and retry, "
              + "encryption key was invalid when connecting to "
              + nodes[0] + " : " + ie);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
          // Don't close the socket/exclude this node just yet. Try again with
          // a new encryption key.
          continue;
        }

        // find the datanode that matches
        if (firstBadlink.length() != 0) {
          for (int i = 0; i < nodes.length; i++) {
            // NB: Unconditionally using the xfer addr w/o hostname
            if (firstBadlink.equals(nodes[i].getXferAddr())) {
              errorState.setBadNodeIndex(i);
              break;
            }
          }
        } else {
          assert !checkRestart;
          errorState.setBadNodeIndex(0);
        }

        final int i = errorState.getBadNodeIndex();
        // Check whether there is a restart worth waiting for.
        if (checkRestart) {
          errorState.initRestartingNode(i,
              "Datanode " + i + " is restarting: " + nodes[i],
              shouldWaitForRestart(i));
        }
        errorState.setInternalError();
        lastException.set(ie);
        result =  false;  // error
      } finally {
        if (!result) {
          IOUtils.closeSocket(s);
          s = null;
          IOUtils.closeStream(out);
          IOUtils.closeStream(blockReplyStream);
          blockReplyStream = null;
        }
      }
      return result;
    }
  }
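
Stripped of SASL and error handling, the stream setup above is plain java.net socket plumbing. A sketch of the skeleton (hostname, port, and timeouts are placeholders; 9866 is the usual Hadoop 3 DataNode transfer port, but check dfs.datanode.address):

import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PipelineSocketSketch {
  public static void main(String[] args) throws Exception {
    Socket s = new Socket();
    s.connect(new InetSocketAddress("dn1.example.com", 9866), 60_000);
    s.setSoTimeout(60_000);          // read timeout while waiting for acks
    // buffered output stream: client -> first DataNode (packets flow here)
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(s.getOutputStream(), 512));
    // input stream: acks coming back up the pipeline
    DataInputStream blockReplyStream = new DataInputStream(s.getInputStream());
    out.close();
    blockReplyStream.close();
    s.close();
  }
}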
initDataStreaming();
private void initDataStreaming() {
    this.setName("DataStreamer for file " + src +
                 " block " + block);
    if (LOG.isDebugEnabled()) {
        LOG.debug("nodes {} storageTypes {} storageIDs {}",
                  Arrays.toString(nodes),
                  Arrays.toString(storageTypes),
                  Arrays.toString(storageIDs));
    }
    response = new ResponseProcessor(nodes);
    response.start();
    stage = BlockConstructionStage.DATA_STREAMING;
}
response.start();

This starts the ResponseProcessor, which monitors whether each packet was sent successfully:

@Override
public void run() {

    setName("ResponseProcessor for block " + block);
    // pipeline ack: carries the DNs' write results back to the client
    PipelineAck ack = new PipelineAck();

    TraceScope scope = null;
    while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
        // process responses from datanodes.
        try {
            // read an ack from the pipeline
            // read the result from downstream
            ack.readFields(blockReplyStream);
            if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
                Long begin = packetSendTime.get(ack.getSeqno());
                if (begin != null) {
                    long duration = Time.monotonicNow() - begin;
                    if (duration > dfsclientSlowLogThresholdMs) {
                        LOG.info("Slow ReadProcessor read fields for block " + block
                                 + " took " + duration + "ms (threshold="
                                 + dfsclientSlowLogThresholdMs + "ms); ack: " + ack
                                 + ", targets: " + Arrays.asList(targets));
                    }
                }
            }

            LOG.debug("DFSClient {}", ack);

            long seqno = ack.getSeqno();
            // processes response status from datanodes.
            // process the replies from every datanode in the ack
            ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
            for (int i = ack.getNumOfReplies()-1; i >=0  && dfsClient.clientRunning; i--) {
                final Status reply = PipelineAck.getStatusFromHeader(ack
                                                                     .getHeaderFlag(i));
                if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
                    PipelineAck.ECN.CONGESTED) {
                    congestedNodesFromAck.add(targets[i]);
                }
                // Restart will not be treated differently unless it is
                // the local node or the only one in the pipeline.
                if (PipelineAck.isRestartOOBStatus(reply)) {
                    final String message = "Datanode " + i + " is restarting: "
                        + targets[i];
                    errorState.initRestartingNode(i, message,
                                                  shouldWaitForRestart(i));
                    throw new IOException(message);
                }
                // node error
                if (reply != SUCCESS) {
                    errorState.setBadNodeIndex(i); // mark bad datanode
                    throw new IOException("Bad response " + reply +
                                          " for " + block + " from datanode " + targets[i]);
                }
            }

            if (!congestedNodesFromAck.isEmpty()) {
                synchronized (congestedNodes) {
                    congestedNodes.clear();
                    congestedNodes.addAll(congestedNodesFromAck);
                }
            } else {
                synchronized (congestedNodes) {
                    congestedNodes.clear();
                    lastCongestionBackoffTime = 0;
                }
            }

            assert seqno != PipelineAck.UNKOWN_SEQNO :
            "Ack for unknown seqno should be a failed ack: " + ack;
            if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
                continue;
            }

            // a success ack for a data packet
            DFSPacket one;
            synchronized (dataQueue) {
                one = ackQueue.getFirst();
            }
            if (one.getSeqno() != seqno) {
                throw new IOException("ResponseProcessor: Expecting seqno " +
                                      " for block " + block +
                                      one.getSeqno() + " but received " + seqno);
            }
            isLastPacketInBlock = one.isLastPacketInBlock();

            // Fail the packet write for testing in order to force a
            // pipeline recovery.
            if (DFSClientFaultInjector.get().failPacket() &&
                isLastPacketInBlock) {
                failPacket = true;
                throw new IOException(
                    "Failing the last packet for testing.");
            }

            // update bytesAcked
            block.setNumBytes(one.getLastByteOffsetBlock());

            synchronized (dataQueue) {
                scope = one.getTraceScope();
                if (scope != null) {
                    scope.reattach();
                    one.setTraceScope(null);
                }
                lastAckedSeqno = seqno;
                pipelineRecoveryCount = 0;
                // on a successful ack, retire the packet from ackQueue
                ackQueue.removeFirst();
                packetSendTime.remove(seqno);
                dataQueue.notifyAll();

                one.releaseBuffer(byteArrayManager);
            }
        } catch (Throwable e) {
            if (!responderClosed) {
                lastException.set(e);
                errorState.setInternalError();
                errorState.markFirstNodeIfNotMarked();
                synchronized (dataQueue) {
                    dataQueue.notifyAll();
                }
                if (!errorState.isRestartingNode()) {
                    LOG.warn("Exception for " + block, e);
                }
                responderClosed = true;
            }
        } finally {
            if (scope != null) {
                scope.close();
            }
            scope = null;
        }
    }
}
NameNode / NameNodeRpcServer / Calling the addBlock Method
@Override
public LocatedBlock addBlock(String src, String clientName,
                             ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
                             String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
    throws IOException {
    checkNNStartup();
    // TODO allocate a block
    
    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
                                                              clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
    if (locatedBlock != null) {
        metrics.incrAddBlockOps();
    }
    return locatedBlock;
}
FSNamesystem

FSNamesystem#getAdditionalBlock allocates the block:

LocatedBlock getAdditionalBlock(
    String src, long fileId, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, String[] favoredNodes,
    EnumSet<AddBlockFlag> flags) throws IOException {
    final String operationName = "getAdditionalBlock";
    NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {}  inodeId {}" +
                                  " for {}", src, fileId, clientName);
    // validate the request
    r = FSDirWriteFileOp.validateAddBlock();
    // rack awareness: choose suitable DNs for the block
    DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
        blockManager, src, excludedNodes, favoredNodes, flags, r);

    // create the block
    lb = FSDirWriteFileOp.storeAllocatedBlock();
    return lb;
}
FSDirWriteFileOp

Using rack awareness, the NN chooses suitable DNs for the block, creates the block, and updates the metadata both in memory and on disk.

static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
      long fileId, String clientName, ExtendedBlock previous,
      DatanodeStorageInfo[] targets) throws IOException {
    long offset;
    // create the block
    Block newBlock = fsn.createNewBlock(blockType);
    INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
    // update the in-memory directory tree (in-memory metadata)
    saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, blockType);
    // persist the metadata to disk
    persistNewBlock(fsn, src, pendingFile);
    // Return located block
    return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
  }
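
For reference, chooseTargetForNewBlock delegates to the block placement policy; the default (BlockPlacementPolicyDefault) famously puts the first replica on the writer's node, the second on a different rack, and the third on the same rack as the second but on a different node. The sketch below is a toy version of that selection, not the NN's actual code (the rack map and node names are invented):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RackAwareSketch {
  // choose 3 targets: writer-local, remote rack, same remote rack new node
  static List<String> chooseTargets(String writer, Map<String, String> nodeToRack) {
    List<String> targets = new ArrayList<>();
    targets.add(writer);                            // replica 1: local node
    String localRack = nodeToRack.get(writer);
    for (String n : nodeToRack.keySet()) {          // replica 2: other rack
      if (!nodeToRack.get(n).equals(localRack)) { targets.add(n); break; }
    }
    String remoteRack = nodeToRack.get(targets.get(1));
    for (String n : nodeToRack.keySet()) {          // replica 3: same rack as 2
      if (nodeToRack.get(n).equals(remoteRack) && !targets.contains(n)) {
        targets.add(n);
        break;
      }
    }
    return targets;
  }

  public static void main(String[] args) {
    Map<String, String> cluster = new LinkedHashMap<>();
    cluster.put("dn1", "/rack1");
    cluster.put("dn2", "/rack1");
    cluster.put("dn3", "/rack2");
    cluster.put("dn4", "/rack2");
    System.out.println(chooseTargets("dn1", cluster)); // [dn1, dn3, dn4]
  }
}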
  
DataNode

Since this part of the source was covered in the DataNode chapter, only a brief description is given here.
The DataXceiverServer object listens for streaming-interface requests on the DataNode. Whenever a client issues a streaming request through the Sender class, DataXceiverServer accepts the request and creates a DataXceiver object to serve it and carry out the corresponding operation. Receiver.processOp() handles a streaming request: it reads the serialized parameters from the stream, deserializes them, and dispatches on the opcode to the methods defined in DataTransferProtocol, all of which are implemented in DataXceiver.
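
A minimal sketch of that op-dispatch idea (the opcode constants here are illustrative stand-ins for the datatransfer Op enum, and the handlers are stubs):

import java.io.DataInputStream;
import java.io.IOException;

public class OpDispatchSketch {
  // illustrative opcodes; the real ones live in the datatransfer Op enum
  static final byte OP_WRITE_BLOCK = 80;
  static final byte OP_READ_BLOCK = 81;

  // read one opcode and dispatch, like Receiver.processOp
  static void processOp(DataInputStream in) throws IOException {
    byte op = in.readByte();        // the real protocol also checks a version
    switch (op) {
      case OP_WRITE_BLOCK: writeBlock(in); break;  // deserialize args, then act
      case OP_READ_BLOCK:  readBlock(in);  break;
      default: throw new IOException("Unknown op " + op);
    }
  }

  static void writeBlock(DataInputStream in) { /* parse proto, receive block */ }
  static void readBlock(DataInputStream in)  { /* parse proto, send block  */ }
}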

DataXceiver

processOp in turn calls writeBlock():

@Override
public void writeBlock() throws IOException {
    if (isDatanode || 
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
        // open a block receiver: create a BlockReceiver
        setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
                                                 peer.getRemoteAddressString(),
                                                 peer.getLocalAddressString(),
                                                 stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
                                                 clientname, srcDataNode, datanode, requestedChecksum,
                                                 cachingStrategy, allowLazyPersist, pinning, storageId));
        replica = blockReceiver.getReplica();
    }
    //
    // Connect to downstream machine, if appropriate
    //
    // connect to the downstream DN
    if (targets.length > 0) {
        InetSocketAddress mirrorTarget = null;
        // Connect to backup machine
        mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
        LOG.debug("Connecting to datanode {}", mirrorNode);
        mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
        // socket to the mirror (downstream) node
        mirrorSock = datanode.newSocket();

        if (targetPinnings != null && targetPinnings.length > 0) {
            // forward the writeBlock request downstream over the socket
            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                                             blockToken, clientname, targets, targetStorageTypes,
                                             srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                                             latestGenerationStamp, requestedChecksum, cachingStrategy,
                                             allowLazyPersist, targetPinnings[0], targetPinnings,
                                             targetStorageId, targetStorageIds);
        } else {
            new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
                                             blockToken, clientname, targets, targetStorageTypes,
                                             srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
                                             latestGenerationStamp, requestedChecksum, cachingStrategy,
                                             allowLazyPersist, false, targetPinnings,
                                             targetStorageId, targetStorageIds);
        }
        mirrorOut.flush();
        // receive the block and mirror to the next target
        if (blockReceiver != null) {
            String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
            // receive the block and mirror it to the next target
            blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
                                       mirrorAddr, null, targets, false);
        }
    }
}

Source: https://outofmemory.cn/zaji/5635128.html