A线程异步调用B线程,整个过程链路是怎样的?比如dubbo调用netty的过程代码是怎样的?比如发送异步MQ消息的代码如何实现?
一、一图总结线程交互逻辑:
二、调用方A线程的视角
A线程调用B线程之前,A线程把自己存入一个全局的ConcurrentHashMap中,并把Map的key作为参数传给线程B,然后A线程调用Thread的wait()方法进入WAITTING状态,,等到B线程处理完后利用A的Key从Map中取出对应的线程,进行唤醒notify()
三、接收放B线程的视角
B线程本身也是一个异步处理,当接收到A的请求后,把A的参数存入处理队列,然后通知A接收成功(A接收到成功的消息后调用Thread的wait()方法进入WAITTING状);B的处理多线程按规则处理队列里面的任务,完成相应的任务后从全局的ConcurrentHashMap中取出A的线程,调用A的notify(),然后A开始运行后告诉B自己成功了,则B完成一次任务
四、线程状态机(供参考用)
五、具体解析
5.1 发送同步转异步
在网络通信框架,都会面临一个很经典的问题,上游业务调用外部方法是同步的,但是网络请求会设计成异步,当服务端处理完成以后告诉客户端,这个过程如何实现? 举例:业务调用本地的netty客户端发起请求以后阻塞,netty客户端异步发送请求给服务端,当服务端处理完成以后告诉客户端,当客户端收到响应以后,唤醒之前阻塞的业务方调用。这个过程如何实现?
而rocketMq 在这个地方的实现上代码是比较简洁的。(dubbo调用netty的过程也超级简直,有兴趣可以了解)代码如下:
代码片段一,同步发送消息的入口:
作者:随风
链接:https://www.zhihu.com/question/623539711/answer/3224195091
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
        final long timeoutMillis)
        throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
        final int opaque = request.getOpaque();
        try {
            final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
            this.responseTable.put(opaque, responseFuture); //------------①
            final SocketAddress addr = channel.remoteAddress();
            channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture f) throws Exception {
                    if (f.isSuccess()) {
                        responseFuture.setSendRequestOK(true);
                        return;
                    } else {
                        responseFuture.setSendRequestOK(false);
                    }
                    responseTable.remove(opaque);
                    responseFuture.setCause(f.cause());
                    responseFuture.putResponse(null);
                    log.warn("send a request command to channel <" + addr + "> failed.");
                }
            });
            RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); //------②
            if (null == responseCommand) {
                if (responseFuture.isSendRequestOK()) {
                    throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
                        responseFuture.getCause());
                } else {
                    throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
                }
            }
            return responseCommand;
        } finally {
            this.responseTable.remove(opaque);
        }
    }下面看下②处里面具体的代码:
代码片段二:
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException {
        this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return this.responseCommand;
   }可以理解为responseFuture中有一个responseCommand对象,它放置的就是真正的响应结果,但是它只有在countDownLatch.countdown()以后,才不会阻塞在上面的this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS) 这里,所以可以想到,还有另外一个线程在收到服务端响应后会调用countDownLatch.countdown,继续看下文另外一个线程怎么在接收到服务端响应后来处理。
下面看下当客户端收到服务端响应是否会这样呢?代码如下:
代码片段三,收到服务端响应入口:
作者:随风
链接:https://www.zhihu.com/question/623539711/answer/3224195091
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
        final int opaque = cmd.getOpaque();
        final ResponseFuture responseFuture = responseTable.get(opaque); //---①
        if (responseFuture != null) {
            responseFuture.setResponseCommand(cmd);
            responseTable.remove(opaque);
            if (responseFuture.getInvokeCallback() != null) {
                executeInvokeCallback(responseFuture);
            } else {
                responseFuture.putResponse(cmd);  //----②
                responseFuture.release();
            }
        } else {
            log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            log.warn(cmd.toString());
        }
    }代码片段四:
public void putResponse(final RemotingCommand responseCommand) {
        this.responseCommand = responseCommand;
        this.countDownLatch.countDown(); //----------③
    }5.2. 回调采用支持独立线程池
作者:随风
链接:https://www.zhihu.com/question/623539711/answer/3224195091
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
private void executeInvokeCallback(final ResponseFuture responseFuture) {
        boolean runInThisThread = false;
        ExecutorService executor = this.getCallbackExecutor();//------------1
        if (executor != null) {
            try {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            responseFuture.executeInvokeCallback();//------2
                        } catch (Throwable e) {
                            log.warn("execute callback in executor exception, and callback throw", e);
                        } finally {
                            responseFuture.release();
                        }
                    }
                });
            } catch (Exception e) {
                runInThisThread = true;
                log.warn("execute callback in executor exception, maybe executor busy", e);
            }
        } else {
            runInThisThread = true;
        }
        if (runInThisThread) {
            try {
                responseFuture.executeInvokeCallback();
            } catch (Throwable e) {
                log.warn("executeInvokeCallback Exception", e);
            } finally {
                responseFuture.release();
            }
        }
    }上面的这段代码是消息发送成功后触发业务方回调的方法,
设计上采用:ExecutorService executor = this.getCallbackExecutor(); 这里采用线程池异步处理,可以防止回
调方法的不确定性(超时等)阻碍消息接收的主线程。
