专栏名称: FangZzzzz
目录
相关文章推荐
今天看啥  ›  专栏  ›  FangZzzzz

RocketMQ深度解析(四):Consumer

FangZzzzz  · 掘金  ·  · 2019-09-25 11:43

文章预览

阅读 15

RocketMQ深度解析(四):Consumer

消息消费

首先我们看看RocketMQ中消息消费需要关注哪些问题。

  • 消息队列负载与重新分布
  • 消息消费模式
  • 消息拉取方式
  • 消息进度反馈
  • 消息过滤
  • 顺序消息

概述

消息消费以组的模式展开,一个消费组内可以包含多个消费者(同一个JVM实例内只允许不允许存在消费组相同的消费者),消费组之间要保持统一的订阅关系,这一点很重要

消费组之间有两种消费模式:

  • 广播模式:主题下的同一条消息将被集群内的所有消费者消费一次。
  • 集群模式:主题下的同一条消息只允许呗其中一个消费者消费。

消息服务器与消费者之间的消息传送也有两种方式:

  • 拉模式:消费端主动发起拉请求
  • 推模式:消息达到服务器后,推送给消息消费者(实际上推模式也是基于拉模式实现的,在拉模式上封装了一层)。

源码解析

消费者启动流程

先来看看DefaultMQPushConsumerImpl#start方法。

  // 检查配置
  this.checkConfig();
  // 构建主题订阅消息SubscriptionData
  this.copySubscription();
复制代码

检查配置没什么可说的,我们来看看copySubscription方法。

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#copySubscription
private void copySubscription() throws MQClientException {
    try {
        // 取出订阅关系表
        Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
        if (sub != null) {
            for (final Map.Entry<String, String> entry : sub.entrySet()) {
                final String topic = entry.getKey();
                final String subString = entry.getValue();
                // 构造订阅关系subscriptionData
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);
                // 加入到rebalanceImpl的subscriptionInner中
                // subscriptionInner是一个ConcurrentMapHashMap 
                // 就是订阅关系表,以topic为key
                this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
            }
        }
        
        // 注册监听器
        if (null == this.messageListenerInner) {
            this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            // 如果是广播模式,不需要重试
            case BROADCASTING:
                break;
            // 如果是集群模式,则将重试topic也加入到subscriptionInner中,
            // 消息重试是以消费组为单位,topic为%RETRY%+消费组名
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
        }
    } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
    }
}
复制代码

可以看到,subscriptionInner不光保存了使用者相关的订阅关系,还保存了以%RETRY%+消费组名为topic的订阅关系,自动订阅了重试topic。

我们继续看消费者启动流程。

// 如果是集群模式 则将instanceName更改为进程id
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}

// 初始化ClientInstance,在上篇文章中已经说过了,MQClientManager是单例的。
// 然后向MQClientManager的factoryTable添加本Instance的实例,key为ip+pid,
// 也就是说单个进程内只有一个MQClientInstance,除非自己设置过InstanceName。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

// 初始化消息重新负载实现类
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
复制代码

上面一步主要是生成MQClientInstance并注册到MQClientManager的factoryTable中,该实例主要是用于和外部通信。

然后初始化了消息负载均衡的实现类。

    // 长连接,主要用于从broker拉取消息并交由用户的Listener处理
    this.pullAPIWrapper = new PullAPIWrapper(
    mQClientFactory,
    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
    this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
复制代码

初始化了pullAPIWrapper。

if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
        // 如果是广播消费模式,则offset在本地存储
        case BROADCASTING:
            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        // 如果是集群模式,则将offset存储在broker端。
        case CLUSTERING:
            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
            break;
        default:
            break;
    }
    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加载offset,集群模式下是个空方法
this.offsetStore.load();
复制代码

这里初始化了offsetStore,集群模式下offset存储在broker端,广播模式下存储在本地。这里也很好理解,集群模式一条消息只允许被一个consumer消费,广播模式一条消息则需要被所有consumer消费。

// 如果是顺序消息
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    this.consumeOrderly = true;
    this.consumeMessageService = 
        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    // 并发消息
    this.consumeOrderly = false;
    this.consumeMessageService =
    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 启动consumeMessageService消费线程服务
