今天看啥  ›  专栏  ›  FangZzzzz

RocketMQ深度解析(三):Producer

FangZzzzz  · 掘金  ·  · 2019-09-02 06:45
阅读 11

RocketMQ深度解析(三):Producer

消息发送

RocketMQ支持3种消息发送模式:

  • 同步(sync):发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果。
  • 异步(async):发送者向MQ执行发送消息API时,指定消息发送成功后的回调函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行。
  • 单向(oneway):消息发送者向MQ执行发送消息API时,直接返回,不等待消息服务器的结果,也不注册回调函数。

RocketMQ消息

Message属性

Message全属性构造函数

    public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
        this.topic = topic;
        this.flag = flag;
        this.body = body;

        if (tags != null && tags.length() > 0)
            this.setTags(tags);

        if (keys != null && keys.length() > 0)
            this.setKeys(keys);

        this.setWaitStoreMsgOK(waitStoreMsgOK);
    }
复制代码

延迟级别属性

    public int getDelayTimeLevel() {
        String t = this.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        if (t != null) {
            return Integer.parseInt(t);
        }

        return 0;
    }

    public void setDelayTimeLevel(int level) {
        this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
    }
复制代码

Message扩展属性

  • tag:消息TAG,用于消息过略。
  • keys:Message索引键,多用空格隔开。
  • waitStoreMsgOK:消息发送时是否等消息存储完成后才返回。
  • delayTimeLevel:消息延迟级别,用于定时消息或消息重试。

这些扩展属性存储在Message的properties中。

Producer启动流程

启动一个Producer我们使用的是DefaultMQProducer这个类。

org.apache.rocketmq.client.producer.DefaultMQProducer#start
    @Override
    public void start() throws MQClientException {
        this.setProducerGroup(withNamespace(this.producerGroup));
        this.defaultMQProducerImpl.start();
        if (null != traceDispatcher) {
            try {
                traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
            } catch (MQClientException e) {
                log.warn("trace dispatcher start failed ", e);
            }
        }
    }
复制代码

可以看到,该start方法中只做了两件事,一件事是为ProduerGroup加上命名空间,这里实际上就是加上了NameServer的地址相关信息,有兴趣的同学可以看一下withNamespace的源码。另外一件事就是调用了defaultMQProducerImpl.start()。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#start(boolean)
    this.checkConfig();

    if (!this.defaultMQProducer.getProducerGroup().equals(MixAll.CLIENT_INNER_PRODUCER_GROUP)) {
        this.defaultMQProducer.changeInstanceNameToPID();
    }
复制代码

这里就是检查一下配置是否正确,以及将生产者的instanceName设置为进程Id。

this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQProducer, rpcHook);
复制代码

我们先看一下MQClientManager这个类。

    private static MQClientManager instance = new MQClientManager();
    public static MQClientManager getInstance() {
        return instance;
    }
复制代码

这是一个明显的饿汉的单例模式,也就是一个JVM中只会有一个MQClientManager instance。

org.apache.rocketmq.client.impl.MQClientManager#getAndCreateMQClientInstance(org.apache.rocketmq.client.ClientConfig, org.apache.rocketmq.remoting.RPCHook)
    public MQClientInstance getAndCreateMQClientInstance(final ClientConfig clientConfig, RPCHook rpcHook) {
        String clientId = clientConfig.buildMQClientId();
        MQClientInstance instance = this.factoryTable.get(clientId);
        if (null == instance) {
            instance =
                new MQClientInstance(clientConfig.cloneClientConfig(),
                    this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook);
            MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
            if (prev != null) {
                instance = prev;
                log.warn("Returned Previous MQClientInstance for clientId:[{}]", clientId);
            } else {
                log.info("Created new MQClientInstance for clientId:[{}]", clientId);
            }
        }

        return instance;
    }
复制代码

创建MQClientInstance实例。用ClientId(客户端Ip+进程id组成+unitname(可选))作为Key,也就是一个JVM中同一个ClientId只会存在一个MQClientInstance实例。

boolean registerOK = mQClientFactory.registerProducer(this.defaultMQProducer.getProducerGroup(), this);
复制代码

