消息消费
首先我们看看RocketMQ中消息消费需要关注哪些问题。
- 消息队列负载与重新分布
- 消息消费模式
- 消息拉取方式
- 消息进度反馈
- 消息过滤
- 顺序消息
概述
消息消费以组的模式展开,一个消费组内可以包含多个消费者(同一个JVM实例内只允许不允许存在消费组相同的消费者),消费组之间要保持统一的订阅关系,这一点很重要。
消费组之间有两种消费模式:
- 广播模式:主题下的同一条消息将被集群内的所有消费者消费一次。
- 集群模式:主题下的同一条消息只允许呗其中一个消费者消费。
消息服务器与消费者之间的消息传送也有两种方式:
- 拉模式:消费端主动发起拉请求
- 推模式:消息达到服务器后,推送给消息消费者(实际上推模式也是基于拉模式实现的,在拉模式上封装了一层)。
源码解析
消费者启动流程
先来看看DefaultMQPushConsumerImpl#start方法。
// 检查配置
this.checkConfig();
// 构建主题订阅消息SubscriptionData
this.copySubscription();
复制代码
检查配置没什么可说的,我们来看看copySubscription方法。
private void copySubscription() throws MQClientException {
try {
// 取出订阅关系表
Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
if (sub != null) {
for (final Map.Entry<String, String> entry : sub.entrySet()) {
final String topic = entry.getKey();
final String subString = entry.getValue();
// 构造订阅关系subscriptionData
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),topic, subString);
// 加入到rebalanceImpl的subscriptionInner中
// subscriptionInner是一个ConcurrentMapHashMap
// 就是订阅关系表,以topic为key
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
}
}
// 注册监听器
if (null == this.messageListenerInner) {
this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
}
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是广播模式,不需要重试
case BROADCASTING:
break;
// 如果是集群模式,则将重试topic也加入到subscriptionInner中,
// 消息重试是以消费组为单位,topic为%RETRY%+消费组名
case CLUSTERING:
final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
retryTopic, SubscriptionData.SUB_ALL);
this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
break;
default:
break;
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
复制代码
可以看到,subscriptionInner不光保存了使用者相关的订阅关系,还保存了以%RETRY%+消费组名为topic的订阅关系,自动订阅了重试topic。
我们继续看消费者启动流程。
// 如果是集群模式 则将instanceName更改为进程id
if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
this.defaultMQPushConsumer.changeInstanceNameToPID();
}
// 初始化ClientInstance,在上篇文章中已经说过了,MQClientManager是单例的。
// 然后向MQClientManager的factoryTable添加本Instance的实例,key为ip+pid,
// 也就是说单个进程内只有一个MQClientInstance,除非自己设置过InstanceName。
this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
// 初始化消息重新负载实现类
this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
复制代码
上面一步主要是生成MQClientInstance并注册到MQClientManager的factoryTable中,该实例主要是用于和外部通信。
然后初始化了消息负载均衡的实现类。
// 长连接,主要用于从broker拉取消息并交由用户的Listener处理
this.pullAPIWrapper = new PullAPIWrapper(
mQClientFactory,
this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
复制代码
初始化了pullAPIWrapper。
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
switch (this.defaultMQPushConsumer.getMessageModel()) {
// 如果是广播消费模式,则offset在本地存储
case BROADCASTING:
this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
// 如果是集群模式,则将offset存储在broker端。
case CLUSTERING:
this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
break;
default:
break;
}
this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
// 加载offset,集群模式下是个空方法
this.offsetStore.load();
复制代码
这里初始化了offsetStore,集群模式下offset存储在broker端,广播模式下存储在本地。这里也很好理解,集群模式一条消息只允许被一个consumer消费,广播模式一条消息则需要被所有consumer消费。
// 如果是顺序消息
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
// 并发消息
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// 启动consumeMessageService消费线程服务
this.consumeMessageService.start();
复制代码
初始化并启动consumeMessageService消费线程服务,主要负责消息消费,分为有序消息和无序消息。 我们先来看看并发消费线程服务的实现,顺序消费逻辑下面再讲。
先看看ConsumeMessageConcurrentlyService的构造方法。
public ConsumeMessageConcurrentlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerConcurrently messageListener) {
// 设置defaultMQPushConsumerImpl
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
// 注册用户的messageListener具体处理类
this.messageListener = messageListener;
// 设置defaultMQPushConsumer
this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
// 设置消费组
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
// 消费请求队列
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
// 消费线程池
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
// 定时
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
// 定时清理线程池
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("CleanExpireMsgScheduledThread_"));
}
复制代码
主要是初始化了各种线程池。
// 开启了一个定时清理过期消息的线程。
public void start() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
}
复制代码
详细的后面会讲,这里主要就是三个线程池,其实这也是RocketMQ的魅力所在,各种异步相互协调。
回到消费者启动流程。
// 向MQClientInstance注册defaultMQPushConsumerImpl,以消费组的为key,也就是说一个进程内只允许有一个同名的消费组实例
boolean registerOK mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
// 如果注册失败,则抛出异常
if (!registerOK) {
` this.serviceState = ServiceState.CREATE_JUST;
this.consumeMessageService.shutdown();
throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
null);
}
// 启动MQClientInstance
mQClientFactory.start();
复制代码
主要是向MQClientInstance注册defaultMQPushConsumerImpl,以及启动MQClientInstance。看看MQClientInstance启动干了什么。
public void start() throws MQClientException {
synchronized (this) {
switch (this.serviceState) {
case CREATE_JUST:
this.serviceState = ServiceState.START_FAILED;
// If not specified,looking address from name server
if (null == this.clientConfig.getNamesrvAddr()) {
this.mQClientAPIImpl.fetchNameServerAddr();
}
// Start request-response channel
this.mQClientAPIImpl.start();
// Start various schedule tasks
this.startScheduledTask();
// Start pull service
this.pullMessageService.start();
// Start rebalance service
this.rebalanceService.start();
// Start push service
this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
log.info("the client factory [{}] start OK", this.clientId);
this.serviceState = ServiceState.RUNNING;
break;
case RUNNING:
break;
case SHUTDOWN_ALREADY:
break;
case START_FAILED:
throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
default:
break;
}
}
}
复制代码
让我们看看下面定时任务做了什么。
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
复制代码
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
复制代码
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
复制代码
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
复制代码
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
复制代码
消费者启动流程总结
- 检查配置
- 将订阅关系添加到负载实现类中
- 改变实例id为进程id
- 初始化MQClientInstance并将MQClientInstance加入到MQClientManager中
- 初始化负载均衡实现类
- 初始化长连接
- 初始化offsetStore
- 初始化并启动consumeMessageService消费线程服务
- 向MQClientInstance注册自己
- 启动MQClientInstance
消息拉取流程
PullMessageService
从上文可以看到,一个进程内只存在一个MQClientInstance(自己设置InstanceName除外),从MQClientInstance的启动流程可以看出,MQClientInstance使用一个单独的线程PullMessageService来负责消息的拉取。
PullMessageService继承了ServiceThread,ServiceThread实现了Runnable,实际上ServiceThread只是重写了start等方法,将该线程设置为了守护线程。我们先来看看PullMessageService的run方法。
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
复制代码
他的主要任务就是循环不断阻塞的从pullRequestQueue中取出pullRequest。
然后调用pullMessage方法进行处理。
我们先来看一下pullRequest是什么时候被放到pullRequestQueue队列里去的。 PullMessageService
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
复制代码
分别会在消息拉取任务结束后,又重新将PullRequest对象放入到pullRequestQueue。
和RebalanceImpl中创建。
从上面得知,PullMessageServuce只有在拿到PullRequst对象时才会执行拉取任务,我们先看看PullRequest到底是什么。
public class PullRequest {
/**
* 消费者组
*/
private String consumerGroup;
/**
* 待拉取消费队列
*/
private MessageQueue messageQueue;
/**
* 消息处理队列
*/
private ProcessQueue processQueue;
/**
* 偏移量
*/
private long nextOffset;
/**
* 是否被锁定
*/
private boolean lockedFirst = false;
}
复制代码
继续回到PullMessageService
private void pullMessage(final PullRequest pullRequest) {
// 根据消费组名从MQclientInstance中获取对应的实现类
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
// 调用DefaultMQPushConsumerImpl的pullMessage
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
复制代码
在讲解消息拉取过程之前。我们先讲一下ProcessQueue。
ProcessQueue是MessageQueue在消息端的快照。PullMessageService从消息服务器默认每次拉取32跳消息,按消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService然后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。
消息拉取流程
消息拉取分为3个主要步骤。
- Client消息拉取请求封装
- Broker查找并返回消息
- Client处理返回的消息
消息拉取
// 拿到队列快照
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 如果处理队列当前状态被丢弃
if (processQueue.isDropped()) {
log.info("the pull request[{}] is dropped.", pullRequest.toString());
return;
}
// 更新processQueue的LastPullTimestamp为当前时间戳
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
// 判断当前Client是否RUNNING状态
try {
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
return;
}
// 如果当前消费者被挂起,则将拉取任务延迟一秒再次放入PullMessageService的拉取任务队列中,结束本次拉取
if (this.isPause()) {
log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
return;
}
复制代码
一些消息拉取的前置校验。
// 如果消息总数大于1000
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
// 延迟放入则将拉取任务延迟一秒再次放入PullMessageService的拉取任务队列中
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// 流控次数达到1000次则打警告日志
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
// 如果消息大小大于100MB
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
// 延迟放入则将拉取任务延迟一秒再次放入PullMessageService的拉取任务队列中
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
// 流控次数达到1000次则打警告日志
if ((queueFlowControlTimes++ % 1000) == 0) {
log.warn(
"the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
}
return;
}
复制代码
从两个维度来进行流控:
- 消息总数不得大于1000
- 消息大小不得大于100MB
// 如果不是顺序消息
if (!this.consumeOrderly) {
// 如果快照内的消息最大偏移量间隔大于2000
if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
log.warn(
"the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
pullRequest, queueMaxSpanFlowControlTimes);
}
return;
}
}
复制代码
如果是并发消息消费的话,还要判断快照内消息最大偏移量间隔是否大于2000,如果是则流控。
顺序消息的逻辑我们下面再讲。
// 拿到该主题订阅信息
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
// 如果为空,则延迟3s拉取
if (null == subscriptionData) {
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
复制代码
从rebalanceImpl中拿到该主题的订阅信息。
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 如果是集群消费模式
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 从本地缓存中读取offset消费进度
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
// 如果消费进度大于0
if (commitOffsetValue > 0) {
// 则告诉Broker需要保存消费进度
commitOffsetEnable = true;
}
}
//这段不是很明白
String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
subExpression = sd.getSubString();
}
classFilter = sd.isClassFilterMode();
}
// 构建消息拉取系统标记
int sysFlag = PullSysFlag.buildSysFlag(
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
复制代码
系统标记用四个二进制位分别表示四个状态位。
/**
* 表示从内存中读取的消费进度大于0,则设置该标记位
*/
private final static int FLAG_COMMIT_OFFSET = 0x1;
/**
* 表示消息拉取时支持挂起
*/
private final static int FLAG_SUSPEND = 0x1 << 1;
/**
* 消息过滤机制位表达式机制
*/
private final static int FLAG_SUBSCRIPTION = 0x1 << 2;
/**
* 消息过滤机制为类过滤模式
*/
private final static int FLAG_CLASS_FILTER = 0x1 << 3;
复制代码
继续回到拉取消息。
try {
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(), // 具体哪个消息队列
subExpression, // 消息过滤表达式
subscriptionData.getExpressionType(), // 消息过滤表达式类型
subscriptionData.getSubVersion(), // 消息过滤表达式版本号
pullRequest.getNextOffset(), //消息拉取偏移量
this.defaultMQPushConsumer.getPullBatchSize(), //本次拉取最大消息条数
sysFlag, // 拉取消息系统标记
commitOffsetValue, //当前MessageQueue的消费进度(内存内)
BROKER_SUSPEND_MAX_TIME_MILLIS, //拉取过程中允许broker挂起时间
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, //拉取消息超时时间
CommunicationMode.ASYNC, //异步拉取
pullCallback // 从broker拉取到消息的回调方法
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
复制代码
最终拉取消息。
我们看一下org.apache.rocketmq.client.impl.consumer.PullAPIWrapper#pullKernelImpl方法。
// 根据brokername以及brokerId从内存中查询broker地址相关信息
FindBrokerResult findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
// 如果内存为空,则先从NamerServer更新一下内存的Broker地址
// 再从内存中拿
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
findBrokerResult =
this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
this.recalculatePullFromWhichNode(mq), false);
}
复制代码
首先拿到broker相关信息,经典的缓存策略。
if (findBrokerResult != null) {
{
// 如果过滤表达式不是TAG类型 && broker版本小于V4_1_0_SNAPSHOT
// 也就是说V4_1_0版本之前是只支持tag类型的
if (!ExpressionType.isTagType(expressionType)
&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
}
}
int sysFlagInner = sysFlag;
//如果是从节点 则清除FLAG_COMMIT_OFFSET标志位
if (findBrokerResult.isSlave()) {
sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
}
// 拼接请求
PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
requestHeader.setConsumerGroup(this.consumerGroup);
requestHeader.setTopic(mq.getTopic());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setQueueOffset(offset);
requestHeader.setMaxMsgNums(maxNums);
requestHeader.setSysFlag(sysFlagInner);
requestHeader.setCommitOffset(commitOffset);
requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
requestHeader.setSubscription(subExpression);
requestHeader.setSubVersion(subVersion);
requestHeader.setExpressionType(expressionType);
String brokerAddr = findBrokerResult.getBrokerAddr();
if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
}
// 发送请求
PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
brokerAddr,
requestHeader,
timeoutMillis,
communicationMode,
pullCallback);
return pullResult;
}
复制代码
最后通过pullMessageAsync异步发送请求。
消息拉取客户端处理消息
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
@Override
public void operationComplete(ResponseFuture responseFuture) {
RemotingCommand response = responseFuture.getResponseCommand();
if (response != null) {
try {
PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
assert pullResult != null;
pullCallback.onSuccess(pullResult);
} catch (Exception e) {
pullCallback.onException(e);
}
} else {
if (!responseFuture.isSendRequestOK()) {
pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
} else if (responseFuture.isTimeout()) {
pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
responseFuture.getCause()));
} else {
pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
}
}
}
});
复制代码
解析请求成功则会回调onSuccess方法。
// 拿到下一次拉取的offset
long prevRequestOffset = pullRequest.getNextOffset();
// 设置下一次拉取offset
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
复制代码
首先设置下次拉取的offset。
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
复制代码
如果没有消息返回,则将pullRequest放入pullMessageService的pullRequestQueue中。
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
复制代码
把拉取到的消息存储processQueue。 然后将拉取到的消息交给consumeMessageService的线程池去处理。
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
复制代码
之后再把pullRequest放到pullMessageService中,拉取任务就已经结束了。
还有异常逻辑,有兴趣的同学可以去看一下。这里就不进行深入讲解了。
todo 流程图
todo 负载均衡
··· 未完待续