Hadoop源码分析(十三)

Hadoop源码分析(十三),第1张

Hadoop源码分析(十三)

2021SC@SDUSC

Hadoop RPC

实现ClientProtocol类,该类定义了接口协议,在该接口中定义了两个方法:echo和add。而在Hadoop中所有的协议接口都需要实现VersionedProtocol接口。

package com.cszjo.hadoop.rpc;

import org.apache.hadoop.ipc.VersionedProtocol;
import java.io.IOException;


public interface ClientProtocol extends VersionedProtocol {
    public static final long versionID = 1L;
    String echo(String value) throws IOException;
    int add(int a, int b) throws IOException;
}

在服务端编写具体的实现类ClientProtocolImpl,实现了协议ClientProtocol接口:

package com.cszjo.hadoop.rpc;

import org.apache.hadoop.ipc.ProtocolSignature;

import java.io.IOException;


public class ClientProtocolImpl implements ClientProtocol {
    public String echo(String value) throws IOException {
        return value;
    }
    public int add(int a, int b) throws IOException {
        return a + b;
    }
    public long getProtocolVersion(String s, long l) throws IOException {
        return ClientProtocol.versionID;
    }
    public ProtocolSignature getProtocolSignature(String s, long l, int i) throws IOException {
        return new ProtocolSignature(ClientProtocol.versionID, null);
    }
}

Server端:

Client:客户端。

Listener:Server端只存在一个Listener,主要功能就是分发,在Selector中注册了ACCEPT事件,没当有新的Client连接,便会为Client指定一个Reader线程。

Reader:Reader线程有多个,主要任务是读取请求,并将请求封装成一个Call,放入callQueue中。

Listener的run方法:

@Override
public void run() {
//轮询从selector中去获得已经就绪的事件
  while (running) {
    SelectionKey key = null;
    try {
      //得到在构造方法中实现的selector,并执行select()方法,返回已经就绪的IO *** 作的channal的个数
      getSelector().select();
      //得到已经就绪的IO *** 作的key的集合
      Iterator iter = getSelector().selectedKeys().iterator();
      while (iter.hasNext()) {
        key = iter.next();
        iter.remove();
        try {
          if (key.isValid()) {
            if (key.isAcceptable())
              //ACCEPT事件具体的 *** 作
              doAccept(key);
          }
        } catch (IOException e) {
        }
        key = null;
      }
    }
...

doAccept方法:

void doAccept(SelectionKey key) throws InterruptedException, IOException,  OutOfMemoryError {
  //从key中拿出ServerSocketChannel
  ServerSocketChannel server = (ServerSocketChannel) key.channel();
  SocketChannel channel;
  //server.accept()返回和ServerChannel连接的SocketChannel
  while ((channel = server.accept()) != null) {
    channel.configureBlocking(false);
    channel.socket().setTcpNoDelay(tcpNoDelay);
    channel.socket().setKeepAlive(true);
    //从Reader数组中拿到一个Reader,下面是getReader()方法的具体实现
    //currentReader = (currentReader + 1) % readers.length;
    //return readers[currentReader];
    //currentReader 表示的是当前使用的Reader的下标,当又有一个Client连入时,
    //就加一取Readers个数的模,得到Reader的下标
    Reader reader = getReader();
    //ConnectionManager保存了Connection的集合,
    //register方法将channel包装成了Connection对象,并加入了ConncetionManager
    Connection c = connectionManager.register(channel);
    //将Connection附加在SelectionKey上面
    key.attach(c);  // so closeCurrentConnection can get the object
    //选中的reader添加Connection
    reader.addConnection(c);
  }
}

由此看出,Listener类是一个线程类,主要任务就是为连入的Socket分配Reader。

Reader为线程类,构造方法如下:

Reader(String name) throws IOException {
  super(name);
  // pendingConnections 是个阻塞队列,保存的是这个Reader分到的Connection
  // readerPendingConnectionQueue表示阻塞队列的大小
  // 可以通过ipc.server.read.connection-queue.size参数指定
  // 默认大小是100
  this.pendingConnections =
      new linkedBlockingQueue(readerPendingConnectionQueue);
  // 每个Reader都有一个自己的Selector
  this.readSelector = Selector.open();
}

doRead方法:

void doRead(SelectionKey key) throws InterruptedException {
  int count = 0;
  // 从SelectionKey中将之前附上的Connection拿出来
  Connection c = (Connection)key.attachment();
  if (c == null) {
    return;  
  }
  c.setLastContact(Time.now());
  
  try {
    // 读方法在这里
    count = c.readAndProcess();
  }

Call类中封装了request和response,放在callQueue之中,等待Handler的处理。

public static class Call implements Schedulable {
  private final int callId;             
  // 重试次数
  private final int retryCount;        
  // RPC Request 等待Handler处理
  // 如果是用的WritableRpcEngine,这里则是一个Invocation类
  private final Writable rpcRequest;    
  // 保存着和客户端的连接信息
  private final Connection connection; 
  private long timestamp;              
  // Response                            
  private ByteBuffer rpcResponse;       
  // rpcKind
  private final RPC.RpcKind rpcKind;
  private final byte[] clientId;

在整个RPC调用中具体包括了四层:

序列化层:序列化主要作用是将结构化对象转为字节流以便于通过网络进行传输或写入持久存储,在RPC框架中。我们常见的有Java JDK默认实现的序列化,Google开源的Protocol Buffers等等,在Hadoop2.6.0中已经提供了Hadoop默认的基于Writeable和Protocol Buffers的序列化方式。

函数调用层:函数调用层主要功能是定位要调用的函数并执行该函数,如上图所示,Hadoop RPC采用了Java反射机制(服务器端)与动态代理(客户端)实现了函数调用。

网络传输层:网络传输层描述了Client与Server之间消息传输的方式,Hadoop RPC采用了基于TCP/IP的Socket机制。

服务器端处理框架:服务器端处理框架可被抽象为网络I/O模型,它描述了客户端与服务器端间信息交互方式。


 

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5682428.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存