将当前Producer注册进MQClientInstance的实例中。这里同一个mQClientFactory中只允许一个相同的producerGroupName的DefaultMQProducer,否则就会注册失败。

this.topicPublishInfoTable.put(this.defaultMQProducer.getCreateTopicKey(), new TopicPublishInfo());
复制代码

用于当Topic不存在时,send自动创建Topic,仅当autoCreateTopicEnable设为true才可用。生产环境中不建议使用。

   if (startFactory) {
       mQClientFactory.start();
   }
复制代码

启动MQClientInstance。

org.apache.rocketmq.client.impl.factory.MQClientInstance#start
    public void start() throws MQClientException {

        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) {
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    }
                    // Start request-response channel
                    // 关于netty网络交互 
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    // 启动一些定时任务
                    this.startScheduledTask();
                    // Start pull service
                    // 消费者相关
                    this.pullMessageService.start();
                    // Start rebalance service
                    // 消费者相关
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [{}] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            }
        }
    }
复制代码

startScheduledTask中大部分定时任务都和消费者相关。所以不在本章中详解。 其中包括120s更新一次本地缓存中的NameServer地址。30s更新一次本地缓存中的topic路由相关信息(Consumer)相关,30s向所有Broker发送自己的一些信息(Consumer相关),50s持久化一次本地consumer offset(Consumer相关),以及60s调整一次消费线程池(Consumer相关)。

消息发送

消息发送主要包含三个步骤:验证消息、查找路由、消息发送(包括异常处理机制)。 我们以同步消息为例,看一下消息发送的流程。

验证消息

org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        Validators.checkMessage(msg, this);
        msg.setTopic(withNamespace(msg.getTopic()));
        return this.defaultMQProducerImpl.send(msg);
    }
复制代码

消息长度验证,消息体长度不能为0以及不能超过4MB。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message)
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
复制代码
org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#send(org.apache.rocketmq.common.message.Message, long)
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
复制代码

查找路由

TopicPublishInfo

关于Topic路由信息的本地缓存。

这里说两个属性,

  • MessageQueueList:该主题的消息队列
  • sendWhichQueue:用ThreadLocal维护了一个index,用于负载均衡。

在sendDefaultImpl方法中,首先会查找主题相关的路由信息。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#tryToFindTopicPublishInfo
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }

        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            return topicPublishInfo;
        } else {
            // 本次主要是因为topic为创建,看broker是否允许自动创建Topic。
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;
        }
    }
复制代码

如果缓存中包含了该Topic的路由信息且路由信息内的MessageQueue不为空。则直接返回路由信息。如果没有,则向NameServer查询该topic的路由信息,如果有更新本地缓存。

没有的话尝试用默认主题createTopicKey去查询,只有BrokerConfig#autoCreateTopicEnable为true时,NameServer才会返回路由信息。

org.apache.rocketmq.client.impl.factory.MQClientInstance#updateTopicRouteInfoFromNameServer(java.lang.String, boolean, org.apache.rocketmq.client.producer.DefaultMQProducer)
    TopicRouteData topicRouteData;
    if (isDefault && defaultMQProducer != null) {
        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(), 1000 * 3);
        if (topicRouteData != null) {
            for (QueueData data : topicRouteData.getQueueDatas()) {
                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                data.setReadQueueNums(queueNums);
                data.setWriteQueueNums(queueNums);
            }   
        }
    } else {
        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
    }
复制代码

如果isDefault为true,则使用默认主题去查询,如果查询到路由信息,则替换路由信息中读写队列个数为消息生产者默认的队列个数(DefaultTopicQueueNums);如果isDefault为false,则使用参数topic去查询;如果为查询到路由信息,则返回false,表示路由信息未变化。

    TopicRouteData old = this.topicRouteTable.get(topic);
    boolean changed = topicRouteDataIsChange(old, topicRouteData);
    if (!changed) {
        changed = this.isNeedUpdateTopicRouteInfo(topic);
    } else {
        log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
    }
复制代码

如果查询到路由信息,和本地缓存中的路由信息比较,将changed置为true。

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext()) { 
    Entry<String, MQProducerInner> entry = it.next();
    MQProducerInner impl = entry.getValue();
    if (impl != null) {
    impl.updateTopicPublishInfo(topic, publishInfo);
    }
}
复制代码

