- NettyServer.doOpen
- NettyHandler#messageReceived()
- HeaderExchangeHandler.received
- DubboProtocol->ExchangeHandlerAdapter.reply()
- 消费端接收信息HeaderExchangeHandler.received->handleResponse方法
- 总结
消费端把消息发送出去之后,服务端会收到消息,然后把执行的结果返回到客户端。 NettyServer.doOpen
NettyServer开启了netty服务,对应的handler处理是NettyHandler.
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
NettyHandler#messageReceived()
将netty的channel包装成dubbo自身封装的channel
使用dubbo自身的ChannelHandler去处理消息,而Handler与Servlet中的filter很像,通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一 对日志错误进行处理、统一对请求进行计数、控制Handler执行与否
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// 将netty的channel封装成dubbo内部的channel
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
try {
handler.received(channel, e.getMessage());
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
}
handler的执行流程比较复杂,
handler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler- >HeaderExchangeHandler->最后进入这个方法->DubboProtocol$requestHandler(receive)
- MultiMessageHandler: 复合消息处理
- HeartbeatHandler:心跳消息处理,接收心跳并发送心跳响应
- AllChannelHandler:业务线程转化处理器,把接收到的消息封装成ChannelEventRunnable可执行任 务给线程池处理
- DecodeHandler:业务解码处理器
HeaderExchangeHandler前面有一些解码的 *** 作比较复杂,就不分析了.HeaderExchangeHandler的主要 *** 作就是对请求进行处理,
handlerRequest,双向请求
handler.received 单向请求
handleResponse 响应消息(在消费端执行)
@Override
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
// 处理请求对象
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
// 处理事件
handlerEvent(channel, request);
} else {
// 处理普通的请求
// 双向通信
if (request.isTwoWay()) {
// 向后调用服务,并得到调用结果
handleRequest(exchangeChannel, request);
} else {
// 如果是单向通信,仅向后调用指定服务即可,无需返回调用结果
handler.received(exchangeChannel, request.getData());
}
}
// 处理响应对象,服务消费方会执行此处逻辑.
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
// telnet调用
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
再看看handleRequest方法,handleRequest后面执行到handler.reply方法,我们主要看看dubbo协议,就走到DubboProtocol的匿名类对象ExchangeHandlerAdapter里面了.
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
Response res = new Response(req.getId(), req.getVersion());
// 检测请求是否合法,不合法则返回状态码为 BAD_REQUEST 的响应
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// find handler by message class.
// 获取 data 字段值,也就是 RpcInvocation 对象
Object msg = req.getData();
try {
// handle data.
// 继续向下调用
CompletableFuture<Object> future = handler.reply(channel, msg);
if (future.isDone()) {
res.setStatus(Response.OK);
res.setResult(future.get());
channel.send(res);
return;
}
future.whenComplete((result, t) -> {
try {
if (t == null) {
res.setStatus(Response.OK);
res.setResult(result);
} else {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
channel.send(res);
} catch (RemotingException e) {
logger.warn("Send result to consumer failed, channel is " + channel + ", msg is " + e);
} finally {
// HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
});
} catch (Throwable e) {
res.setStatus(Response.SERVICE_ERROR);
res.setErrorMessage(StringUtils.toString(e));
channel.send(res);
}
}
DubboProtocol->ExchangeHandlerAdapter.reply()
reply方法主要进行如下步骤,
将message转成Invocation,获取invoker,调用invoker的invoke方法,封装返回参数返回.
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: "
+ (message == null ? null : (message.getClass().getName() + ": " + message))
+ ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
Invocation inv = (Invocation) message;
// 获取invoker实例
Invoker<?> invoker = getInvoker(channel, inv);
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || !methodsStr.contains(",")) {
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods) {
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
}
if (!hasMethod) {
logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
+ " not found in callback service interface ,invoke will be ignored."
+ " please update the api interface. url is:"
+ invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext rpcContext = RpcContext.getContext();
rpcContext.setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
if (result instanceof AsyncRpcResult) {
return ((AsyncRpcResult) result).getResultFuture().thenApply(r -> (Object) r);
} else {
return CompletableFuture.completedFuture(result);
}
}
获取invoker对象就是从服务注册的时候,保存的map中获取的:我们看看export方法:
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
// if it's callback service on client side
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(Constants.STUB_EVENT_KEY));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path += "." + inv.getAttachments().get(Constants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
// 计算 service key,格式为 groupName/serviceName:serviceVersion:port。比如:
// dubbo/com.alibaba.dubbo.demo.DemoService:1.0.0:20880
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
// 从 exporterMap 查找与 serviceKey 相对应的 DubboExporter 对象,
// 服务导出过程中会将 映射关系存储到 exporterMap 集合中
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
}
//返回invoker对象
return exporter.getInvoker();
}
拿到的invoker实际也是生成的代理类,
invoker=ProtocolFilterWrapper(InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxy Invoker)))
最后一定会进入到这个代码里面 AbstractProxyInvoker
在AbstractProxyInvoker里面,doInvoker本质上调用的是wrapper.invokeMethod()。Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。
public class Wrapper0 extends Wrapper implements ClassGenerator.DC {
public static String[] pns;
public static Map pts;
public static String[] mns;
public static String[] dmns;
public static Class[] mts0;
// 省略其他方法
public Object invokeMethod(Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException {
DemoService demoService;
try {
// 类型转换
demoService = (DemoService)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
// 根据方法名调用指定的方法
if ("sayHello".equals(string) && arrclass.length == 1) {
return demoService.sayHello((String)arrobject[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class com.alibaba.dubbo.demo.DemoService.").toString());
}
}
消费端接收信息HeaderExchangeHandler.received->handleResponse方法
消费端接收消息主要就是通过DefaultFuture类来实现的,通过生成唯一id来找到对应请求的feature.
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
private void doReceived(Response res) {
lock.lock();
try {
response = res;
if (done != null) {
// 唤醒用户线程
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
总结
dubbo的调用有很多我没有讲到的地方,比如netty那块,编码解码我都没怎么看,感觉自己太菜,对那一块也不是很熟悉.故直接跳过了,后续有时间可以再仔细的分析一波.到这里dubbo源码的学习暂时告一段落了.
ChannelEventRunnable#run()
—> DecodeHandler#received(Channel, Object)
—> HeaderExchangeHandler#received(Channel, Object)
—> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)
—> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)
—> Filter#invoke(Invoker, Invocation)
—> AbstractProxyInvoker#invoke(Invocation)
—> Wrapper0#invokeMethod(Object, String, Class[], Object[])
—> DemoServiceImpl#sayHello(String)
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)