this.consumeMessageService.start();
复制代码

初始化并启动consumeMessageService消费线程服务,主要负责消息消费,分为有序消息和无序消息。 我们先来看看并发消费线程服务的实现,顺序消费逻辑下面再讲。

先看看ConsumeMessageConcurrentlyService的构造方法。

org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#ConsumeMessageConcurrentlyService
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
        MessageListenerConcurrently messageListener) {
        // 设置defaultMQPushConsumerImpl
        this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
        // 注册用户的messageListener具体处理类
        this.messageListener = messageListener;
        // 设置defaultMQPushConsumer
        this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
        // 设置消费组
        this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
        // 消费请求队列
        this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

        // 消费线程池
        this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(),
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));
        // 定时
        this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
        // 定时清理线程池
        this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
    }
复制代码

主要是初始化了各种线程池。

    // 开启了一个定时清理过期消息的线程。
    public void start() {
        this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                cleanExpireMsg();
            }

        }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
    }
复制代码

详细的后面会讲,这里主要就是三个线程池,其实这也是RocketMQ的魅力所在,各种异步相互协调。

回到消费者启动流程。

// 向MQClientInstance注册defaultMQPushConsumerImpl,以消费组的为key,也就是说一个进程内只允许有一个同名的消费组实例
boolean registerOK mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
// 如果注册失败,则抛出异常
if (!registerOK) {
`   this.serviceState = ServiceState.CREATE_JUST;
    this.consumeMessageService.shutdown();
    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
    + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
    null);
}
// 启动MQClientInstance
mQClientFactory.start();
复制代码

主要是向MQClientInstance注册defaultMQPushConsumerImpl,以及启动MQClientInstance。看看MQClientInstance启动干了什么。

org.apache.rocketmq.client.impl.factory.MQClientInstance#start
    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
复制代码

让我们看看下面定时任务做了什么。

每隔2分钟尝试获取一次NameServer地址
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
            } catch (Exception e) {
                log.error("ScheduledTask fetchNameServerAddr exception", e);
            }
        }
    }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
复制代码
每隔30S尝试更新主题路由信息
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.updateTopicRouteInfoFromNameServer();
            } catch (Exception e) {
                log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
            }
        }
    }, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
复制代码
每隔30S进行Broker心跳检测
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.cleanOfflineBroker();
                MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
            } catch (Exception e) {
                log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
            }
        }
    }, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
复制代码
默认每隔5秒持久化ConsumeOffset
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码
默认每隔1S检查线程池适配
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.adjustThreadPool();
            } catch (Exception e) {
                log.error("ScheduledTask adjustThreadPool exception", e);
            }
        }
    }, 1, 1, TimeUnit.MINUTES);
复制代码

消费者启动流程总结

  • 检查配置
  • 将订阅关系添加到负载实现类中
  • 改变实例id为进程id
  • 初始化MQClientInstance并将MQClientInstance加入到MQClientManager中
  • 初始化负载均衡实现类
  • 初始化长连接
  • 初始化offsetStore
  • 初始化并启动consumeMessageService消费线程服务
  • 向MQClientInstance注册自己
  • 启动MQClientInstance

消息拉取流程

PullMessageService

从上文可以看到,一个进程内只存在一个MQClientInstance(自己设置InstanceName除外),从MQClientInstance的启动流程可以看出,MQClientInstance使用一个单独的线程PullMessageService来负责消息的拉取。

PullMessageService继承了ServiceThread,ServiceThread实现了Runnable,实际上ServiceThread只是重写了start等方法,将该线程设置为了守护线程。我们先来看看PullMessageService的run方法。

org.apache.rocketmq.client.impl.consumer.PullMessageService#run
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                this.pullMessage(pullRequest);
            } catch (InterruptedException ignored) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }
复制代码

他的主要任务就是循环不断阻塞的从pullRequestQueue中取出pullRequest。

然后调用pullMessage方法进行处理。

我们先来看一下pullRequest是什么时候被放到pullRequestQueue队列里去的。 PullMessageService

    public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        if (!isStopped()) {
            this.scheduledExecutorService.schedule(new Runnable() {
                @Override
                public void run() { 
                    PullMessageService.this.executePullRequestImmediately(pullRequest);
                }
            }, timeDelay, TimeUnit.MILLISECONDS);
        } else {
            log.warn("PullMessageServiceScheduledThread has shutdown");
        }
    }

    public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }
复制代码

分别会在消息拉取任务结束后,又重新将PullRequest对象放入到pullRequestQueue。

和RebalanceImpl中创建。

从上面得知,PullMessageServuce只有在拿到PullRequst对象时才会执行拉取任务,我们先看看PullRequest到底是什么。

public class PullRequest {
    /**
     * 消费者组
     */
    private String consumerGroup;
    /**
     * 待拉取消费队列
     */
    private MessageQueue messageQueue;
    /**
     * 消息处理队列
     */
    private ProcessQueue processQueue;
    /**
     * 偏移量
     */
    private long nextOffset;
    /**
     * 是否被锁定
     */
    private boolean lockedFirst = false;
}
复制代码

继续回到PullMessageService

    private void pullMessage(final PullRequest pullRequest) {
        // 根据消费组名从MQclientInstance中获取对应的实现类
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) {
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            // 调用DefaultMQPushConsumerImpl的pullMessage
            impl.pullMessage(pullRequest);
        } else {
            log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
        }
    }
复制代码

在讲解消息拉取过程之前。我们先讲一下ProcessQueue。

ProcessQueue是MessageQueue在消息端的快照。PullMessageService从消息服务器默认每次拉取32跳消息,按消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。

消息拉取流程

消息拉取分为3个主要步骤。

  • Client消息拉取请求封装
  • Broker查找并返回消息
  • Client处理返回的消息

消息拉取

org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage
        // 拿到队列快照
        final ProcessQueue processQueue = pullRequest.getProcessQueue();
        // 如果处理队列当前状态被丢弃
        if (processQueue.isDropped()) {
            log.info("the pull request[{}] is dropped.", pullRequest.toString());
            return;
        }
        // 更新processQueue的LastPullTimestamp为当前时间戳
        pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

        // 判断当前Client是否RUNNING状态
        try {
            this.makeSureStateOK();
        } catch (MQClientException e) {
            log.warn("pullMessage exception, consumer state not ok", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            return;
        }

        // 如果当前消费者被挂起,则将拉取任务延迟一秒再次放入PullMessageService的拉取任务队列中,结束本次拉取
        if (this.isPause()) {
            log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
            return;
        }
复制代码

一些消息拉取的前置校验。

        // 如果消息总数大于1000
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
            // 延迟放入则将拉取任务延迟一秒再次放入PullMessageService的拉取任务队列中
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            // 流控次数达到1000次则打警告日志
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
        
        // 如果消息大小大于100MB
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
            // 延迟放入则将拉取任务延迟一秒再次放入PullMessageService的拉取任务队列中
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            // 流控次数达到1000次则打警告日志
            if ((queueFlowControlTimes++ % 1000) == 0) {
                log.warn(
                    "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            }
            return;
        }
复制代码

从两个维度来进行流控:

  • 消息总数不得大于1000
  • 消息大小不得大于100MB
        // 如果不是顺序消息
        if (!this.consumeOrderly) {
            // 如果快照内的消息最大偏移量间隔大于2000
            if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
                if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                    log.warn(
                        "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                        processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                        pullRequest, queueMaxSpanFlowControlTimes);
                }
                return;
            }
        }
复制代码

如果是并发消息消费的话,还要判断快照内消息最大偏移量间隔是否大于2000,如果是则流控。

顺序消息的逻辑我们下面再讲。

        // 拿到该主题订阅信息
        final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        // 如果为空,则延迟3s拉取
        if (null == subscriptionData) {
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
            log.warn("find the consumer's subscription failed, {}", pullRequest);
            return;
        }
复制代码

从rebalanceImpl中拿到该主题的订阅信息。

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        // 如果是集群消费模式
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            // 从本地缓存中读取offset消费进度
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            // 如果消费进度大于0
            if (commitOffsetValue > 0) {
                // 则告诉Broker需要保存消费进度
                commitOffsetEnable = true;
            }
        }

        //这段不是很明白
        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString();
            }

            classFilter = sd.isClassFilterMode();
        }
    
        // 构建消息拉取系统标记
        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
复制代码

系统标记用四个二进制位分别表示四个状态位。

org.apache.rocketmq.common.sysflag.PullSysFlag
    /**
     * 表示从内存中读取的消费进度大于0,则设置该标记位
     */
    private final static int FLAG_COMMIT_OFFSET = 0x1;
    /**
     * 表示消息拉取时支持挂起
     */
    private final static int FLAG_SUSPEND = 0x1 << 1;
    /**
     * 消息过滤机制位表达式机制
     */
    private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
    /**
     * 消息过滤机制为类过滤模式
     */
    private final static int FLAG_CLASS_FILTER = 0x1 << 3;
复制代码

继续回到拉取消息。

        try {
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),  // 具体哪个消息队列
                subExpression,                  // 消息过滤表达式
                subscriptionData.getExpressionType(), // 消息过滤表达式类型
                subscriptionData.getSubVersion(),  // 消息过滤表达式版本号
                pullRequest.getNextOffset(),    //消息拉取偏移量
                this.defaultMQPushConsumer.getPullBatchSize(),   //本次拉取最大消息条数
                sysFlag,    // 拉取消息系统标记
                commitOffsetValue,  //当前MessageQueue的消费进度(内存内)
                BROKER_SUSPEND_MAX_TIME_MILLIS, //拉取过程中允许broker挂起时间
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,   //拉取消息超时时间
                CommunicationMode.ASYNC,    //异步拉取
                pullCallback    // 从broker拉取到消息的回调方法
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
        }
复制代码

最终拉取消息。

我们看一下org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl方法。

        // 根据brokername以及brokerId从内存中查询broker地址相关信息
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        // 如果内存为空,则先从NamerServer更新一下内存的Broker地址
        // 再从内存中拿
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }
复制代码

首先拿到broker相关信息,经典的缓存策略。

        if (findBrokerResult != null) {
            {
                // 如果过滤表达式不是TAG类型 && broker版本小于V4_1_0_SNAPSHOT
                // 也就是说V4_1_0版本之前是只支持tag类型的
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            //如果是从节点 则清除FLAG_COMMIT_OFFSET标志位
            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            // 拼接请求
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            // 发送请求
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }
复制代码

最后通过pullMessageAsync异步发送请求。

消息拉取客户端处理消息

org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync
        this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
            @Override
            public void operationComplete(ResponseFuture responseFuture) {
                RemotingCommand response = responseFuture.getResponseCommand();
                if (response != null) {
                    try {
                        PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                        assert pullResult != null;
                        pullCallback.onSuccess(pullResult);
                    } catch (Exception e) {
                        pullCallback.onException(e);
                    }
                } else {
                    if (!responseFuture.isSendRequestOK()) {
                        pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
                    } else if (responseFuture.isTimeout()) {
                        pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                            responseFuture.getCause()));
                    } else {
                        pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
                    }
                }
            }
        });
复制代码

解析请求成功则会回调onSuccess方法。

org.apache.rocketmq.client.consumer.PullCallback#onSuccess
    // 拿到下一次拉取的offset
    long prevRequestOffset = pullRequest.getNextOffset();
    // 设置下一次拉取offset
    pullRequest.setNextOffset(pullResult.getNextBeginOffset());
复制代码

首先设置下次拉取的offset。

if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
复制代码

如果没有消息返回,则将pullRequest放入pullMessageService的pullRequestQueue中。

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
复制代码

把拉取到的消息存储processQueue。 然后将拉取到的消息交给consumeMessageService的线程池去处理。

    if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
    } else {
        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
    }
复制代码

之后再把pullRequest放到pullMessageService中,拉取任务就已经结束了。

还有异常逻辑,有兴趣的同学可以去看一下。这里就不进行深入讲解了。

todo 流程图

todo 负载均衡

··· 未完待续

………………………………

原文地址:访问原文地址
快照地址: 访问文章快照
总结与预览地址:访问总结与预览
推荐文章