更新该MQClientInstance所管辖的所有producer的本地缓存。

org.apache.rocketmq.client.impl.factory.MQClientInstance#topicRouteData2TopicPublishInfo
List<QueueData> qds = route.getQueueDatas();
Collections.sort(qds);
for (QueueData qd : qds) {
    // 如果是写队列
    if (PermName.isWriteable(qd.getPerm())) {
        BrokerData brokerData = null;
        // 找到对应的broker
        for (BrokerData bd : route.getBrokerDatas()) {
            if (bd.getBrokerName().equals(qd.getBrokerName())) {
                brokerData = bd;
                break;
            }
        }
        // 如果broker为null 结束本次循环
        if (null == brokerData) {
            continue;
        }
        // 如果broker中不包含master 结束本次循环
        if (!brokerData.getBrokerAddrs().containsKey(MixAll.MASTER_ID)) {
            continue;
        }
        // 将本QueueDate中的读队列放到TopicPublishInfo
        for (int i = 0; i < qd.getWriteQueueNums(); i++) {
            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
                info.getMessageQueueList().add(mq);
        }
    }
}
复制代码

循环遍历路由信息中QueueData信息,如果队列有写权限,就创建对应的MessageQueue,填充 topicPublishInfo的List<QueueMessage>

路由查找就到此完毕了,不太明白的同学可以跟着源码一起读。

选择消息队列

在上一步,我们已经得到了topic的路由信息。

比如说某topic分布在broker-a和broker-b各四个队列。

那么得到的MessageQueue则是:

  • {"topic":"topic", "brokerName":"broker-a", "queueId":0}
  • {"topic":"topic", "brokerName":"broker-a", "queueId":1}
  • {"topic":"topic", "brokerName":"broker-a", "queueId":2}
  • {"topic":"topic", "brokerName":"broker-a", "queueId":3}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":0}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":1}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":2}
  • {"topic":"topic", "brokerName":"broker-b", "queueId":3}

那么RocketMQ如果选择这些消息队列呢。 首先消息队列发送端采用重试机制,由retryTimesWhenSendFailed指定同步方式重试次数,异步重试机制在收到消息发送结构后执行回调之前进行重试。 选择消息队列有两种方式:

  • sendLatencyFaultEnable = false, 默认不启用Broker故障延迟机制。
  • sendLatencyFaultEnable = true, 启用Broker故障延迟机制。

这两种机制有什么区别呢,我们下面来看一看。

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 开启了故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }
        //默认不开启故障延迟
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
复制代码

默认机制

org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue(java.lang.String)
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        if (lastBrokerName == null) {
            return selectOneMessageQueue();
        } else {
            int index = this.sendWhichQueue.getAndIncrement();
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                int pos = Math.abs(index++) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = this.messageQueueList.get(pos);
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            return selectOneMessageQueue();
        }
    }
复制代码
org.apache.rocketmq.client.impl.producer.TopicPublishInfo#selectOneMessageQueue()
    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.getAndIncrement();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }
复制代码

如果上一次执行发送消息失败的BrokerName(lastBrokerName)为空,则直接用sendWhichQueue自增然后对消息队列个数进行取模。

如果上一次发送消息失败的lastBrokerName不为空,则规避上次失败了的BrokerName,比如说,上次发送到{"topic":"topic", "brokerName":"broker-a", "queueId":0}这个队列中失败,则本次选择队列的时候就会规避所有brokerName为broker-a的队列。

我们先看一下sendWhichQueue数据结构。

org.apache.rocketmq.client.common.ThreadLocalIndex
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();

    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        if (null == index) {
            index = Math.abs(random.nextInt());
            if (index < 0)
                index = 0;
            this.threadLocalIndex.set(index);
        }

        index = Math.abs(index + 1);
        if (index < 0)
            index = 0;

        this.threadLocalIndex.set(index);
        return index;
    }
复制代码

其实就是本地线程维护了一个线程安全的index,第一次生成则生成一个随机数,每次负载则自增一。也就是说,发送消息在同一线程内才会负载均衡,如果是多个用户每个用户只发送一次消息的话,走的是随机的消息队列,吐槽一下这神一般的负载均衡策略😂。

