leniu-java-mq
SKILL.md
leniu-tengyun-core 消息队列规范
项目特征
| 特征 | 说明 |
|---|---|
| 包名前缀 | net.xnzn.core.* |
| JDK 版本 | 21 |
| 消息工具 | MqUtil |
| 消费者注解 | @MQMessageListener(group, topic, tag) |
| 消费者接口 | implements MQListener<MqPayload<String>> |
| 主题常量 | LeMqConstant.Topic |
| 延迟枚举 | LeMqConstant.DelayDuration |
| JSON工具 | JacksonUtil |
| 异常类 | LeException |
核心架构:三层分工
MsgSend(静态工具类)→ Listener(接收消息)→ Handler(分发处理)
| 层 | 职责 | 示例 |
|---|---|---|
| XxxMessageSend | 静态工具类,封装消息发送逻辑 | OrderMessageSend |
| XxxMqListenerYyy | 消费者,接收 MQ 消息并分发到 Handler | OrderMqListenerAsyncSave |
| XxxMqHandler | 业务处理,统一处理各类消息 | OrderMqHandler |
消息发送
三种发送方式
import net.xnzn.core.common.mq.MqUtil;
import net.xnzn.core.common.constant.LeMqConstant;
import net.xnzn.core.common.utils.JacksonUtil;
// 1. 普通消息(立即发送)
MqUtil.send(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC);
// 2. 事务消息(在 DB 事务提交后发送,保证一致性)
MqUtil.sendByTxEnd(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC);
// 3. 延迟消息(指定延迟时长后触发)
MqUtil.sendDelay(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC, LeMqConstant.DelayDuration.ONE_MINUTE);
关键点:
- 消息体必须用
JacksonUtil.writeValueAsString()序列化为 String 再发送 - 事务消息用
sendByTxEnd()(如 DB 事务回滚则不发送) - 延迟时间用
LeMqConstant.DelayDuration枚举(不用Duration.ofMinutes())
DelayDuration 枚举(常用值)
LeMqConstant.DelayDuration.ONE_MINUTE // 1分钟
LeMqConstant.DelayDuration.THIRTY_MINUTES // 30分钟
// 其他枚举值参见 LeMqConstant.DelayDuration
消息发送类(静态工具类模式)
/**
* 订单消息发送
* 关键特征:
* 1. 不是 @Component,是纯静态工具类
* 2. 私有构造器
* 3. PO 中包含 traceId 和 tenantId(用于跨线程追踪)
*/
@Slf4j
public class XxxMessageSend {
private XxxMessageSend() {} // 禁止实例化
private static final String MQ_ERROR_LOG = "发送MQ消息失败";
/**
* 普通消息(适用于非事务性发送)
*/
public static void sendXxxEvent(XxxPO po) {
log.info("[XxxMQ]发送xxx事件");
po.setTraceId(LogUtil.getCurrentTraceId());
po.setTenantId(TenantContextHolder.getTenantId());
MqUtil.send(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC);
}
/**
* 事务消息(在 @Transactional 方法中使用,事务提交后才发送)
*/
public static void sendXxxEventByTx(XxxPO po) {
log.info("[XxxMQ]发送xxx事务消息");
po.setTraceId(LogUtil.getCurrentTraceId());
po.setTenantId(TenantContextHolder.getTenantId());
try {
MqUtil.sendByTxEnd(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC);
} catch (Exception e) {
log.error(MQ_ERROR_LOG, e);
}
}
/**
* 延迟消息(超时取消等场景)
*/
public static void sendXxxDelay(XxxPO po, LeMqConstant.DelayDuration delayDuration) {
log.info("[XxxMQ]发送xxx延迟消息");
po.setTraceId(LogUtil.getCurrentTraceId());
po.setTenantId(TenantContextHolder.getTenantId());
MqUtil.sendDelay(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.XXX_TOPIC, delayDuration);
}
}
PO 消息体规范
import lombok.Data;
/**
* MQ 消息 PO(Message Payload Object)
* 必须包含 traceId 和 tenantId 字段
*/
@Data
public class XxxPO {
/** 链路追踪ID */
private String traceId;
/** 租户ID */
private String tenantId;
/** 业务字段 */
private Long orderId;
private String outTradeNo;
// 其他字段...
}
消息消费
Listener 类(真实代码模式)
import lombok.extern.slf4j.Slf4j;
import net.xnzn.core.common.mq.MqPayload;
import net.xnzn.framework.mq.MQListener;
import net.xnzn.framework.mq.MQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
/**
* MQ 消费者 Listener
* @see LeMqConstant.Topic#XXX_TOPIC
*/
@Slf4j
@MQMessageListener(
group = "module-xxx-topic-name", // 消费组,格式:模块名-topic-tag
topic = "xxx", // Topic 名称
tag = "xxx-topic-name" // Tag 名称(对应 LeMqConstant.Topic)
)
public class XxxMqListenerYyy implements MQListener<MqPayload<String>> {
@Autowired
@Lazy // ⚠️ 必须 @Lazy,避免循环依赖
private XxxMqHandler xxxMqHandler;
@Override
public void onMessage(MqPayload<String> payload) {
// 委托给 Handler 处理,使用方法引用
xxxMqHandler.handleMessage(payload, XxxPO.class, XxxMqHandler::handleXxx);
}
}
Handler 类(统一处理消息)
import cn.hutool.core.text.CharSequenceUtil;
import com.pig4cloud.pigx.common.core.exception.LeException;
import lombok.extern.slf4j.Slf4j;
import net.xnzn.core.common.export.util.I18nUtil;
import net.xnzn.core.common.mq.MqPayload;
import net.xnzn.core.common.utils.JacksonUtil;
import net.xnzn.framework.data.tenant.TenantContextHolder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.function.BiConsumer;
@Slf4j
@Service
public class XxxMqHandler {
@Lazy
@Autowired
private XxxService xxxService;
/**
* 统一处理调用(核心模板方法)
* 负责:反序列化、设置租户上下文、异常兜底
*/
public <T> void handleMessage(MqPayload<String> payload, Class<T> clz, BiConsumer<XxxMqHandler, T> handleFunc) {
I18nUtil.loadDefaultLocale();
try {
log.info("[Xxx消息]收到消息 {}", payload);
T payloadData = JacksonUtil.readValue(payload.getData(), clz);
if (payloadData != null) {
TenantContextHolder.setTenantId(payload.getTenantId()); // 设置租户上下文
handleFunc.accept(this, payloadData);
} else {
log.error("[Xxx消息]解析失败");
}
} catch (Exception e) {
log.error("[Xxx消息]处理异常", e);
}
}
/**
* 处理 xxx 事件
*/
public void handleXxx(XxxPO payload) {
try {
log.info("[Xxx事件]MQ消费:开始");
xxxService.processXxx(payload);
log.info("[Xxx事件]MQ消费:消息消费完成");
} catch (Exception e) {
log.error("[Xxx事件]MQ消费:处理异常", e);
}
}
}
常见场景
场景1:事务消息(下单后通知)
@Transactional(rollbackFor = Exception.class)
public void createOrder(OrderParam param) {
// 1. 保存订单
OrderInfo order = OrderInfo.newDefaultInstance();
order.setCanteenId(param.getCanteenId());
orderMapper.insert(order);
// 2. 事务提交后发送消息(保证一致性)
OrderPlacedPO po = new OrderPlacedPO();
po.setOrderInfo(order);
OrderMessageSend.sendOrderPlacedByTx(po); // 内部使用 sendByTxEnd
log.info("订单创建成功,orderId:{}", order.getId());
}
场景2:延迟消息(订单超时取消)
public static LocalDateTime sendOrderTimeout(String macOrderId, LeMqConstant.DelayDuration delayDuration) {
log.info("[订单MQv3]发送未支付订单异步支付超时通知");
OrderCancelPO po = new OrderCancelPO();
po.setMacOrderId(macOrderId);
po.setTenantId(TenantContextHolder.getTenantId());
po.setTraceId(LogUtil.getCurrentTraceId());
// 延迟发送
MqUtil.sendDelay(JacksonUtil.writeValueAsString(po), LeMqConstant.Topic.ORDER_V3_ASYNC_TIMEOUT, delayDuration);
// 返回预计触发时间
return LocalDateTime.now().plusSeconds(delayDuration.getMilliseconds() / 1000);
}
场景3:带 Redisson 分布式锁的 MQ 消费
public void orderAsyncSave(OrderSavePO payload) {
// 消费时加分布式锁(防止并发处理同一订单)
RLock lock = RedisUtil.getLock(OrderCacheConstants.orderCacheSaveLockKey(payload.getMacOrderId()));
lock.lock();
try {
log.info("[订单异步保存]MQ消费:开始");
doSaveOrder(payload);
log.info("[订单异步保存]MQ消费:消息消费完成");
} catch (Exception e) {
log.error("[订单异步保存]MQ消费:处理异常", e);
} finally {
// 安全释放锁
try {
if (lock.isHeldByCurrentThread() && lock.isLocked()) {
lock.unlock();
}
} catch (Exception e) {
log.error("解锁异常", e);
}
}
}
日志规范
// 发送端日志格式
log.info("[模块MQv3]发送xxx事件");
// 消费端日志格式
log.info("[xxx事件]MQ消费:开始");
log.info("[xxx事件]MQ消费:消息消费完成");
log.error("[xxx事件]MQ消费:处理异常", e);
常见错误
| 错误写法 | 正确写法 | 说明 |
|---|---|---|
MqUtil.send(dto, topic) 直接传对象 |
MqUtil.send(JacksonUtil.writeValueAsString(dto), topic) |
必须先序列化为 String |
@MqConsumer(topic = ...) |
@MQMessageListener(group, topic, tag) + implements MQListener<MqPayload<String>> |
实际框架注解不同 |
Duration.ofMinutes(30) |
LeMqConstant.DelayDuration.THIRTY_MINUTES |
延迟枚举不是 Duration |
忘记在 PO 中设置 traceId/tenantId |
po.setTraceId(LogUtil.getCurrentTraceId()) |
多租户追踪必须设置 |
消费方法直接 @Autowired 服务 |
@Autowired @Lazy |
避免循环依赖 |
在 MQ 发送类上加 @Component |
纯静态工具类(私有构造器,不注入 Spring) | 发送类是静态工具 |
Weekly Installs
3
Repository
xu-cell/ai-engi…ing-initGitHub Stars
8
First Seen
8 days ago
Security Audits
Installed on
github-copilot3
codex3
kimi-cli3
gemini-cli3
cursor3
amp3