feat: 添加Redisson延时队列和模板方法模式文档
docs: 更新BigDecimal高精度计算文档分类
This commit is contained in:
@@ -4,7 +4,7 @@ published: 2025-07-26
|
||||
description: ''
|
||||
image: ''
|
||||
tags: ['Java', '面试','高精度计算','BigDecimal']
|
||||
category: 'Java > 面试'
|
||||
category: 'Java > 面试题'
|
||||
draft: false
|
||||
lang: ''
|
||||
---
|
||||
|
||||
173
src/content/posts/中间件/Redis/Redisson延时队列架构.md
Normal file
173
src/content/posts/中间件/Redis/Redisson延时队列架构.md
Normal file
@@ -0,0 +1,173 @@
|
||||
---
|
||||
title: Redisson延时队列架构
|
||||
published: 2025-07-27
|
||||
description: ''
|
||||
image: ''
|
||||
tags: ['Redis','Redisson','延时队列']
|
||||
category: '中间件 > Redis'
|
||||
draft: false
|
||||
lang: ''
|
||||
---
|
||||
|
||||
延时队列是一种特殊的消息队列,消息在发送后不会立即被消费,而是等待指定的时间后才被消费者处理。就像设置了一个"闹钟",到时间才响。
|
||||
|
||||
# 阻塞队列 RBlockingDeque - 阻塞双端队列
|
||||
特点:
|
||||
- 双端: 可以从两端插入和取出元素
|
||||
- 阻塞: 当队列为空的时候,取元素会阻塞等待
|
||||
- 线程安全: 多个线程可以安全操作
|
||||
|
||||
|
||||
```java
|
||||
// 特点:
|
||||
// - 双端:可以从两端插入和取出元素
|
||||
// - 阻塞:当队列为空时,取元素会阻塞等待
|
||||
// - 线程安全:多个线程可以安全操作
|
||||
|
||||
RBlockingDeque<String> deque = redissonClient.getBlockingDeque("myDeque");
|
||||
deque.offerFirst("头部元素");
|
||||
deque.offerLast("尾部元素");
|
||||
String element = deque.takeFirst(); // 阻塞获取
|
||||
|
||||
```
|
||||
|
||||
# RDelayedQueue - 延时队列
|
||||
|
||||
特点:
|
||||
- 自动延时:消息在指定时间后自动变为可消费状态
|
||||
- 精确控制:可以精确控制每个消息的延时时间
|
||||
- Redis实现:基于Redis的有序集合(ZSet)实现
|
||||
|
||||
```java
|
||||
|
||||
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(deque);
|
||||
delayedQueue.offer("消息内容", 30, TimeUnit.SECONDS); // 30秒后可消费
|
||||
```
|
||||
|
||||
## 完整实现示例
|
||||
|
||||
### 生产者端(消息发送)
|
||||
```java
|
||||
@Service
|
||||
public class DelayQueueProducer {
|
||||
|
||||
@Autowired
|
||||
private RedissonClient redissonClient;
|
||||
|
||||
public void sendDelayedMessage(String message, long delaySeconds) {
|
||||
try {
|
||||
// 创建队列
|
||||
RBlockingDeque<String> blockingDeque = redissonClient
|
||||
.getBlockingDeque("DELAY_QUEUE_EXAMPLE");
|
||||
RDelayedQueue<String> delayedQueue = redissonClient
|
||||
.getDelayedQueue(blockingDeque);
|
||||
|
||||
// 发送延时消息
|
||||
delayedQueue.offer(message, delaySeconds, TimeUnit.SECONDS);
|
||||
|
||||
System.out.println("发送延时消息: " + message +
|
||||
", 延时: " + delaySeconds + "秒");
|
||||
} catch (Exception e) {
|
||||
log.error("发送延时消息失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 消费者端(消息处理)
|
||||
```java
|
||||
@Component
|
||||
public class DelayQueueConsumer {
|
||||
|
||||
@Autowired
|
||||
private RedissonClient redissonClient;
|
||||
|
||||
@PostConstruct
|
||||
public void startConsumer() {
|
||||
// 启动独立线程消费延时消息
|
||||
new Thread(this::consumeMessages, "DelayQueueConsumer").start();
|
||||
}
|
||||
|
||||
private void consumeMessages() {
|
||||
try {
|
||||
RBlockingDeque<String> blockingDeque = redissonClient
|
||||
.getBlockingDeque("DELAY_QUEUE_EXAMPLE");
|
||||
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
// 阻塞获取消息(自动等待延时到期)
|
||||
String message = blockingDeque.take();
|
||||
System.out.println("消费延时消息: " + message);
|
||||
|
||||
// 处理业务逻辑
|
||||
processMessage(message);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
log.info("消费者线程被中断");
|
||||
} catch (Exception e) {
|
||||
log.error("消费消息异常", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void processMessage(String message) {
|
||||
// 实际的业务处理逻辑
|
||||
System.out.println("处理业务消息: " + message);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## 底层实现原理
|
||||
|
||||
### Redis数据结构使用
|
||||
```bash
|
||||
# Redisson使用以下数据结构:
|
||||
# 1. 有序集合(ZSet) - 存储延时消息和到期时间
|
||||
ZADD delay_queue 1640995200 "message1" # 到期时间戳作为score
|
||||
|
||||
# 2. 列表(List) - 存储已到期可消费的消息
|
||||
LPUSH ready_queue "message1"
|
||||
|
||||
# 3. 定时任务 - 定期检查到期消息
|
||||
# Redisson内部使用定时任务扫描ZSet,将到期消息移动到List
|
||||
```
|
||||
|
||||
### 延时检查机制
|
||||
```java
|
||||
// Redisson内部逻辑(简化版)
|
||||
public class DelayedQueueChecker {
|
||||
public void checkExpiredMessages() {
|
||||
long now = System.currentTimeMillis();
|
||||
|
||||
// 从有序集合中获取已到期的消息
|
||||
Set<String> expiredMessages = redisTemplate
|
||||
.opsForZSet()
|
||||
.rangeByScore("delay_queue", 0, now);
|
||||
|
||||
for (String message : expiredMessages) {
|
||||
// 移动到可消费队列
|
||||
redisTemplate.opsForList().leftPush("ready_queue", message);
|
||||
redisTemplate.opsForZSet().remove("delay_queue", message);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
## 使用场景
|
||||
|
||||
```java
|
||||
// 1. 订单超时处理
|
||||
public void handleOrderTimeout(String orderId) {
|
||||
delayedQueue.offer(orderId, 30, TimeUnit.MINUTES);
|
||||
}
|
||||
|
||||
// 2. 优惠券到期提醒
|
||||
public void couponExpireReminder(String couponId) {
|
||||
delayedQueue.offer(couponId, 24, TimeUnit.HOURS);
|
||||
}
|
||||
|
||||
// 3. 消息重试机制
|
||||
public void messageRetry(String messageId) {
|
||||
delayedQueue.offer(messageId, 5, TimeUnit.SECONDS); // 5秒后重试
|
||||
}
|
||||
```
|
||||
|
||||
264
src/content/posts/设计模式/模板方法模式.md
Normal file
264
src/content/posts/设计模式/模板方法模式.md
Normal file
@@ -0,0 +1,264 @@
|
||||
---
|
||||
title: 模板方法模式
|
||||
published: 2025-07-27
|
||||
description: ''
|
||||
image: ''
|
||||
tags: [策略模式, 设计模式]
|
||||
category: '设计模式'
|
||||
draft: false
|
||||
lang: ''
|
||||
---
|
||||
|
||||
# 介绍
|
||||

|
||||
对原理类图的说明:
|
||||
AbstractClass 抽象类, 类中实现了模板方法(template),定义了算法的骨架,具体子类需要去实现 其它的抽象方法
|
||||
ConcreteClass 具体类, 实现了抽象类中的抽象方法
|
||||
|
||||
也就是父类模板方法定义不变的流程,子类重写流程中的方法。
|
||||
|
||||

|
||||
|
||||
# ①、AbstractClass 抽象模板
|
||||
|
||||
## 一、基本方法
|
||||
|
||||
上面的 baseOperation() 或者 customOperation() 方法,也叫基本操作,是由子类实现的方法,并且在模板方法中被调用。
|
||||
|
||||
基本方法尽量设计为protected类型, 符合迪米特法则, 不需要暴露的属性或方法尽量不要设置为protected类型。 实现类若非必要, 尽量不要扩大父类中的访权限。
|
||||
## 二、模板方法
|
||||
|
||||
上面的 templateMethod() 方法,可以有一个或者几个,实现对基本方法的调度,完成固定的逻辑。
|
||||
|
||||
为了防止恶意操作,通常模板方法都加上 final 关键字,不允许覆写。
|
||||

|
||||
|
||||
# ②、ConcreteClass 具体模板
|
||||
|
||||
实现父类定义的一个或多个抽象方法,也就是父类定义的基本方法在子类中得以实现。
|
||||
# 应用
|
||||
这个设计模式我们可以拿来做mq的发送和封装
|
||||
|
||||
我们在抽象模板中实现通用的`sendMessage方法`,在具体模板中实现具体的消息构建和封装发送逻辑。
|
||||
|
||||
## 定义消息发送事件基础扩充属性实体
|
||||
```java
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public final class BaseSendExtendDTO {
|
||||
|
||||
/**
|
||||
* 事件名称
|
||||
*/
|
||||
private String eventName;
|
||||
|
||||
/**
|
||||
* 主题
|
||||
*/
|
||||
private String topic;
|
||||
|
||||
/**
|
||||
* 标签
|
||||
*/
|
||||
private String tag;
|
||||
|
||||
/**
|
||||
* 业务标识
|
||||
*/
|
||||
private String keys;
|
||||
|
||||
/**
|
||||
* 发送消息超时时间
|
||||
*/
|
||||
private Long sentTimeout;
|
||||
|
||||
/**
|
||||
* 具体延迟时间
|
||||
*/
|
||||
private Long delayTime;
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
|
||||
我们这样定义抽象模板: `AbstractCommonSendProduceTemplate`
|
||||
|
||||
|
||||
## 抽象模板
|
||||
```java
|
||||
/**
|
||||
* RocketMQ 抽象公共发送消息组件
|
||||
*/
|
||||
@RequiredArgsConstructor
|
||||
@Slf4j(topic = "CommonSendProduceTemplate")
|
||||
public abstract class AbstractCommonSendProduceTemplate<T> {
|
||||
|
||||
private final RocketMQTemplate rocketMQTemplate;
|
||||
|
||||
/**
|
||||
* 构建消息发送事件基础扩充属性实体
|
||||
*
|
||||
* @param messageSendEvent 消息发送事件
|
||||
* @return 扩充属性实体
|
||||
*/
|
||||
protected abstract BaseSendExtendDTO buildBaseSendExtendParam(T messageSendEvent);
|
||||
|
||||
/**
|
||||
* 构建消息基本参数,请求头、Keys...
|
||||
*
|
||||
* @param messageSendEvent 消息发送事件
|
||||
* @param requestParam 扩充属性实体
|
||||
* @return 消息基本参数
|
||||
*/
|
||||
protected abstract Message<?> buildMessage(T messageSendEvent, BaseSendExtendDTO requestParam);
|
||||
|
||||
/**
|
||||
* 消息事件通用发送
|
||||
*
|
||||
* @param messageSendEvent 消息发送事件
|
||||
* @return 消息发送返回结果
|
||||
*/
|
||||
public SendResult sendMessage(T messageSendEvent) {
|
||||
BaseSendExtendDTO baseSendExtendDTO = buildBaseSendExtendParam(messageSendEvent);
|
||||
SendResult sendResult;
|
||||
try {
|
||||
// 构建 Topic 目标落点 formats: `topicName:tags`
|
||||
StringBuilder destinationBuilder = StrUtil.builder().append(baseSendExtendDTO.getTopic());
|
||||
if (StrUtil.isNotBlank(baseSendExtendDTO.getTag())) {
|
||||
destinationBuilder.append(":").append(baseSendExtendDTO.getTag());
|
||||
}
|
||||
|
||||
// 延迟时间不为空,发送任意延迟消息,否则发送普通消息
|
||||
if (baseSendExtendDTO.getDelayTime() != null) {
|
||||
sendResult = rocketMQTemplate.syncSendDeliverTimeMills(
|
||||
destinationBuilder.toString(),
|
||||
buildMessage(messageSendEvent, baseSendExtendDTO),
|
||||
baseSendExtendDTO.getDelayTime()
|
||||
);
|
||||
} else {
|
||||
sendResult = rocketMQTemplate.syncSend(
|
||||
destinationBuilder.toString(),
|
||||
buildMessage(messageSendEvent, baseSendExtendDTO),
|
||||
baseSendExtendDTO.getSentTimeout()
|
||||
);
|
||||
}
|
||||
|
||||
log.info("[生产者] {} - 发送结果:{},消息ID:{},消息Keys:{}", baseSendExtendDTO.getEventName(), sendResult.getSendStatus(), sendResult.getMsgId(), baseSendExtendDTO.getKeys());
|
||||
} catch (Throwable ex) {
|
||||
log.error("[生产者] {} - 消息发送失败,消息体:{}", baseSendExtendDTO.getEventName(), JSON.toJSONString(messageSendEvent), ex);
|
||||
throw ex;
|
||||
}
|
||||
|
||||
return sendResult;
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
# 具体模板
|
||||
|
||||
这里举个例子,我们做一个短信通知的,定义MessageNotifyEvent
|
||||
|
||||
```java
|
||||
@Slf4j
|
||||
@Component
|
||||
public class SmsNotificationProducer extends AbstractCommonSendProduceTemplate<MessageNotifyEvent> {
|
||||
|
||||
private final ConfigurableEnvironment environment;
|
||||
|
||||
public SmsNotificationProducer(@Autowired RocketMQTemplate rocketMQTemplate, @Autowired ConfigurableEnvironment environment) {
|
||||
super(rocketMQTemplate);
|
||||
this.environment = environment;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected BaseSendExtendDTO buildBaseSendExtendParam(MessageNotifyEvent messageSendEvent) {
|
||||
return BaseSendExtendDTO.builder()
|
||||
.eventName("短信通知发送")
|
||||
.keys(String.valueOf(messageSendEvent.getNotificationId()))
|
||||
.topic(environment.resolvePlaceholders(MerchantAdminRocketMQConstant.SMS_NOTIFICATION_TOPIC_KEY))
|
||||
.sentTimeout(3000L)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Message<?> buildMessage(MessageNotifyEvent messageSendEvent, BaseSendExtendDTO requestParam) {
|
||||
String keys = StrUtil.isEmpty(requestParam.getKeys()) ? UUID.randomUUID().toString() : requestParam.getKeys();
|
||||
return MessageBuilder
|
||||
.withPayload(new MessageWrapper(keys, messageSendEvent))
|
||||
.setHeader(MessageConst.PROPERTY_KEYS, keys)
|
||||
.setHeader(MessageConst.PROPERTY_TAGS, requestParam.getTag())
|
||||
.build();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 补充说明
|
||||
|
||||
为了使上述示例能够完整运行,还需要定义以下类:
|
||||
|
||||
1. `MessageNotifyEvent` - 短信通知事件类:
|
||||
|
||||
```java
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
@Builder
|
||||
public class MessageNotifyEvent {
|
||||
private Long notificationId;
|
||||
private String phoneNumber;
|
||||
private String messageContent;
|
||||
private String sender;
|
||||
}
|
||||
```
|
||||
|
||||
2. `MessageWrapper` - 消息包装类:
|
||||
|
||||
```java
|
||||
@Data
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class MessageWrapper {
|
||||
private String key;
|
||||
private Object payload;
|
||||
|
||||
public MessageWrapper(String key, Object payload) {
|
||||
this.key = key;
|
||||
this.payload = payload;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
3. `SmsNotificationConsumer` - 短信通知消费者类:
|
||||
|
||||
```java
|
||||
@Slf4j
|
||||
@Component
|
||||
@RocketMQMessageListener(
|
||||
topic = MerchantAdminRocketMQConstant.SMS_NOTIFICATION_TOPIC_KEY,
|
||||
consumerGroup = MerchantAdminRocketMQConstant.SMS_NOTIFICATION_CONSUMER_GROUP
|
||||
)
|
||||
public class SmsNotificationConsumer implements RocketMQListener<MessageWrapper> {
|
||||
|
||||
@Override
|
||||
public void onMessage(MessageWrapper messageWrapper) {
|
||||
MessageNotifyEvent event = (MessageNotifyEvent) messageWrapper.getPayload();
|
||||
log.info("[短信消费者] 收到短信通知消息 - ID: {}, 手机号: {}, 内容: {}, 发送者: {}",
|
||||
event.getNotificationId(),
|
||||
event.getPhoneNumber(),
|
||||
event.getMessageContent(),
|
||||
event.getSender());
|
||||
|
||||
// 模拟发送短信
|
||||
System.out.println("[短信服务] 向 " + event.getPhoneNumber() + " 发送短信: " + event.getMessageContent());
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
通过以上完整的代码示例,我们可以看到模板方法模式在MQ消息发送场景中的应用:
|
||||
- `AbstractCommonSendProduceTemplate` 定义了消息发送的通用流程(模板方法)
|
||||
- `SmsNotificationProducer` 实现了具体的业务逻辑(构建参数和消息)
|
||||
- `SmsNotificationConsumer` 监听消息并处理
|
||||
- 这样既保证了消息发送流程的一致性,又允许不同业务场景自定义具体实现
|
||||
Reference in New Issue
Block a user