优秀的编程知识分享平台

网站首页 > 技术文章 正文

rocketmq长轮询实现消息推送(rocketmq长轮询实现消息推送信息)

nanyue 2024-08-04 16:53:28 技术文章 10 ℃

消息推送有两种:

  1. socket服务端主动推送给客户端
  2. 客户端定时轮询服务端

各自都有优缺点:

轮询会造成消息到达不及时

推送网络连接容易中断,尤其是比如断网了,wifi断开等情况导致网络断开后,服务端消息推送会失败。

长轮询

长轮询是通过客户端不停的给服务端发送拉去消息请求,如果服务端发现消息了就立即返回给客户端,如果没有发现消息,那么服务端就会等待,直到有消息后再响应给客户端,客户端收到消息后不会等待又立即发起拉取消息请求给服务端。按照这个流程一直走下去。

rocketMQ长轮询实现

上面的图是rocketmq长轮询实现流程,客户端会发送给服务端pullRequest请求,服务端发现有消息后会通知客户端,调用客户端的PullCallback,然后继续触发客户端发送pullRequest请求。这点是跟长轮询一致的。

但是长轮询也存在一个问题,比如网络异常断开了,客户端没有收到服务端响应,那么客户端就不会再次发起轮询请求了,那这样岂不是和主动推送一样了,rocketmq怎么实现的?

其实rocketMQ还有一个rebalence服务,会每隔20秒去消息队列中拉取消息,所以如果客户端和服务端网络断开了,客户端还有一个服务定时会去连接服务端的。

rocketMQ怎么解决长轮询发送拉取请求失败问题的?

try {
     this.pullAPIWrapper.pullKernelImpl(
     pullRequest.getMessageQueue(),
     subExpression,
     subscriptionData.getExpressionType(),
     subscriptionData.getSubVersion(),
     pullRequest.getNextOffset(),
     this.defaultMQPushConsumer.getPullBatchSize(),
     sysFlag,
     commitOffsetValue,
     BROKER_SUSPEND_MAX_TIME_MILLIS,
     CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
     CommunicationMode.ASYNC,
     pullCallback
     );
} catch (Exception e) {
     log.error("pullKernelImpl exception", e);
     this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}

就是这段代码,如果发送消息异常了,会调用executePullRequestLater方法,经过一定时间间隔后,把拉取请求放入队列中,然后长轮询定时任务会继续再从队列中取出拉取请求发送给客户端,直到成功为止。

长轮询问题

所以综上长轮询网络断开问题怎么处理,上面rocketMQ的是一种方式。

另外也可以在长轮询发起请求后起一个单次的定时任务,比如间隔20秒,如果20秒内服务端响应了,则定时任务不执行业务逻辑,如果超过20秒服务端还没有响应,则执行业务逻辑,比如继续给服务端发起拉取消息请求。

Tags:

最近发表
标签列表