07、RocketMQ 源码 - Producer发送消息的总体流程
- 单向发送: 把消息发向Broker服务器, 不管Broker是否接收, 只管发, 不管结果。
- 同步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。
- 异步发送: 把消息发向Broker服务器, 如果Broker成功接收, 可以得到Broker的响应。异步所以发送消息后, 不用等待, 等到Broker服务器的响应调用回调。
DefaultMQProducer提供了很多send方法的重载:
1.send源码入口
1.1 同步消息
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//调用defaultMQProducerImpl#send发送消息
return this.defaultMQProducerImpl.send(msg);
}
defaultMQProducerImpl#send发送消息。
调用另一个send(), 超时时间为3s。
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//调用另一个send方法,设置超时时间参数,默认3000ms
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
/**
* DefaultMQProducerImpl的方法
*
* @param msg 消息
* @param timeout 超时时间,毫秒值
*/
public SendResult send(Message msg,
long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//调用另一个sendDefaultImpl方法,设置消息发送模式为SYNC,即同步;设置回调函数为null
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
该方法内部又调用另一个sendDefaultImpl方法, 设置消息发送方式为SYNC, 为同步, 设置回调函数为null。
1.2 单向消息
单向消息使用sendOneway发送。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException
{
//根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
//调用defaultMQProducerImpl#sendOneway发送消息
this.defaultMQProducerImpl.sendOneway(msg);
}
defaultMQProducerImpl#sendOneway。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {
try {
//调用sendDefaultImpl方法,设置消息发送模式为ONEWAY,即单向;设置回调函数为null;设置超时时间参数,默认3000ms
this.sendDefaultImpl(msg, CommunicationMode.ONEWAY, null, this.defaultMQProducer.getSendMsgTimeout());
} catch (MQBrokerException e) {
throw new MQClientException("unknown exception", e);
}
}
最终调用sendDefaultImpl方法, 发送模式为ONEWAY, 设置回调函数为null, 超时时间为3s。
1.3 异步消息
异步消息使用带有callback函数的send方法发送。
public void send(Message msg, SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException {
//根据namespace设置topic
msg.setTopic(withNamespace(msg.getTopic()));
//调用defaultMQProducerImpl#send发送消息,带有sendCallback参数
this.defaultMQProducerImpl.send(msg, sendCallback);
}
该方法内部调用defaultMQProducerImpl#send方法发送消息, 带sendCallback参数。
public void send(Message msg, SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException {
//该方法内部又调用另一个send方法,设置超时时间参数,默认3000ms。
send(msg, sendCallback, this.defaultMQProducer.getSendMsgTimeout());
}
public void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
//调用起始时间
final long beginStartTime = System.currentTimeMillis();
//获取异步发送执行器线程池
ExecutorService executor = this.getAsyncSenderExecutor();
try {
/*
* 使用线程池异步的执行sendDefaultImpl方法,即异步发送消息
*/
executor.submit(new Runnable() {
@Override
public void run() {
/*
* 发送之前计算超时时间,如果超时则不发送,直接执行回调函数onException方法
*/
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout > costTime) {
try {
//调用sendDefaultImpl方法执行发送操作
sendDefaultImpl(msg, CommunicationMode.ASYNC, sendCallback, timeout - costTime);
} catch (Exception e) {
//抛出异常,执行回调函数onException方法
sendCallback.onException(e);
}
} else {
//超时,执行回调函数onException方法
sendCallback.onException(
new RemotingTooMuchRequestException("DEFAULT ASYNC send call timeout"));
}
}
});
} catch (RejectedExecutionException e) {
throw new MQClientException("executor rejected ", e);
}
}
方法内部会获取获取异步发送执行器线程池, 使用线程池异步的执行sendDefaultImpl方法, 即异步发送。
发送之前计算超时时间, 如果超时则执行回调函数onException()。
2.sendDefaultImpl发送消息实现
该方法位于DefaultMQProducerImpl中, 无论是同步, 异步, 还是单向, 最后调用的都是sendDefaultImpl方法。
1、 makeSureStateOK方法,确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常;
2、 检查消息的合法性;
3、 调用tryToFindTopicPublishInfo方法,尝试查找消息的一个topic路由,进行发送消息;
4、 计算循环发送消息的总次数timesTotal,默认情况下,同步为3,允许重试2次,其他模式为1,即不允许重试实际上,异步发送消息可以最多重试2次,不是这里实现的;
5、 调用selectOneMessageQueue方法,选择一个队列MessageQueue,支持失败故障转移;
6、 调用sendKernelImpl方法发送消息,同步、异步、单向发送消息都是这个方法实现的;
7、 调用updateFaultItem方法,更新本地错误表缓存数据,用于延迟时间的故障转移的功能;
8、 根据发送模式的不同,如果是异步或者单向发送则直接返回,如果是同步的话,如果开启了retryAnotherBrokerWhenNotStoreOK,那么如果返回值不返回SEND_OK状态,则重新执行;
9、 此过程中,如果抛出RemotingException、MQClientException、以及MQBrokerException异常,那么会重试,如果抛出InterruptedException,或者超时则不会重试;
/**
* DefaultMQProducerImpl的方法
*
* @param msg 方法
* @param communicationMode 通信模式
* @param sendCallback 回调方法
* @param timeout 超时时间
*/
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
/*
* 1 确定此producer的服务状态正常,如果服务状态不是RUNNING,那么抛出异常
*/
this.makeSureStateOK();
/*
* 2 校验消息的合法性
*/
Validators.checkMessage(msg, this.defaultMQProducer);
//生成本次调用id
final long invokeID = random.nextLong();
//开始时间戳
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
//结束时间戳
long endTimestamp = beginTimestampFirst;
/*
* 3 尝试查找消息的一个topic路由,用以发送消息
*/
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
//找到有效的topic信息
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
/*
* 4 计算发送消息的总次数
* 同步模式为3,即默认允许重试2次,可更改重试次数;其他模式为1,即不允许重试,不可更改
*/
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
//记录每一次重试时候发送消息目标Broker名字的数组
String[] brokersSent = new String[timesTotal];
/*
* 在循环中,发送消息,包含消息重试的逻辑,总次数默认不超过3
*/
for (; times < timesTotal; times++) {
//上次使用过的broker,可以为空,表示第一次选择
String lastBrokerName = null == mq ? null : mq.getBrokerName();
/*
* 5 选择一个消息队列MessageQueue
*/
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
//设置brokerName
brokersSent[times] = mq.getBrokerName();
try {
//调用的开始时间
beginTimestampPrev = System.currentTimeMillis();
//如果还有可调用次数,那么
if (times > 0) {
//在重新发送期间用名称空间重置topic
msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
}
//现在调用的开始时间 减去 开始时间,判断时候在调用发起之前就超时了
long costTime = beginTimestampPrev - beginTimestampFirst;
//如果已经超时了,那么直接结束循环,不再发送
//即超时的时候,即使还剩下重试次数,也不会再继续重试
if (timeout < costTime) {
callTimeout = true;
break;
}
/*
* 6 异步、同步、单向发送消息
*/
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
//方法调用结束时间戳
endTimestamp = System.currentTimeMillis();
/*
* 7 更新本地错误表缓存数据,用于延迟时间的故障转移的功能
*/
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
/*
* 8 根据发送模式执行不同的处理
*/
switch (communicationMode) {
//异步和单向模式直接返回null
case ASYNC:
return null;
case ONEWAY:
return null;
case SYNC:
//同步模式,如果开启了retryAnotherBrokerWhenNotStoreOK开关,那么如果不是返回SEND_OK状态,则仍然会执行重试发送
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
continue;
}
}
//如果发送成功,则返回
return sendResult;
default:
break;
}
} catch (RemotingException e) {
//RemotingException异常,会执行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQClientException e) {
//MQClientException异常,会执行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
continue;
} catch (MQBrokerException e) {
//MQBrokerException异常
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
exception = e;
//如果返回的状态码属于一下几种,则支持重试:
//ResponseCode.TOPIC_NOT_EXIST,
//ResponseCode.SERVICE_NOT_AVAILABLE,
//ResponseCode.SYSTEM_ERROR,
//ResponseCode.NO_PERMISSION,
//ResponseCode.NO_BUYER_ID,
//ResponseCode.NOT_IN_CURRENT_UNIT
if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {
continue;
} else {
//其他状态码不支持重试,如果有结果则返回,否则直接抛出异常
if (sendResult != null) {
return sendResult;
}
throw e;
}
} catch (InterruptedException e) {
//InterruptedException异常,不会执行重试
endTimestamp = System.currentTimeMillis();
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
log.warn(msg.toString());
log.warn("sendKernelImpl exception", e);
log.warn(msg.toString());
throw e;
}
} else {
break;
}
}
/*
* 抛出异常的操作
*/
if (sendResult != null) {
return sendResult;
}
String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
times,
System.currentTimeMillis() - beginTimestampFirst,
msg.getTopic(),
Arrays.toString(brokersSent));
info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
MQClientException mqClientException = new MQClientException(info, exception);
if (callTimeout) {
throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
}
if (exception instanceof MQBrokerException) {
mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
} else if (exception instanceof RemotingConnectException) {
mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
} else if (exception instanceof RemotingTimeoutException) {
mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
} else if (exception instanceof MQClientException) {
mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
}
throw mqClientException;
}
validateNameServerSetting();
throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
}
2.1 makeSureStateOK确定生产者服务状态
确定此producer的服务状态正常, 如果服务状态不是RUNNING, 那么抛出异常。
/**
* DefaultMQProducerImpl的方法
*/
private void makeSureStateOK() throws MQClientException {
//服务状态不是RUNNING,那么抛出MQClientException异常。
if (this.serviceState != ServiceState.RUNNING) {
throw new MQClientException("The producer service state not OK, "
+ this.serviceState
+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
null);
}
}
2.2 checkMessage校验消息的合法性
1、 如果msg消息为null,抛出异常;
2、 校验topic,如果topic为空,或者长度大于127字符,或者topic中有非法字符则抛出异常,如果当前topic是不为允许使用的系统topic,抛出异常;
3、 校验消息体,如果消息体为null,或者为空数组,或者消息字节数组长度大于4M,抛出异常;
/**
* Validators的方法
*/
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
//如果消息为null,抛出异常
if (null == msg) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
}
/*
* 校验topic
*/
//如果topic为空,或者长度大于127个字符,或者topic的字符串不符合 "^[%|a-zA-Z0-9_-]+$"模式,即包含非法字符,那么抛出异常
Validators.checkTopic(msg.getTopic());
//如果当前topic是不为允许使用的系统topic SCHEDULE_TOPIC_XXXX,那么抛出异常
Validators.isNotAllowedSendTopic(msg.getTopic());
// body
//如果消息体为null,那么抛出异常
if (null == msg.getBody()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
}
//如果消息体为空数组,那么抛出异常
if (0 == msg.getBody().length) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
}
//如果消息 字节数组长度大于4,194,304,即消息的大小大于4M,那么抛出异常
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
"the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
}
}
public static void checkTopic(String topic) throws MQClientException {
//如果topic为空,那么抛出异常
if (UtilAll.isBlank(topic)) {
throw new MQClientException("The specified topic is blank", null);
}
//如果topic长度大于127个字符,那么抛出异常
if (topic.length() > TOPIC_MAX_LENGTH) {
throw new MQClientException(
String.format("The specified topic is longer than topic max length %d.", TOPIC_MAX_LENGTH), null);
}
//如果topic字符串包含非法字符,那么抛出异常
if (isTopicOrGroupIllegal(topic)) {
throw new MQClientException(String.format(
"The specified topic[%s] contains illegal characters, allowing only %s", topic,
"^[%|a-zA-Z0-9_-]+$"), null);
}
}
2.3 tryToFindTopicPublishInfo查找topic的发布信息
该方法用于查找指定topic的发布信息TopicPublishInfo。
1、 首先在本地缓存topicPublishInfoTable获取;
2、 updateTopicRouteInfoFromNameServer()获取,从nameServer同步此topic的路由配置信息;
3、 从nameServer同步topic数据;
/**
* DefaultMQProducerImpl的方法
* <p>
* 查找指定topic的推送信息
*/
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
//尝试直接从producer的topicPublishInfoTable中获取topic信息
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
//如果没有获取到有效信息,
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
//那么立即创建一个TopicPublishInfo
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
//立即从nameServer同步此topic的路由配置信息,并且更新本地缓存
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
//再次获取topicPublishInfo
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
//如果找到的路由信息是可用的,直接返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
//再次从nameServer同步topic的数据,不过这次使用默认的topic “TBW102”去找路由配置信息作为本topic参数信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
- 首先在本地缓存topicPublishInfoTable获取, 如果没有获取到, 调用updateTopicRouteInfoFromNameServer方法从nameServer同步此topic的路由配置信息, 并更新缓存。如果还是没有获取到有效数据, 再次从nameServer同步topic数据, 不过这次默认的topic是 “TBW102”去找路由配置信息作为本topic参数信息。
TopicPublishInfo: topic的发布信息
/**
* 是否是顺序消息
*/
private boolean orderTopic = false;
/**
* 是否包含路由信息
*/
private boolean haveTopicRouterInfo = false;
/**
* topic的消息队列集合
*/
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
/**
* 当前线程线程的消息队列的下标,循环选择消息队列使用+1
*/
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
/**
* topic路由信息,包括topic的队列信息queueDatas,topic的broker信息brokerDatas,顺序topic配置orderTopicConf,消费过滤信息filterServerTable等属性
*/
private TopicRouteData topicRouteData;
2.4 计算发送次数timesTotal
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 +
this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
计算循环发送消息的总次数timesTotal, 默认情况下, 同步为3, 允许重试2次, 其他模式为1, 即不允许重试。实际上, 异步发送消息可以最多重试2次, 不是这里实现的。是MQClientAPIImpl#sendMessage方法中。
2.5 selectOneMessageQueue选择消息队列
方法内部调用mqFaultStrategy#selectOneMessageQueue方法:
/**
* DefaultMQProducerImpl的方法
*
* 选择一个消息队列
* @param tpInfo topic信息
* @param lastBrokerName 上次使用过的broker
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//调用mqFaultStrategy#selectOneMessageQueue方法
return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}
mqFaultStrategy#selectOneMessageQueue方法支持故障转移机制:
1、 首先判断是否开启了发送延迟故障转移机制,即判断sendLatencyFaultEnable属性是否为true,默认为false不开启;
1、 如果开启的话,首先仍然是遍历消息队列,按照轮询的方式选取一个消息队列,当消息队列可用时,选择消息队列的工作就结束,否则循环选择其他队列如果该mq的broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间戳大于该broker下一次开始可用的时间戳,则表示无障碍;
2、 如果没有选出无故障mq,那么从LatencyFaultTolerance维护的不是最好的broker集合faultItemTable中随机选择一个broker,随后判断如果写队列数大于0,那么选择该Broker然后遍历消息队列,采用取模的方式获取一个队列,重置其brokerName和queueId,进行消息发送;
3、 如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列;
2、 如果没有开启延迟故障转移机制,那么遍历消息队列,采用取模轮询的方式获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的broker如果没有找到一个不同broker的mq,那么退回到轮询的方式;
selectOneMessageQueue方法选择mq的时候的故障转移机制, 目的是为了保证每次消息尽快的发送, 是一种高可用手段。
1、 延迟时间的故障转移,消息队列选择时候,可用过滤mq认为不可用的broker,以免不断为宕机的broker发送消息,选取一个延迟比较短的broker,实现消息发送高可用;
2、 没有开启延迟时间的故障转移的时候,在轮询选择mq的时候,不会选择上一次发送失败的broker,实现消息发送高可用;
/**
* MQFaultStrategy的方法
* <p>
* 选择一个消息队列,支持故障延迟转移
*
* @param tpInfo topic信息
* @param lastBrokerName 上次使用过的broker
*/
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
/*
* 判断是否开启了发送延迟故障转移机制,默认false不打开
* 如果开启了该机制,那么每次选取topic下对应的queue时,会基于之前执行的耗时,在有存在符合条件的broker的前提下,优选选取一个延迟较短的broker,否则再考虑随机选取。
*/
if (this.sendLatencyFaultEnable) {
try {
//当前线程线程的消息队列的下标,循环选择消息队列使用+1
int index = tpInfo.getSendWhichQueue().incrementAndGet();
//遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
//取模
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
//获取该消息队列
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
//如果当前消息队列是可用的,即无故障,那么直接返回该mq
//如果该broker不存在LatencyFaultTolerance维护的faultItemTable集合属性中,或者当前时间已经大于该broker下一次开始可用的时间点,表示无故障
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
//没有选出无故障的mq,那么一个不是最好的broker集合中随机选择一个
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
//如果写队列数大于0,那么选择该broker
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
//遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
//重置其brokerName,queueId,进行消息发送
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
//如果写队列数小于0,那么移除该broker
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
//如果上面的步骤抛出了异常,那么遍历消息队列,采用取模的方式获取一个队列,即轮询的方式
return tpInfo.selectOneMessageQueue();
}
//如果没有发送延迟故障转移机制,那么那么遍历消息队列,即采用取模轮询的方式
//获取一个brokerName与lastBrokerName不相等的队列,即不会再次选择上次发送失败的broker
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
selectOneMessageQueue方法有两个重载方法, 一个无参的, 一个有参的。
无参的表示选择mq无限制:
/**
* TopicPublishInfo的方法
* <p>
* 轮询的选择一个mq
*/
public MessageQueue selectOneMessageQueue() {
//获取下一个index
int index = this.sendWhichQueue.incrementAndGet();
//取模计算索引
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
//获取该索引的mq
return this.messageQueueList.get(pos);
}
有参数, 其参数是上一次发送失败的brokerName, 表示不会选择上一次失败的brokerName的mq。如果最后没有选择出来, 那么走轮询的逻辑。
/**
* TopicPublishInfo的方法
*
* @param lastBrokerName 上一次发送失败的brokerName
*/
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
//如果lastBrokerName为null,即第一次发送,那么轮询选择一个
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {
for (int i = 0; i < this.messageQueueList.size(); i++) {
//轮询选择一个mq
int index = this.sendWhichQueue.incrementAndGet();
int pos = Math.abs(index) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
//如果mq的brokerName不等于lastBrokerName,就返回,否则选择下一个
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
//没有选出来,那么轮询选择一个
return selectOneMessageQueue();
}
}
2.6 sendKernelImpl发送消息
选择了队列后, 调用sendKernelImpl发送消息
1、 首先调用findBrokerAddressInPublish方法从brokerAddrTable中查找Masterbroker地址如果找不到,tryToFindTopicPublishInfo从nameServer远程拉取配置,并更新本地缓存,再次尝试获取Masterbroker地址;
2、 调用brokerVIPChannel判断是否开启vip通道,如果开启了,那么将brokerAddr的port–2,vip通道的端口为普通端口-2;
3、 如果不是批量消息,那么设置唯一的uniqId;
4、 如果不是批量消息,且消息体大于4K,那么压缩消息;
5、 如果存在CheckForbiddenHook,则执行checkForbidden钩子方法如果存在SendMessageHook,则执行sendMessageBefore钩子方法;
6、 设置请求头信息SendMessageRequestHeader,请求头包含各种基本属性,producerGroup,topic,queueId,并且将消息重试次数和最大重试次数存入请求头中;
7. 根据不同的发送模式发送消息, 如果是异步, 需要先克隆并还原消息, 最终异步, 同步, 单向都是调用MQClientAPIImpl#sendMessage方法发送消息的。 8. 如果MQClientAPIImpl#sendMessage方法正常发送或者抛出RemotingException、MQBrokerException、InterruptedException异常, 判断如果存在SendMessageHook, 执行sendMessageAfter钩子方法。 9、 finally中对消息进行恢复;
/**
* DefaultMQProducerImpl的方法
* 发送消息
*
* @param msg 消息
* @param mq mq
* @param communicationMode 发送模式
* @param sendCallback 发送回调
* @param topicPublishInfo topic信息
* @param timeout 超时时间
* @return 发送结果
*/
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
//开始时间
long beginStartTime = System.currentTimeMillis();
/*
* 1 根据brokerName从brokerAddrTable中查找broker地址
*/
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//如果本地找不到 broker 的地址
if (null == brokerAddr) {
/*
* 2 从nameServer远程拉取配置,并更新本地缓存
* 该方法此前就学习过了
*/
tryToFindTopicPublishInfo(mq.getTopic());
//再次获取地址
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
SendMessageContext context = null;
if (brokerAddr != null) {
/*
* 3 vip通道判断
*/
brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
byte[] prevBody = msg.getBody();
try {
//for MessageBatch,ID has been set in the generating process
/*
* 4 如果不是批量消息,那么尝试生成唯一uniqId,即UNIQ_KEY属性。MessageBatch批量消息在生成时就已经设置uniqId
* uniqId也被称为客户端生成的msgId,从逻辑上代表唯一一条消息
*/
if (!(msg instanceof MessageBatch)) {
MessageClientIDSetter.setUniqID(msg);
}
/*
* 设置nameSpace为实例Id
*/
boolean topicWithNamespace = false;
if (null != this.mQClientFactory.getClientConfig().getNamespace()) {
msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());
topicWithNamespace = true;
}
//消息标识符
int sysFlag = 0;
//消息压缩标识
boolean msgBodyCompressed = false;
/*
* 5 尝试压缩消息
*/
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
//事务消息标志,prepare消息
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
/*
* 6 如果存在CheckForbiddenHook,则执行checkForbidden方法
* 为什么叫禁止钩子呢,可能是想要使用者将不可发送消息的检查放在这个钩子函数里面吧(猜测)
*/
if (hasCheckForbiddenHook()) {
CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
checkForbiddenContext.setCommunicationMode(communicationMode);
checkForbiddenContext.setBrokerAddr(brokerAddr);
checkForbiddenContext.setMessage(msg);
checkForbiddenContext.setMq(mq);
checkForbiddenContext.setUnitMode(this.isUnitMode());
this.executeCheckForbiddenHook(checkForbiddenContext);
}
/*
* 7 如果存在SendMessageHook,则执行sendMessageBefore方法
*/
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
context.setNamespace(this.defaultMQProducer.getNamespace());
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
/*
* 8 设置请求头信息
*/
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
//针对重试消息的处理
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
//获取消息重新消费次数属性值
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
//将重新消费次数设置到请求头中,并且清除该属性
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
//获取消息的最大重试次数属性值
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
//将最大重新消费次数设置到请求头中,并且清除该属性
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
/*
* 9 根据不同的发送模式,发送消息
*/
SendResult sendResult = null;
switch (communicationMode) {
/*
* 异步发送模式
*/
case ASYNC:
/*
* 首先克隆并还原消息
*
* 该方法的finally中已经有还原消息的代码了,为什么在异步发送消息之前,还要先还原消息呢?
*
* 因为异步发送时 finally 重新赋值的时机并不确定,有很大概率是在第一次发送结束前就完成了 finally 中的赋值,
* 因此在内部重试前 msg.body 大概率已经被重新赋值过,而 onExceptionImpl 中的重试逻辑 MQClientAPIImpl.sendMessageAsync 不会再对数据进行压缩,
* 简言之,在异步发送的情况下,如果调用 onExceptionImpl 内部的重试,有很大概率发送的是无压缩的数据
*/
Message tmpMessage = msg;
boolean messageCloned = false;
//如果开启了消息压缩
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
//克隆一个message
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
//恢复原来的消息体
msg.setBody(prevBody);
}
//如果topic整合了namespace
if (topicWithNamespace) {
if (!messageCloned) {
tmpMessage = MessageAccessor.cloneMessage(msg);
messageCloned = true;
}
//还原topic
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
/*
* 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
*/
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
/*
* 10 发送异步消息
*/
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
/*
* 单向、同步发送模式
*/
case ONEWAY:
case SYNC:
/*
* 发送消息之前,进行超时检查,如果已经超时了那么取消本次发送操作,抛出异常
*/
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
/*
* 10 发送单向、同步消息
*/
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
/*
* 9 如果存在SendMessageHook,则执行sendMessageAfter方法
*/
if (this.hasSendMessageHook()) {
context.setSendResult(sendResult);
this.executeSendMessageHookAfter(context);
}
//返回执行结果
return sendResult;
//如果抛出了异常,如果存在SendMessageHook,则执行sendMessageAfter方法
} catch (RemotingException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (MQBrokerException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} catch (InterruptedException e) {
if (this.hasSendMessageHook()) {
context.setException(e);
this.executeSendMessageHookAfter(context);
}
throw e;
} finally {
/*
* 对消息进行恢复
* 1、因为客户端可能还需要查看原始的消息内容,如果是压缩消息,则无法查看
* 2、另外如果第一次压缩后消息还是大于4k,如果不恢复消息,那么客户端使用该message重新发送的时候,还会进行一次消息压缩
*/
msg.setBody(prevBody);
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));
}
}
throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
2.6.1 findBrokerAddressInPublish查找broker地址#
根据brokerName向brokerAddrTable查找数据, 因为生产者只会向Master发送数据, 所以返回的是Master地址。
/**
* MQClientInstance的方法
*/
public String findBrokerAddressInPublish(final String brokerName) {
//查询brokerAddrTable缓存的数据
HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
//返回Mater节点的地址
if (map != null && !map.isEmpty()) {
return map.get(MixAll.MASTER_ID);
}
return null;
}
2.6.2 brokerVIPChannel判断vip通道#
调用brokerVIPChannel判断是否开启vip通道, 如果开启了, 那么将brokerAddr的port – 2, vip通道的端口为普通端口-2。
/**
* MixAll的方法
*/
public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
//如果开启了vip通道
if (isChange) {
int split = brokerAddr.lastIndexOf(":");
String ip = brokerAddr.substring(0, split);
String port = brokerAddr.substring(split + 1);
//重新拼接brokerAddr,其中port - 2
String brokerAddrNew = ip + ":" + (Integer.parseInt(port) - 2);
return brokerAddrNew;
} else {
//如果没有开启vip通道,那么返回原地址
return brokerAddr;
}
}
消费者拉取消息只能请求普通通道, 但是生产者可以选择vip和普通通道。
2.6.3 setUniqID生成uniqId#
设置到UNIQ_KEY属性中, 批量消息在生成时就已经设置uniqId。
uniqId表示客户端生成的唯一一条消息。
/**
* MessageClientIDSetter的方法
*/
public static void setUniqID(final Message msg) {
//如果这条消息不存在"UNIQ_KEY"属性,那么创建uniqId并且存入"UNIQ_KEY"属性中
if (msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX) == null) {
msg.putProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, createUniqID());
}
}
2.6.4 tryToCompressMessage压缩消息#
消息压缩, 压缩比为5, 压缩之后, 设置压缩标志, 批量消息不支持压缩, 消息压缩有利于网络传输数据。
/**
* DefaultMQProducerImpl的方法
*/
private boolean tryToCompressMessage(final Message msg) {
//如果是批量消息,那么不进行压缩
if (msg instanceof MessageBatch) {
//batch dose not support compressing right now
return false;
}
byte[] body = msg.getBody();
if (body != null) {
//如果消息长度大于4K
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
//进行压缩,使用的JDK自带的压缩类
byte[] data = UtilAll.compress(body, zipCompressLevel);
if (data != null) {
//重新设置到body中
msg.setBody(data);
return true;
}
} catch (IOException e) {
log.error("tryToCompressMessage exception", e);
log.warn(msg.toString());
}
}
}
return false;
}
2.7 updateFaultItem更新故障表
发送消息后, 无论正常与否, 调用updateFaultItem更新故障表, 更新本地错误表缓存数据, 用于延迟时间的故障转移功能。
故障转移功能在此前的selectOneMessageQueue方法中被使用到, 用于查找一个可用的消息队列, updateFaultItem方法判断是否开启了故障转移功能, 会更新LatencyFaultTolerance维护的faultItemTable集合属性中的异常broker数据。
/**
* DefaultMQProducerImpl的方法
* @param brokerName brokerName
* @param currentLatency 当前延迟
* @param isolation 是否使用默认隔离
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
//调用MQFaultStrategy#updateFaultItem方法
this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}
MQFaultStrategy#updateFaultItem方法。其根据本次发送消息的延迟时间currentLatency, 计算出broker的隔离时间duration, 即是broker的下一个可用时间点。用于更新故障记录表。
/**
* MQFaultStrategy的方法
*
* @param brokerName brokerName
* @param currentLatency 当前延迟
* @param isolation 是否使用默认隔离时间
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
//如果开启了故障转移,即sendLatencyFaultEnable为true,默认false
if (this.sendLatencyFaultEnable) {
//根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration
//如果isolation为true,则使用默认隔离时间30000,即30s
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
//更新故障记录表
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
2.7.1 computeNotAvailableDuration计算隔离时间#
根据消息当前延迟currentLatency计算当前broker的故障延迟的时间duration, 据此即可以计算出该broker的下一个可用时间点。
//延迟等级
private long[] latencyMax = {
50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
//不可用时间等级
private long[] notAvailableDuration = {
0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
/**
* MQFaultStrategy的方法
*
* @param currentLatency 当前延迟
* @return 故障延迟的时间
*/
private long computeNotAvailableDuration(final long currentLatency) {
//倒叙遍历latencyMax
for (int i = latencyMax.length - 1; i >= 0; i--) {
//选择broker延迟时间对应的broker不可用时间,默认30000对应的故障延迟的时间为600000,即10分钟
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
latencyMax为延迟等级, notAvailableDuration为隔离时间。之间的关系:
2.7.2 updateFaultItem更新故障表#
该方法更新LatencyFaultToleranceImpl维护的faultItemTable集合属性中的异常broker的故障信息, 将会设置发送消息的延迟时间currentLatency属性, 以及下一个可用时间点LatencyFaultToleranceImpl属性。
下次可用时间LatencyFaultToleranceImpl属性= 现在的时间 + 隔离的时间, 在selectOneMessageQueue方法选取消息队列的时候, 如果开启了故障转移, 那么会查找下一个可用时间点小于当前时间点的broker的队列来发送消息。
/**
* LatencyFaultToleranceImpl的方法
*
* @param name brokerName
* @param currentLatency 当前延迟
* @param notAvailableDuration 隔离时间(不可用时间)
*/
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
//获取该broker此前的故障记录数据
FaultItem old = this.faultItemTable.get(name);
//如果此前没有数据,那么设置一个新对象肌凝乳
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
//设置当前延迟
faultItem.setCurrentLatency(currentLatency);
//设置下一次可用时间点
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
//已有故障记录,更新
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
//已有故障记录,更新
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
3.总结
1、 生产者消息重试:RocketMQ的消费者消息重试和生产者消息重投;
2、 生产者故障转移:通过sendLatencyFaultEnable属性配置是否开启,目的是为了保证每次发送消息成功,是一种高可用手段;
1、 延迟时间的故障转移,需要sendLatencyFaultEnable为true,消息队列选择时候,可用过滤mq认为不可用的broker,以免不断为宕机的broker发送消息,选取一个延迟比较短的broker,实现消息发送高可用;
2、 没有开启延迟时间的故障转移的时候,在轮询选择mq的时候,不会选择上一次发送失败的broker,实现消息发送高可用;
3、 Vip通道:VIP通道用于隔离读写操作,消费者拉取消息只能请求普通通道,生产者可用选择vip或者普通通道;
4、 故障转移表:RocketMQ的Producer生产者故障转移依赖于故障转移表实现,是一个HasmMap消息发送结束之后,会根据本次发送消息的延迟时间currentLatency,计算该broker的隔离时间duration,即为broker的下一次可用时间点然后更新故障记录表故障转移表的key为brokerName,value为未来该broker可用时间;