该算法只能在一次发送中规避上一次发送失败的broker。也就是说只在一次send中有效,如果是当前线程第二次调用send,则无法规避上一次send中发送失败的broker。我们可以仔细看看上面selectOneMessageQueue中lastBrokerName不等于null的情况,先从sendWhichQueue里取出一个index,然后通过临时的index变量来规避上一次发送失败的lastBrokerName。那么当第二次发送,从sendWhichQueue取出的index,大概率就是上一次发送失败的Broke中,那么又会引发重试,造成不必要的性能损耗,那么有什么办法可以暂时将该Broker排除在队列选择的范围外呢?

Broker故障延迟机制

在sendDefaultImpl方法中,发送成功或者抛出异常,都会调用org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem方法。

我们先看一下这个方法。

org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#updateFaultItem
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }
复制代码
org.apache.rocketmq.client.latency.MQFaultStrategy#updateFaultItem
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
复制代码

首先会计算系统是否不可用持续时长。如果isolation为true,则将30000传给computeNotAvailableDuration,否则传currentLatency(当前延迟)。

org.apache.rocketmq.client.latency.MQFaultStrategy#computeNotAvailableDuration
    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }
        return 0;
    }
复制代码

看一下latencyMax和notAvailableDuration。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
复制代码

如果延迟时间达到某个量,则认为不可用时间是某个量,这是一个经验值。

我们继续往下看,updateFaultItem方法。

org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#updateFaultItem
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
复制代码

根据broker从缓存中查询出FaultItem,如果找到则更新FaultItem,没有则新建。 主要是当前CurrentLatency和StartTimestamp两个字段。

  • CurrentLatency以及StartTimestamp被volatile修饰,多线程下是立即可见的。
  • StartTimestamp为当前系统时间加上需要规避的时长。是判断Broker是否可用最直接的依据。

我们看一下isAvailable方法。

org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl#isAvailable
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        return true;
    }
复制代码
org.apache.rocketmq.client.latency.LatencyFaultToleranceImpl.FaultItem#isAvailable
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }
复制代码

如果当前时间大于或者等于StartTimestamp则认为Broker是可用的。

我们回过头来看一下选择消息队列方法中的开启了故障延迟的处理办法。

try {
    int index = tpInfo.getSendWhichQueue().getAndIncrement();
    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
        if (pos < 0)
            pos = 0;
        // 首先根据index轮询出一个mq
        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
        // 判断是否可用
        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
            // 这里不是很懂,在github上很多人提issue说这是一个bug但是没有官方回复
            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                return mq;
        }
    }
    
    // 如果没有找到可用的mq,则尝试从规避的Broker中选择一个Broker
    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
    if (writeQueueNums > 0) {
        final MessageQueue mq = tpInfo.selectOneMessageQueue();
        if (notBestBroker != null) {
            mq.setBrokerName(notBestBroker);
            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
        }
        return mq;
    } else {
        latencyFaultTolerance.remove(notBestBroker);
    }
}
复制代码

消息发送

消息发送API核心入口:

DefaultMQProducerImpl#sendKernelImpl。
    private SendResult sendKernelImpl(final Message msg,
                                      final MessageQueue mq,
                                      final CommunicationMode communicationMode,
                                      final SendCallback sendCallback,
                                      final TopicPublishInfo topicPublishInfo,
                                      final long timeout)
复制代码
  • msg:待发送的消息
  • mq:消息将发送到该消息队列上
  • communicationMode:消息发送模式,SYNC、ASYNC、ONEWAY
  • sendCallback:异步消息回调函数。
  • topicPublishInfo:主题路由信息
  • timeout:消息发送超时时间
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
复制代码

尝试从缓存中获取Broker的网络地址,如果未缓存该信息则主动从NameServer更新一下缓存,然后再获取一次。

    if (!(msg instanceof MessageBatch)) {
        MessageClientIDSetter.setUniqID(msg);
    }

    boolean topicWithNamespace = false;
    if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
        msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
        topicWithNamespace = true;
    }

    int sysFlag = 0;
    boolean msgBodyCompressed = false;
    if (this.tryToCompressMessage(msg)) {
        sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
        msgBodyCompressed = true;
    }

    final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
        sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
    }
