
20. RocketMQ Source Code Analysis - Transactional Messages

Version

Based on rocketmq-all-4.3.1.

Overview

1. Transactional message flow:

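  • The application (the transaction initiator) sends a transactional Prepare (half) message to the broker. Once the send succeeds, the producer itself invokes the transaction listener's local-transaction method to execute the local transaction.

  • When the broker receives the Prepare message, it first backs up the message's original topic and queue ID into the message properties, then stores the message in the topic RMQ_SYS_TRANS_HALF_TOPIC (which has only one queue).

  • At startup the broker launches a scheduled task that reads messages from RMQ_SYS_TRANS_HALF_TOPIC and sends a check-back request to the message's sender (any producer in the same producer group). The producer replies with commit, rollback or unknown, according to the actual state of its local transaction.

  • If the reply is commit or rollback, the broker commits or rolls back the transactional message.

  • If the reply is unknown, the broker waits and checks again in the next round. If the state still cannot be determined after the maximum number of checks, the broker gives up on the message, so it is never delivered, which amounts to a rollback (in this version the broker only logs it; see point 4).

  • On commit, the broker restores the transactional message, writes it to its original topic, and then writes a message to the queue of RMQ_SYS_TRANS_OP_HALF_TOPIC whose body is the queue offset of this transactional message.

  • On rollback, the broker only writes a message to the queue of RMQ_SYS_TRANS_OP_HALF_TOPIC whose body is the queue offset of this transactional message.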

2. Overall interaction flow:

[Figure: overall interaction flow (image not available)]

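3. Why use two extra topics to implement transactional messages?

  • To prevent consumers from consuming Prepare messages, a Prepare message is first written to the queue of RMQ_SYS_TRANS_HALF_TOPIC. The benefit is that the broker can store a transactional Prepare message with essentially the same logic as a normal message and reuse that logic directly, adding only an extra check.
  • RocketMQ writes every message to its files in append-only fashion, so neither a commit nor a rollback can modify or delete the original Prepare message in place; doing so would create many dirty pages and seriously hurt performance. Instead, "deleting" or "modifying" a message means writing a new message to a different topic, RMQ_SYS_TRANS_OP_HALF_TOPIC. During a transaction check, the broker first looks the message up in this op queue; if no record is found, the state is still unknown and another check-back is needed.
  • The drawback is that all transactional messages go through the single queue of RMQ_SYS_TRANS_HALF_TOPIC, which may become a bottleneck when there are many transactional messages, and each message is stored more than once.

4. If a message reaches the maximum number of checks, or the file storing it has expired, the current version only prints a log. If the broker finds that a message is still in the unknown state, it re-appends the message to the half queue the next time it processes it.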
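To make the flow above concrete, here is a minimal in-memory model of it. Everything in it is hypothetical: TxFlowModel, LocalState (named after the client's LocalTransactionState constants) and the list-based "topics" are illustrative stand-ins, not RocketMQ classes. As a simplification, check counts are kept in a map keyed by half offset, whereas the broker re-appends the message to the half queue for later checks (see the check-back section below).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongFunction;

// Hypothetical model: constant names mirror LocalTransactionState, nothing here is RocketMQ code.
enum LocalState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

final class TxFlowModel {
    final List<String> halfTopic = new ArrayList<>();   // RMQ_SYS_TRANS_HALF_TOPIC, queue 0 (append-only)
    final List<Long> opTopic = new ArrayList<>();       // RMQ_SYS_TRANS_OP_HALF_TOPIC: bodies are half offsets
    final List<String> realTopic = new ArrayList<>();   // the original topic, visible to consumers
    final Map<Long, Integer> checkTimes = new HashMap<>();
    private final int checkMax;

    TxFlowModel(int checkMax) {
        this.checkMax = checkMax;
    }

    // Prepare: store in the half topic only, so consumers cannot see it yet.
    long prepare(String body) {
        halfTopic.add(body);
        return halfTopic.size() - 1;
    }

    // Apply a commit / rollback / unknown answer for one half message.
    void end(long halfOffset, LocalState state) {
        if (state == LocalState.UNKNOW) {
            return; // nothing is written; a later check-back decides
        }
        if (state == LocalState.COMMIT_MESSAGE) {
            realTopic.add(halfTopic.get((int) halfOffset)); // restore to the original topic
        }
        opTopic.add(halfOffset); // commit and rollback both append an op record; halfTopic is never modified
    }

    // One check-back round over every half message that has no op record yet.
    void checkRound(LongFunction<LocalState> producerCheck) {
        Set<Long> done = new HashSet<>(opTopic);
        for (long offset = 0; offset < halfTopic.size(); offset++) {
            if (done.contains(offset)) {
                continue;
            }
            int times = checkTimes.getOrDefault(offset, 0);
            if (times >= checkMax) {
                continue; // give up: the message is never delivered to the original topic
            }
            checkTimes.put(offset, times + 1);
            end(offset, producerCheck.apply(offset));
        }
    }
}
```

Because commit and rollback only ever append to the op topic, "has this half message been resolved?" reduces to a set-membership test over op bodies, which is why the broker never needs to rewrite the half topic.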

Sending Transactional Messages

1. Transactional messages are sent with TransactionMQProducer, which extends DefaultMQProducer and delegates the sending logic to DefaultMQProducerImpl.

@Override
public TransactionSendResult sendMessageInTransaction(final Message msg,
    final Object arg) throws MQClientException {
    if (null == this.transactionListener) {
        throw new MQClientException("TransactionListener is null", null);
    }

    return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

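  2. DefaultMQProducerImpl#sendMessageInTransaction is the core sending method. Its main logic:
  • Validate the transaction listener and the message-related settings (message, topic, message size, etc.).
  • Set the transaction property on the message to mark it as a transactional prepare message, and set the producer group. The group is used for transaction check-backs: a producer is chosen at random from the group, so checks do not keep failing just because one producer has gone down.
  • Send the prepare message. The local transaction listener transactionListener is invoked only after a successful result (SEND_OK). If sending throws an exception, the local transaction listener is not executed.
  • Send the local execution result to the broker, which then commits or rolls back accordingly.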
public TransactionSendResult sendMessageInTransaction(final Message msg,
                                                      final LocalTransactionExecuter localTransactionExecuter, final Object arg)
    throws MQClientException {
    // Check that a transaction listener is configured
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);

    SendResult sendResult = null;
    // Mark the message as a Prepare message
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
    // Set the producer group. It is used for check-backs: a producer is chosen at random from the group,
    // so checks do not keep failing just because one producer has gone down
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
    try {
        sendResult = this.send(msg);
    } catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }

    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                // Transaction ID
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                }
                // UNIQ_KEY: the unique ID generated by the client when sending
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                // Run the local transaction via transactionListener (LocalTransactionExecuter is deprecated)
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                } else if (transactionListener != null) {
                    log.debug("Used new transaction API");
                    // If this call throws, localTransactionState keeps its default value UNKNOW;
                    // if it returns null, it is set to the default UNKNOW below
                    localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                }
                if (null == localTransactionState) {
                    localTransactionState = LocalTransactionState.UNKNOW;
                }

                if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                    log.info("executeLocalTransactionBranch return {}", localTransactionState);
                    log.info(msg.toString());
                }
            } catch (Throwable e) {
                log.info("executeLocalTransactionBranch exception", e);
                log.info(msg.toString());
                localException = e;
            }
        }
        break;
        // The send did not fully succeed (flush or slave sync problem): roll back
        case FLUSH_DISK_TIMEOUT:
        case FLUSH_SLAVE_TIMEOUT:
        case SLAVE_NOT_AVAILABLE:
            localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            break;
        default:
            break;
    }

    try {
        // Send the local result to the broker
        this.endTransaction(sendResult, localTransactionState, localException);
    } catch (Exception e) {
        log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
    }

    TransactionSendResult transactionSendResult = new TransactionSendResult();
    transactionSendResult.setSendStatus(sendResult.getSendStatus());
    transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
    transactionSendResult.setMsgId(sendResult.getMsgId());
    transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
    transactionSendResult.setTransactionId(sendResult.getTransactionId());
    transactionSendResult.setLocalTransactionState(localTransactionState);
    return transactionSendResult;
}
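The state decision in the method above can be isolated as a small pure function. The sketch below is a hypothetical, self-contained mirror of that switch. SendStatus and LocalTransactionState reuse the constant names seen in the code but are declared locally, and LocalTx is an invented stand-in for TransactionListener#executeLocalTransaction. It encodes three rules: SEND_OK runs the local transaction; a null result or an exception leaves UNKNOW; the flush and slave failure statuses roll back without running the local transaction.

```java
// Hypothetical local copies of the enum constants used above; not the RocketMQ client classes.
enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical stand-in for TransactionListener#executeLocalTransaction.
interface LocalTx {
    LocalTransactionState execute() throws Exception;
}

final class LocalStateDecider {
    static LocalTransactionState decide(SendStatus status, LocalTx tx) {
        LocalTransactionState state = LocalTransactionState.UNKNOW;
        switch (status) {
            case SEND_OK:
                try {
                    state = tx.execute();
                } catch (Throwable e) {
                    // Exception: keep UNKNOW so the broker resolves it via a check-back.
                    state = LocalTransactionState.UNKNOW;
                }
                if (state == null) {
                    state = LocalTransactionState.UNKNOW; // a null result is treated as UNKNOW
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                // The local transaction is never executed; tell the broker to roll back.
                state = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
        return state;
    }
}
```

Mapping an exception to UNKNOW rather than ROLLBACK_MESSAGE is the cautious choice. The local transaction may already have committed before the exception was thrown, so deferring the decision to a check-back avoids guessing.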
  3. DefaultMQProducerImpl#endTransaction sends the local result to the broker, converting the local state as follows:
  • localTransactionState == COMMIT_MESSAGE is sent as MessageSysFlag.TRANSACTION_COMMIT_TYPE (0x2 << 2, binary 1000)
  • localTransactionState == ROLLBACK_MESSAGE is sent as MessageSysFlag.TRANSACTION_ROLLBACK_TYPE (0x3 << 2, binary 1100)
  • localTransactionState == UNKNOW is sent as MessageSysFlag.TRANSACTION_NOT_TYPE (0, binary 0000)
public void endTransaction(
    final SendResult sendResult,
    final LocalTransactionState localTransactionState,
    final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
    final MessageId id;
    // Message ID assigned by the broker
    if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    }
    String transactionId = sendResult.getTransactionId();
    // Commit or roll back on the same broker the prepare message was sent to
    final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    // CommitLog offset of the transactional (prepare) message
    requestHeader.setCommitLogOffset(id.getOffset());
    switch (localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
            break;
        default:
            break;
    }

    requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
    requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    // Attach the exception thrown by the local transaction callback, if any
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;
    // Send the local transaction result to the broker (one-way)
    this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
        this.defaultMQProducer.getSendMsgTimeout());
}
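The three flag values above occupy bits 2 and 3 of the message's sysFlag, so they can be read and replaced with a mask. The following is a sketch of that bit arithmetic. TxFlags is a hypothetical class; the broker code below does call MessageSysFlag.getTransactionValue and resetTransactionValue, but the bodies here are my own reconstruction, using TRANSACTION_ROLLBACK_TYPE (1100) as the two-bit mask and 0x1 << 2 for the prepared type.

```java
// Hypothetical reconstruction of the transaction bits in sysFlag (bits 2 and 3).
final class TxFlags {
    static final int TRANSACTION_NOT_TYPE = 0;              // 0000
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;  // 0100
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;    // 1000
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;  // 1100, doubles as the mask for both bits

    // Keep only the two transaction bits.
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    // Clear the two transaction bits, then set the requested type; other bits are preserved.
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

For example, a flag of 0b0101 (some unrelated bit 0 plus "prepared") becomes 0b1001 after resetTransactionValue(flag, TRANSACTION_COMMIT_TYPE), and 0b0001 after resetting to TRANSACTION_NOT_TYPE, which is how parseHalfMessageInner below "removes" the transaction flag.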

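Broker Handling of Prepare Messages

  1. Like a normal message, a transactional prepare message is handled by SendMessageProcessor#processRequest. The difference lies in storage: a transactional Prepare message is delegated to TransactionalMessageService.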
// Read the transaction property the producer set when sending [prepare message / commit message]
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    // Whether the broker rejects transactional messages (by default they are accepted)
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
            "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                + "] sending transaction message is forbidden");
        return response;
    }
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}

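  2. TransactionalMessageServiceImpl#prepareMessage calls TransactionalMessageBridge#putHalfMessage to pre-store the transactional message. It backs up the message's original Topic and QueueId into the message properties (needed later at commit time), changes the topic to RMQ_SYS_TRANS_HALF_TOPIC, which has only queue 0, and then by default calls DefaultMessageStore#putMessage. From this point on, storage is no different from that of a normal message.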
// TransactionalMessageServiceImpl#prepareMessage
@Override
public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
    return transactionalMessageBridge.putHalfMessage(messageInner);
}

// TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    // Save the original topic and queue ID in the properties, because the message is about to be
    // written to the queue of the transactional prepare topic RMQ_SYS_TRANS_HALF_TOPIC
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
        String.valueOf(msgInner.getQueueId()));
    // Clear the transaction flag (set it to TRANSACTION_NOT_TYPE, i.e. 0)
    msgInner.setSysFlag(
        MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
    // Topic for prepare messages: RMQ_SYS_TRANS_HALF_TOPIC
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    // RMQ_SYS_TRANS_HALF_TOPIC has only one queue
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
    return msgInner;
}

// TransactionalMessageBridge#putHalfMessage
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    // Store the message in the CommitLog (or whatever store is configured)
    return store.putMessage(parseHalfMessageInner(messageInner));
}
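The topic swap performed by parseHalfMessageInner, and its reversal at commit time, can be modelled with a plain property map. In this sketch SimpleMsg and HalfTopicSwap are hypothetical. The property keys "REAL_TOPIC" and "REAL_QID" are assumed to match the values of MessageConst.PROPERTY_REAL_TOPIC and PROPERTY_REAL_QUEUE_ID and should be checked against the source. The restore step only approximates the broker's commit path, which also resets the sysFlag, offsets and timestamps, as the EndTransactionProcessor code below shows.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified message: just a destination and a property map.
final class SimpleMsg {
    String topic;
    int queueId;
    final Map<String, String> props = new HashMap<>();

    SimpleMsg(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

final class HalfTopicSwap {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String REAL_TOPIC = "REAL_TOPIC"; // assumed property key
    static final String REAL_QID = "REAL_QID";     // assumed property key

    // Prepare: back up the original destination, then redirect to the half topic's only queue.
    static SimpleMsg toHalf(SimpleMsg msg) {
        msg.props.put(REAL_TOPIC, msg.topic);
        msg.props.put(REAL_QID, String.valueOf(msg.queueId));
        msg.topic = HALF_TOPIC;
        msg.queueId = 0;
        return msg;
    }

    // Commit: rebuild the message for its original destination and drop the backup keys.
    static SimpleMsg restore(SimpleMsg half) {
        SimpleMsg real = new SimpleMsg(half.props.get(REAL_TOPIC), Integer.parseInt(half.props.get(REAL_QID)));
        real.props.putAll(half.props);
        real.props.remove(REAL_TOPIC);
        real.props.remove(REAL_QID);
        return real;
    }
}
```

Keeping the backup inside the message means the broker needs no side table to find the real destination when the commit arrives: the half message carries everything required.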

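3. As shown above, storing a transactional Prepare message is not much different from storing a normal message. So how does the broker make sure a Prepare message is not consumed by consumers? There have been two approaches:

  • Old approach (deprecated)

    • When writing a message to the CommitLog, the broker checks the message type. For a prepare or rollback transactional message, the ConsumeQueue queueOffset is not incremented (normally it increments by one for every appended message).
    • When building the ConsumeQueue, the broker checks for prepare and rollback messages and does not write them to the ConsumeQueue. Since the message never appears in the ConsumeQueue, consumers cannot consume it.

  • New approach: change the original topic. The message is written to its original topic only after it is committed, so consumers cannot consume it before the commit.

  4. CommitLog.DefaultAppendMessageCallback#doAppend contains the transaction-related checks. Because the topic has already been replaced with the transaction topic and the transaction flag has been cleared, the flag seen here is always TRANSACTION_NOT_TYPE. My guess is that this logic is left over from the old implementation (the one that did not change the topic).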
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,final MessageExtBrokerInner msgInner) {
    ...omitted...
    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Prepare and Rollback transactional messages are always written with queue offset 0
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    ...omitted...
    // Only TRANSACTION_COMMIT_TYPE and TRANSACTION_NOT_TYPE messages advance the queue offset
    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}

  5. DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch is where the ConsumeQueue is built asynchronously. Here you can see that no ConsumeQueue entry is built for Prepare and Rollback messages, so consumers cannot consume them.
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            // No ConsumeQueue entry is built for prepare and rollback messages
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        }
    }
}
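Both checks shown above reduce to the same filter on the transaction bits: doAppend does not advance the logical offset, and dispatch does not write a ConsumeQueue entry. The sketch below is a hypothetical in-memory model of that filter, not the broker's code. A map plays the role of topicQueueTable and a list plays the ConsumeQueue. The constants use 0, 0x1 << 2, 0x2 << 2 and 0x3 << 2 for TRANSACTION_NOT/PREPARED/COMMIT/ROLLBACK_TYPE. In the real doAppend a prepare or rollback message is still written, with queue offset 0; this model simply does not record it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the old-style filtering in doAppend/dispatch; not RocketMQ code.
final class ConsumeQueueModel {
    static final int NOT = 0, PREPARED = 0x1 << 2, COMMIT = 0x2 << 2, ROLLBACK = 0x3 << 2;

    private final Map<String, Long> topicQueueTable = new HashMap<>(); // "topic-queueId" -> next logical offset
    private final List<String> consumeQueue = new ArrayList<>();       // entries consumers can see

    void append(String topic, int queueId, int tranType, String body) {
        String key = topic + "-" + queueId;
        long queueOffset = topicQueueTable.getOrDefault(key, 0L);
        switch (tranType) {
            case NOT:
            case COMMIT:
                consumeQueue.add(body);                    // dispatch builds a ConsumeQueue entry
                topicQueueTable.put(key, queueOffset + 1); // and the logical offset advances
                break;
            default:
                break;                                     // PREPARED / ROLLBACK: invisible, offset unchanged
        }
    }

    List<String> visible() {
        return consumeQueue;
    }

    long nextOffset(String topic, int queueId) {
        return topicQueueTable.getOrDefault(topic + "-" + queueId, 0L);
    }
}
```

With the new approach every half message is stored with TRANSACTION_NOT_TYPE under RMQ_SYS_TRANS_HALF_TOPIC, so it passes this filter but lands in a topic that no business consumer subscribes to.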

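Broker Handling of Commit/Rollback Messages

  1. The producer reports its local result to the broker with the RequestCode.END_TRANSACTION command, which the broker handles in EndTransactionProcessor. EndTransactionProcessor#processRequest works as follows:
  • Only the master can handle the request. The relevant logs are printed, and only commit or rollback requests proceed further.
  • For a rollback, look up the prepare message in RMQ_SYS_TRANS_HALF_TOPIC by its offset and verify that it is the right Prepare message. Then delete the prepare message (in fact, write a message with tag d to the RMQ_SYS_TRANS_OP_HALF_TOPIC topic) to mark it as deleted.
  • For a commit, look up the prepare message in RMQ_SYS_TRANS_HALF_TOPIC by its offset and verify that it is the right Prepare message. Restore the original message (original topic, queue, etc.), clear the transaction properties, and store the original message in the CommitLog. If storing succeeds, delete the prepare message (again by writing a message with tag d to the RMQ_SYS_TRANS_OP_HALF_TOPIC topic) to mark it as deleted.
  2. Source of EndTransactionProcessor#processRequest: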
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws
    RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final EndTransactionRequestHeader requestHeader =
        (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
    LOGGER.info("Transaction request:{}", requestHeader);
    // Slaves are not allowed to process transactional messages
    if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
        response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
        LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
        return response;
    }
    // fromTransactionCheck: whether this is the reply to a check-back (it only affects logging);
    // only commit or rollback requests are processed further
    if (requestHeader.getFromTransactionCheck()) {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, but it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }

            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer commit the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());

                break;
            }

            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("Check producer[{}] transaction state, the producer rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    } else {
        switch (requestHeader.getCommitOrRollback()) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message,  and it's pending status."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                return null;
            }

            case MessageSysFlag.TRANSACTION_COMMIT_TYPE: {
                break;
            }

            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: {
                LOGGER.warn("The producer[{}] end transaction in sending message, rollback the message."
                        + "RequestHeader: {} Remark: {}",
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                    requestHeader.toString(),
                    request.getRemark());
                break;
            }
            default:
                return null;
        }
    }
    OperationResult result = new OperationResult();
    if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
        // Look up the prepare message by its offset in the internal transaction (half) topic
        result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            // Verify that the message found is the right one
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                // Restore the original message
                MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
                msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
                msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
                // Store it in the CommitLog; if that succeeds, delete the half message
                RemotingCommand sendResult = sendFinalMessage(msgInner);
                if (sendResult.getCode() == ResponseCode.SUCCESS) {
                    // Delete the prepare message, i.e. write a message with tag d to the RMQ_SYS_TRANS_OP_HALF_TOPIC topic.
                    // RocketMQ only appends messages and supports neither update nor delete, so deleting means appending a new message to a dedicated topic.
                    // This way a committed or rolled-back message always leaves a record; if there is no record, the state is unknown.
                    this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
                }
                return sendResult;
            }
            return res;
        }
    } else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
        result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
        if (result.getResponseCode() == ResponseCode.SUCCESS) {
            RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
            if (res.getCode() == ResponseCode.SUCCESS) {
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return res;
        }
    }
    response.setCode(result.getResponseCode());
    response.setRemark(result.getResponseRemark());
    return response;
}
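Stripped of logging and error handling, EndTransactionProcessor#processRequest is a small decision table. The hypothetical sketch below captures it. Role and Action are invented names, and fromTransactionCheck is omitted because, as the code above shows, it only changes which log line is printed. The flag constants use the values listed in the endTransaction section.

```java
// Hypothetical decision table distilled from EndTransactionProcessor#processRequest; not RocketMQ code.
enum Role { MASTER, SLAVE }

enum Action { REJECT_SLAVE_NOT_AVAILABLE, IGNORE, COMMIT_RESTORE_AND_MARK_OP, ROLLBACK_MARK_OP }

final class EndTxDecision {
    static final int NOT = 0, COMMIT = 0x2 << 2, ROLLBACK = 0x3 << 2;

    static Action decide(Role role, int commitOrRollback) {
        if (role == Role.SLAVE) {
            return Action.REJECT_SLAVE_NOT_AVAILABLE; // response code SLAVE_NOT_AVAILABLE
        }
        switch (commitOrRollback) {
            case COMMIT:
                // restore the original topic/queue, store it, then write an op message with tag "d"
                return Action.COMMIT_RESTORE_AND_MARK_OP;
            case ROLLBACK:
                // only write an op message with tag "d"
                return Action.ROLLBACK_MARK_OP;
            default:
                // TRANSACTION_NOT_TYPE (unknown) or anything else: processRequest returns null
                return Action.IGNORE;
        }
    }
}
```

Note that the commit path writes the op record only after the restored message has been stored successfully. If that store fails, the half message stays unresolved and a later check-back gives it another chance.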

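  3. TransactionalMessageServiceImpl#deletePrepareMessage deletes the message. This is not a physical deletion but an append: deleting a message means appending a message with a specific tag to the queue of the RMQ_SYS_TRANS_OP_HALF_TOPIC topic.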
// TransactionalMessageServiceImpl#deletePrepareMessage
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
    if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
        log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
        return true;
    } else {
        log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
        return false;
    }
}

// TransactionalMessageBridge#putOpMessage appends a message to RMQ_SYS_TRANS_OP_HALF_TOPIC
public boolean putOpMessage(MessageExt messageExt, String opType) {
    // messageExt is the Prepare message
    // Build a MessageQueue identifying the half queue it came from
    MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
        this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
    if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
        return addRemoveTagInTransactionOp(messageExt, messageQueue);
    }
    return true;
}

// TransactionalMessageBridge#addRemoveTagInTransactionOp
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
    Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG,
        String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
    writeOp(message, messageQueue);
    return true;
}

// TransactionalMessageBridge#writeOp calls the store's putMessage
private void writeOp(Message message, MessageQueue mq) {
    // mq is the Prepare message queue (of the RMQ_SYS_TRANS_HALF_TOPIC topic)
    // opQueueMap caches key = RMQ_SYS_TRANS_HALF_TOPIC queue, value = RMQ_SYS_TRANS_OP_HALF_TOPIC queue
    MessageQueue opQueue;
    if (opQueueMap.containsKey(mq)) {
        opQueue = opQueueMap.get(mq);
    } else {
        // Create a message queue of the RMQ_SYS_TRANS_OP_HALF_TOPIC topic
        opQueue = getOpQueueByHalf(mq);
        // putIfAbsent does not overwrite an existing value; it returns the existing one instead
        MessageQueue oldQueue = opQueueMap.putIfAbsent(mq, opQueue);
        if (oldQueue != null) {
            opQueue = oldQueue;
        }
    }
    // TODO by jannal: why could this ever be null??
    if (opQueue == null) {
        opQueue = new MessageQueue(TransactionalMessageUtil.buildOpTopic(), mq.getBrokerName(), mq.getQueueId());
    }
    putMessage(makeOpMessageInner(message, opQueue));
}
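An op message carries the half-queue offset as its body (String.valueOf(queueOffset) encoded with TransactionalMessageUtil.charset). The check loop later turns a batch of op messages into removeMap (half offset → op offset). The sketch below shows that round trip plus the per-queue op-queue cache. OpMessages is hypothetical, UTF-8 is assumed for the charset, and op queues are represented by plain strings instead of MessageQueue objects. As far as I can tell, the real fillOpRemoveMap also routes offsets below the current half offset into doneOpOffset; that detail is left out here. The cache uses ConcurrentHashMap#computeIfAbsent, which collapses writeOp's containsKey/get/putIfAbsent sequence into one atomic call.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helpers illustrating the op-message format; not RocketMQ code.
final class OpMessages {
    static final String REMOVETAG = "d"; // tag used for "deleted" op messages

    // Op message body = half-queue offset as a decimal string (UTF-8 assumed).
    static byte[] encodeBody(long halfQueueOffset) {
        return String.valueOf(halfQueueOffset).getBytes(StandardCharsets.UTF_8);
    }

    static long decodeBody(byte[] body) {
        return Long.parseLong(new String(body, StandardCharsets.UTF_8));
    }

    // Build removeMap (half offset -> op offset) from op bodies pulled starting at opStartOffset.
    static Map<Long, Long> buildRemoveMap(List<byte[]> opBodies, long opStartOffset) {
        Map<Long, Long> removeMap = new HashMap<>();
        for (int i = 0; i < opBodies.size(); i++) {
            removeMap.put(decodeBody(opBodies.get(i)), opStartOffset + i);
        }
        return removeMap;
    }

    // Cache: half queue id -> op queue name, created at most once per key.
    private final ConcurrentMap<Integer, String> opQueueMap = new ConcurrentHashMap<>();

    String opQueueFor(int halfQueueId) {
        return opQueueMap.computeIfAbsent(halfQueueId, id -> "RMQ_SYS_TRANS_OP_HALF_TOPIC-" + id);
    }
}
```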

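Broker Transaction Check-Back

1. Normally, after running the local transaction callback, the producer reports the resulting state to the broker. If the producer crashes at that moment, or the call to the broker fails because of a network problem, the broker has to find out the transaction state by periodically checking back with the producer.
2. TransactionalMessageCheckService is a service thread that starts together with the broker (only on non-slave brokers). By default it runs a check round every 60 s (transactionCheckInterval). A half message is only checked once it is older than transactionTimeOut (6 s by default), and each message is checked at most 15 times (transactionCheckMax). Each round calls TransactionalMessageService#check.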

BrokerController#start

...omitted...
if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {
    if (this.transactionalMessageCheckService != null) {
        log.info("Start transaction service!");
        this.transactionalMessageCheckService.start();
    }
}

TransactionalMessageCheckService#run
@Override
public void run() {
    log.info("Start transaction check service thread!");
    // Default: 60 s
    long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
    while (!this.isStopped()) {
        this.waitForRunning(checkInterval);
    }
    log.info("End transaction check service thread!");
}

@Override
protected void onWaitEnd() {
    // Default: 6000 ms
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // Default: each message is checked at most 15 times
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
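The run()/onWaitEnd() pair above amounts to "wait checkInterval, then run a check round, repeat until stopped". A rough JDK-only equivalent is sketched below with ScheduledExecutorService#scheduleWithFixedDelay. PeriodicChecker is hypothetical, and unlike RocketMQ's ServiceThread this sketch offers no way to wake the thread early.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TransactionalMessageCheckService; not RocketMQ code.
final class PeriodicChecker {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // First run after one interval, then checkIntervalMs after each run finishes,
    // similar to waitForRunning(checkInterval) followed by onWaitEnd().
    void start(long checkIntervalMs, Runnable check) {
        scheduler.scheduleWithFixedDelay(check, checkIntervalMs, checkIntervalMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A fixed delay (rather than a fixed rate) means a slow round, for example one that hits the 60 s per-queue limit, never overlaps the next one. The single-threaded scheduler gives the same "single thread" guarantee that check() relies on.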

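  3. Main flow of TransactionalMessageService#check:
  • Look up the queues of RMQ_SYS_TRANS_HALF_TOPIC (currently only queue 0).
  • Iterate over each MessageQueue of RMQ_SYS_TRANS_HALF_TOPIC. Processing of each MessageQueue is limited to 60 s.
  • Using the RMQ_SYS_TRANS_HALF_TOPIC MessageQueue as the key, get the corresponding RMQ_SYS_TRANS_OP_HALF_TOPIC MessageQueue from the cache, creating it if absent.
  • Pull messages from the op queue with the consumer group CID_RMQ_SYS_TRANS, 32 at a time.
  • Compare each prepare message with the op messages: if the op queue already contains it, no check is needed. If it has been checked more than 15 times or has expired, skip it.
  • If the message has waited longer than the transaction check time, check it; otherwise keep pulling op messages.
  • Re-append the message to the prepare queue and update the offset.
  • Send the check-back request to the producer.
  • Update the consume offsets of the prepare queue and the op queue.
  4. Source of TransactionalMessageService#check: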
@Override
public void check(long transactionTimeout, int transactionCheckMax,
    AbstractTransactionalMessageCheckListener listener) {
    try {
        String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;
        // RMQ_SYS_TRANS_HALF_TOPIC has only one queue
        Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
        if (msgQueues == null || msgQueues.size() == 0) {
            log.warn("The queue of topic is empty :" + topic);
            return;
        }
        log.info("Check topic={}, queues={}", topic, msgQueues);
        for (MessageQueue messageQueue : msgQueues) {
            // Start time for processing this queue
            long startTime = System.currentTimeMillis();
            // Each prepare queue has one op queue (recording commits/rollbacks); in practice there is only one
            MessageQueue opQueue = getOpQueue(messageQueue);
            // Current consume offset of the prepare queue
            long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
            // Current consume offset of the op queue
            long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
            log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);
            if (halfOffset < 0 || opOffset < 0) {
                log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
                    halfOffset, opOffset);
                continue;
            }
            // Offsets of op messages that have already been processed
            List<Long> doneOpOffset = new ArrayList<>();
            // Prepare messages that have already been removed (committed or rolled back)
            HashMap<Long, Long> removeMap = new HashMap<>();
            // Find out which prepare messages have already been removed, mainly to avoid duplicate check-back requests
            PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
            if (null == pullResult) {
                log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null",
                    messageQueue, halfOffset, opOffset);
                continue;
            }
            // single thread
            // Number of times no message was found
            int getMessageNullCount = 1;
            long newOffset = halfOffset;
            // Logical offset
            long i = halfOffset;
            while (true) {
                // Processing of each MessageQueue is limited to 60 s
                if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
                    log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
                    break;
                }
                // The Prepare message has already been committed/rolled back: just remove it from the map
                if (removeMap.containsKey(i)) {
                    log.info("Half offset {} has been committed/rolled back", i);
                    removeMap.remove(i);
                } else {
                    // Fetch the prepare message to process
                    GetResult getResult = getHalfMsg(messageQueue, i);
                    MessageExt msgExt = getResult.getMsg();
                    // Message not found: stop after too many misses or when there is no new message, otherwise skip ahead
                    if (msgExt == null) {
                        if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                            break;
                        }
                        if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                            log.info("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                                messageQueue, getMessageNullCount, getResult.getPullResult());
                            break;
                        } else {
                            log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                                i, messageQueue, getMessageNullCount, getResult.getPullResult());
                            i = getResult.getPullResult().getNextBeginOffset();
                            newOffset = i;
                            continue;
                        }
                    }
                    // Discard after more than 15 checks, or if the message has expired (older than the file retention time, 72 hours by default)
                    if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
                        // By default this only prints a log
                        listener.resolveDiscardMsg(msgExt);
                        newOffset = i + 1;
                        i++;
                        continue;
                    }
                    // Defensive: messages stored after this check round started are left for a later round
                    if (msgExt.getStoreTimestamp() >= startTime) {
                        log.info("Fresh stored. the miss offset={}, check it later, store={}", i,
                            new Date(msgExt.getStoreTimestamp()));
                        break;
                    }
                    // Time elapsed since the message was born (created on the producer)
                    long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
                    // Default timeout: 6 s
                    long checkImmunityTime = transactionTimeout;
                    // Currently this property is only used in test cases
                    String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
                    if (null != checkImmunityTimeStr) {
                        // If checkImmunityTimeStr is -1, transactionTimeout is used
                        checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
                        // Leave the producer time to commit the transaction; do not check yet
                        if (valueOfCurrentMinusBorn < checkImmunityTime) {
                            // Still within the immunity window: if the op queue already records this message it counts as done,
                            // otherwise it is written back to the prepare queue to be checked later
                            if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt, checkImmunityTime)) {
                                newOffset = i + 1;
                                i++;
                                continue;
                            }
                        }
                    } else {
                        // A newly arrived prepare message: skip it for now, its commit may still be in flight
                        if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                            log.info("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                                checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                            break;
                        }
                    }
                    List<MessageExt> opMsg = pullResult.getMsgFoundList();
                    // checkImmunityTime (6 s by default) is the earliest time a message may be checked.
                    // Check if: there are no op messages and the message is older than checkImmunityTime; or the last op
                    // message's born time minus the check start time exceeds transactionTimeout; or the born time lies in the future
                    // TODO: when can valueOfCurrentMinusBorn be <= -1????
                    boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
                        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
                        || (valueOfCurrentMinusBorn <= -1);

                    if (isNeedCheck) {
                        // Write this message back to the prepare queue and update the offset
                        if (!putBackHalfMsgQueue(msgExt, i)) {
                            continue;
                        }
                        // Transaction check-back (sent by an asynchronous thread): send the check request to the producer
                        listener.resolveHalfMsg(msgExt);
                    } else {
                        // Not due yet: keep pulling op messages
                        pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
                        log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                            messageQueue, pullResult);
                        continue;
                    }
                }
                // Advance the consume offset by 1
                newOffset = i + 1;
                i++;
            }
            // Messages were consumed, so update the offset
            if (newOffset != halfOffset) {
                transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
            }

            long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
            if (newOpOffset != opOffset) {
                transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
        log.error("Check error", e);
    }

}
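The core of the loop above fits in a short sketch once pulling, the 60 s limit and null handling are removed. HalfCheckSketch and its nested Half class are hypothetical. A half message is reduced to its born time, store time and check count. A list stands in for the half queue, "now" is approximated by the round's start time, and re-appending (putBackHalfMsgQueue) is modelled as adding a copy with checkTimes + 1 and the round's start as its store time. calculateOpOffset is my reconstruction of the helper called at the end of check(). It advances the op offset only across a contiguous run of processed op offsets, so anything after a gap is re-read next round.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical single-queue simplification of TransactionalMessageService#check; not RocketMQ code.
final class HalfCheckSketch {
    // A half message reduced to what the check loop looks at.
    static final class Half {
        final long bornMs;
        final long storeMs;
        final int checkTimes;

        Half(long bornMs, long storeMs, int checkTimes) {
            this.bornMs = bornMs;
            this.storeMs = storeMs;
            this.checkTimes = checkTimes;
        }
    }

    // Advance the op offset across the contiguous run of processed op offsets only.
    static long calculateOpOffset(List<Long> doneOpOffset, long oldOffset) {
        Collections.sort(doneOpOffset);
        long newOffset = oldOffset;
        for (long done : doneOpOffset) {
            if (done != newOffset) {
                break;
            }
            newOffset++;
        }
        return newOffset;
    }

    // One round over the half queue from halfOffset; returns the new half consume offset.
    // Offsets that triggered a check-back are appended to checkedOffsets.
    static long checkOnce(List<Half> halfQueue, long halfOffset, Map<Long, Long> removeMap,
                          long startMs, long immunityMs, int checkMax, List<Long> checkedOffsets) {
        long i = halfOffset;
        while (i < halfQueue.size()) {
            if (removeMap.remove(i) != null) {     // already committed or rolled back
                i++;
                continue;
            }
            Half h = halfQueue.get((int) i);
            if (h.checkTimes >= checkMax) {         // needDiscard: give up (the broker only logs it)
                i++;
                continue;
            }
            if (h.storeMs >= startMs) {             // stored during this round: check next round
                break;
            }
            if (startMs - h.bornMs < immunityMs) {  // too young: its commit may still be in flight
                break;
            }
            // putBackHalfMsgQueue: re-append a copy with one more check, then ask the producer
            halfQueue.add(new Half(h.bornMs, startMs, h.checkTimes + 1));
            checkedOffsets.add(i);
            i++;
        }
        return i;
    }
}
```

The "stored during this round" test is what keeps the loop finite. A message re-appended in this round has a store time equal to the round's start, so when the scan reaches it the round ends, and the copy is picked up in the next round.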