Skip to content

Commit fdab6e4

Browse files
committed
redis 实现延时队列demo
1 parent bfc3fbe commit fdab6e4

File tree

11 files changed

+254
-6
lines changed

11 files changed

+254
-6
lines changed

pom.xml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
<dependency>
3838
<groupId>com.alibaba</groupId>
3939
<artifactId>fastjson</artifactId>
40-
<version>1.2.45</version>
4140
</dependency>
4241
<dependency>
4342
<groupId>org.springframework.boot</groupId>
@@ -56,6 +55,16 @@
5655
</dependency>
5756
</dependencies>
5857

58+
<dependencyManagement>
59+
<dependencies>
60+
<dependency>
61+
<groupId>com.alibaba</groupId>
62+
<artifactId>fastjson</artifactId>
63+
<version>1.2.45</version>
64+
</dependency>
65+
</dependencies>
66+
</dependencyManagement>
67+
5968
<build>
6069
<pluginManagement>
6170
<plugins>

spring-case/124-redis-sitecount/src/main/java/com/git/hui/boot/redis/site/core/VisitService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public class VisitService {
2323

2424
/**
2525
* 获取pv
26-
*
26+
* <p>
2727
* pv存储结果为hash,一个应用一个key; field 为uri; value为pv
2828
*
2929
* @return null表示首次有人访问;这个时候需要+1
@@ -84,6 +84,8 @@ public Long doInRedis(RedisConnection connection) throws DataAccessException {
8484

8585

8686
/**
87+
* fixme 这个算法有误, 如 192.1.2.3 何 192.3.0.1 两个ip访问了, 那么也会将 192.3.2.1 判定为访问过; 使用布隆过滤器或者hyperloglog来替换
88+
*
8789
* 判断ip今天是否访问过
8890
* 采用bitset来判断ip是否有访问,key由app与uri唯一确定
8991
*
Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
## 125-redis-distribute-lock
1+
## 126-redis-delay-list
22

3-
redis分布式锁
3+
基于redis zset实现的一个非重复的延迟队列
44

55
相关技术文档
6-
7-
- [【DB系列】Redis实现分布式锁(应用篇)](https://spring.hhui.top/spring-blog/2020/10/30/201030-SpringBoot%E7%B3%BB%E5%88%97%E6%95%99%E7%A8%8BRedis%E5%AE%9E%E7%8E%B0%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%EF%BC%88%E5%BA%94%E7%94%A8%E7%AF%87%EF%BC%89/)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
.*
2+
target/*
3+
*.iml
4+
!.gitignore
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>spring-case</artifactId>
7+
<groupId>com.git.hui.boot</groupId>
8+
<version>0.0.1-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>126-redis-delay-list</artifactId>
13+
14+
<dependencies>
15+
<dependency>
16+
<groupId>org.springframework.boot</groupId>
17+
<artifactId>spring-boot-starter-data-redis</artifactId>
18+
</dependency>
19+
20+
<dependency>
21+
<groupId>com.alibaba</groupId>
22+
<artifactId>fastjson</artifactId>
23+
</dependency>
24+
<dependency>
25+
<groupId>org.aspectj</groupId>
26+
<artifactId>aspectjweaver</artifactId>
27+
</dependency>
28+
</dependencies>
29+
</project>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
## 125-redis-distribute-lock
2+
3+
redis分布式锁
4+
5+
相关技术文档
6+
7+
- [【DB系列】Redis实现分布式锁(应用篇)](https://spring.hhui.top/spring-blog/2020/10/30/201030-SpringBoot%E7%B3%BB%E5%88%97%E6%95%99%E7%A8%8BRedis%E5%AE%9E%E7%8E%B0%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81%EF%BC%88%E5%BA%94%E7%94%A8%E7%AF%87%EF%BC%89/)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package com.git.hui.boot.redis.list;
2+
3+
import com.git.hui.boot.redis.list.component.Consumer;
4+
import com.git.hui.boot.redis.list.component.RedisDelayListWrapper;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.boot.SpringApplication;
7+
import org.springframework.boot.autoconfigure.SpringBootApplication;
8+
import org.springframework.scheduling.annotation.EnableScheduling;
9+
import org.springframework.web.bind.annotation.GetMapping;
10+
import org.springframework.web.bind.annotation.RestController;
11+
12+
import java.time.LocalDateTime;
13+
import java.util.Random;
14+
15+
/**
16+
* @author wuzebang
17+
* @date 2021/5/7
18+
*/
19+
@EnableScheduling
20+
@RestController
21+
@SpringBootApplication
22+
public class Application {
23+
private static final String TEST_DELAY_QUEUE = "test";
24+
private static final String DEMO_DELAY_QUEUE = "demo";
25+
@Autowired
26+
private RedisDelayListWrapper redisDelayListWrapper;
27+
28+
private Random random = new Random();
29+
30+
public static void main(String[] args) {
31+
SpringApplication.run(Application.class);
32+
}
33+
34+
@GetMapping(path = "publish")
35+
public String publish(String msg, Long delayTime) {
36+
if (delayTime == null) {
37+
delayTime = 10_000L;
38+
}
39+
40+
String queue = random.nextBoolean() ? TEST_DELAY_QUEUE : DEMO_DELAY_QUEUE;
41+
msg = queue + "#" + msg + "#" + (System.currentTimeMillis() + delayTime);
42+
redisDelayListWrapper.publish(queue, msg, delayTime);
43+
System.out.println("延时: " + delayTime + "ms后消费: " + msg + " now:" + LocalDateTime.now());
44+
return "success!";
45+
}
46+
47+
48+
@Consumer(topic = TEST_DELAY_QUEUE)
49+
public void consumer(RedisDelayListWrapper.DelayMsg delayMsg) {
50+
System.out.println("TEST消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
51+
}
52+
53+
@Consumer(topic = DEMO_DELAY_QUEUE)
54+
public void consumerDemo(RedisDelayListWrapper.DelayMsg delayMsg) {
55+
System.out.println("DEMO消费延时消息: " + delayMsg + " at:" + System.currentTimeMillis());
56+
}
57+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.git.hui.boot.redis.list.component;
2+
3+
import org.springframework.context.event.EventListener;
4+
5+
import java.lang.annotation.*;
6+
7+
/**
8+
* @author wuzebang
9+
* @date 2021/5/7
10+
*/
11+
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
12+
@Retention(RetentionPolicy.RUNTIME)
13+
@Documented
14+
@EventListener
15+
public @interface Consumer {
16+
String topic();
17+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package com.git.hui.boot.redis.list.component;
2+
3+
import org.aspectj.lang.ProceedingJoinPoint;
4+
import org.aspectj.lang.annotation.Around;
5+
import org.aspectj.lang.annotation.Aspect;
6+
import org.springframework.stereotype.Component;
7+
8+
/**
9+
* @author wuzebang
10+
* @date 2021/5/7
11+
*/
12+
@Aspect
13+
@Component
14+
public class ConsumerAspect {
15+
16+
@Around("@annotation(consumer)")
17+
public Object around(ProceedingJoinPoint joinPoint, Consumer consumer) throws Throwable {
18+
Object[] args = joinPoint.getArgs();
19+
boolean check = false;
20+
for (Object obj : args) {
21+
if (obj instanceof RedisDelayListWrapper.DelayMsg) {
22+
check = consumer.topic().equals(((RedisDelayListWrapper.DelayMsg) obj).getTopic());
23+
}
24+
}
25+
26+
if (!check) {
27+
// 不满足条件,直接忽略
28+
return null;
29+
}
30+
31+
// topic匹配成功,执行
32+
return joinPoint.proceed();
33+
}
34+
35+
}
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package com.git.hui.boot.redis.list.component;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import lombok.Getter;
5+
import lombok.ToString;
6+
import org.springframework.beans.BeansException;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.context.ApplicationContext;
9+
import org.springframework.context.ApplicationContextAware;
10+
import org.springframework.context.ApplicationEvent;
11+
import org.springframework.data.redis.core.StringRedisTemplate;
12+
import org.springframework.scheduling.annotation.Scheduled;
13+
import org.springframework.stereotype.Component;
14+
import org.springframework.util.CollectionUtils;
15+
16+
import java.util.Set;
17+
import java.util.concurrent.CopyOnWriteArraySet;
18+
19+
/**
20+
* @author wuzebang
21+
* @date 2021/5/7
22+
*/
23+
@Component
24+
public class RedisDelayListWrapper implements ApplicationContextAware {
25+
private static final Long DELETE_SUCCESS = 1L;
26+
@Autowired
27+
private StringRedisTemplate redisTemplate;
28+
29+
private Set<String> topic = new CopyOnWriteArraySet<>();
30+
31+
public void publish(String key, Object val, long delayTime) {
32+
topic.add(key);
33+
String strVal = val instanceof String ? (String) val : JSONObject.toJSONString(val);
34+
35+
redisTemplate.opsForZSet().add(key, strVal, System.currentTimeMillis() + delayTime);
36+
}
37+
38+
public String fetchOne(String key) {
39+
Set<String> sets = redisTemplate.opsForZSet().rangeByScore(key, 0, System.currentTimeMillis(), 0, 3);
40+
if (CollectionUtils.isEmpty(sets)) {
41+
return null;
42+
}
43+
44+
for (String val : sets) {
45+
if (DELETE_SUCCESS.equals(redisTemplate.opsForZSet().remove(key, val))) {
46+
// 删除成功,表示抢占到
47+
return val;
48+
}
49+
}
50+
return null;
51+
}
52+
53+
@Scheduled(fixedRate = 10_000)
54+
public void schedule() {
55+
for (String specialTopic : topic) {
56+
String cell = fetchOne(specialTopic);
57+
if (cell != null) {
58+
applicationContext.publishEvent(new DelayMsg(this, cell, specialTopic));
59+
}
60+
}
61+
}
62+
63+
private ApplicationContext applicationContext;
64+
65+
@Override
66+
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
67+
this.applicationContext = applicationContext;
68+
}
69+
70+
@ToString
71+
public static class DelayMsg extends ApplicationEvent {
72+
@Getter
73+
private String msg;
74+
@Getter
75+
private String topic;
76+
77+
/**
78+
* Create a new {@code ApplicationEvent}.
79+
*
80+
* @param source the object on which the event initially occurred or with
81+
* which the event is associated (never {@code null})
82+
*/
83+
public DelayMsg(Object source, String msg, String topic) {
84+
super(source);
85+
this.msg = msg;
86+
this.topic = topic;
87+
}
88+
}
89+
}

0 commit comments

Comments
 (0)