复制代码

为消息分配全局唯一ID,如果消息体默认超过超过4K,会对消息体采用ZIP压缩,并设置消息的系统标记为MessageSysFlag.COMPRESSED_FLAG,如果消息是PREPARED事务消息,则设置消息系统标记MessageSysFlag.TRANSACTION_PREPARED_TYPE。

if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this);
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    context.setCommunicationMode(communicationMode);
    context.setBornHost(this.defaultMQProducer.getClientIP());
    context.setBrokerAddr(brokerAddr);
    context.setMessage(msg);
    context.setMq(mq);
    context.setNamespace(this.defaultMQProducer.getNamespace());
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context);
    }
复制代码

如果注册了消息发送钩子函数,则执行消息发送之前增强逻辑。

org.apache.rocketmq.client.hook.SendMessageHook
public interface SendMessageHook {
    String hookName();

    void sendMessageBefore(final SendMessageContext context);

    void sendMessageAfter(final SendMessageContext context);
}
复制代码

钩子接口。

SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
// 设置发送组
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
// 设置topic
requestHeader.setTopic(msg.getTopic());
// 默认创建主题Key
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
// 单个Broker默认消息队列数
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
// 队列Id
requestHeader.setQueueId(mq.getQueueId());
// 消息系统标记
requestHeader.setSysFlag(sysFlag);
// 消息发送时间
requestHeader.setBornTimestamp(System.currentTimeMillis());
// 消息标记
requestHeader.setFlag(msg.getFlag());
// 消息扩展属性
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
// 消息重试次数
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
// 是否批次消息
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
    if (reconsumeTimes != null) {
    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
    }
    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
    if (maxReconsumeTimes != null) {
        requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
    }
}
复制代码

构建消息发送请求包。

org.apache.rocketmq.client.impl.MQClientAPIImpl#sendMessage
public SendResult sendMessage(
        final String addr,
        final String brokerName,
        final Message msg,
        final SendMessageRequestHeader requestHeader,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final MQClientInstance instance,
        final int retryTimesWhenSendFailed,
        final SendMessageContext context,
        final DefaultMQProducerImpl producer
    ) throws RemotingException, MQBrokerException, InterruptedException {
        long beginStartTime = System.currentTimeMillis();
        RemotingCommand request = null;
        if (sendSmartMsg || msg instanceof MessageBatch) {
            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        } else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());

        switch (communicationMode) {
            case ONEWAY:
                this.remotingClient.invokeOneway(addr, request, timeoutMillis);
                return null;
            case ASYNC:
                final AtomicInteger times = new AtomicInteger();
                long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeAsync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
                    retryTimesWhenSendFailed, times, context, producer);
                return null;
            case SYNC:
                long costTimeSync = System.currentTimeMillis() - beginStartTime;
                if (timeoutMillis < costTimeSync) {
                    throw new RemotingTooMuchRequestException("sendMessage call timeout");
                }
                return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
            default:
                assert false;
                break;
        }

        return null;
    }
复制代码

根据消息发送方式,同步、异步、单向方式进行网络传输。

    if (this.hasSendMessageHook()) {
        context.setSendResult(sendResult);
        this.executeSendMessageHookAfter(context);
    }
复制代码

如果注册了钩子函数,执行after逻辑。

总结

  • 客户端调用producer发送消息时,会先从NameServer获取该topic的路由信息。消息头code为GET_ROUTEINFO_BY_TOPIC
  • 从NameServer返回的路由信息,包括topic包含的队列列表和broker列表
  • Producer端根据查询策略,选出其中一个队列,用于后续存储消息
  • 每条消息会生成一个唯一id,添加到消息的属性中。属性的key为UNIQ_KEY
  • 对消息做一些特殊处理,比如:超过4M会对消息进行压缩
  • producer向Broker发送rpc请求,将消息保存到broker端。消息头的code为SEND_MESSAGE或SEND_MESSAGE_V2(配置文件设置了特殊标志)

#参考文献




原文地址:访问原文地址
快照地址: 访问文章快照