Redis项目实战

短信验证码

发送验证码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
    /**
* 发送手机验证码
*/
@PostMapping("code")
public Result sendCode(@RequestParam("phone") String phone, HttpSession session) {
// TODO 发送短信验证码并保存验证码
// 验证手机号是否合法
boolean isPhoneInvalid = RegexUtils.isPhoneInvalid(phone);
if(isPhoneInvalid){
return Result.fail("请输入正确手机号");
}

// 获取code,并保存session
String code = userService.getCode(phone);
session.setAttribute(phone,code);
// 模拟发送验证码
log.debug("验证码:{}",code);

return Result.ok();
}

@Override
public String getCode(String phone) {

/**
* @Description:
* @Params: [phone]
* 登录的手机号
* @Return java.lang.String
* 生成的验证码
*/
// 调用hutool的工具,随机生成6位验证码
String code = RandomUtil.randomNumbers(6);

return code;
}

校验验证码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 登录功能
* @param loginForm 登录参数,包含手机号、验证码;或者手机号、密码
*/
@PostMapping("/login")
public Result login(@RequestBody LoginFormDTO loginForm, HttpSession session){
// TODO 实现登录功能

return userService.login(loginForm,session);
}


public Result login(LoginFormDTO loginForm, HttpSession session) {

/**
* @Description:
* @Params: [loginForm, session]
* 前端传入的登录信息,还有服务端保存的session
* @Return java.lang.String
* 返回信息
*/
// 获取session并校验手机号
String phone = loginForm.getPhone();
// 获取session中的code
Object code = session.getAttribute(phone);
if(code == null){
return Result.fail("手机号不一致");
}else{
code = code.toString();
}
// 获取前端输入的验证码
String inCode = loginForm.getCode();

if(!inCode.equals(code)){
return Result.fail("验证码有误");
}
// 判断用户是否存在
LambdaQueryWrapper<User> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(User::getPhone,phone);
User one = iUserService.getOne(queryWrapper);

if(one == null){
// 不存在则创建用户
User user = new User();
user.setPhone(phone);
// 设置昵称
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
iUserService.save(user);

}else{
// do nothing
}
// 保存session
User u = iUserService.getOne(queryWrapper);
UserDTO userDTO = new UserDTO(u.getId(), u.getNickName(), u.getIcon());
session.setAttribute("user",userDTO);

return Result.ok();
}

登录拦截器

新建拦截器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

// 如果请求不是映射到方法直接通过
if(!(handler instanceof HandlerMethod)){
return true;
}
// 获取session
HttpSession session = request.getSession();
// 从session获得用户
Object user = session.getAttribute("user");

if(user == null){
// 不存在,直接拦截
throw new RuntimeException("请先登录!");
}else{
// 存在则保存到用户进程
UserHolder.saveUser((User)user);
return true;
}
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {

UserHolder.removeUser();

}
}

从session中获取登录信息,如果已登录则session中会存在user

配置拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Configuration
public class MvcConfig implements WebMvcConfigurer {

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
.excludePathPatterns(
"/user/code",
"/user/login",
"/blog/hot",
"/shop/**",
"shop-type/**",
"/upload/**",
"/voucher/**"
);
}
}

集群session共享问题

问题描述

当处在集群项目中时会出现下面这种情况:
当成功访问负责登录功能的服务器时,成功将session保存在该服务器。但在访问其他功能时,由于所处服务器不同,目标服务器并没有保存了我们登录信息的session,我们会被拦截器果断“踢”下线,但要让我们每个服务器都登录一次,这显然是不合理的。

这时候就需要使用radis作为一个公共的数据库

image-20240302232148785

redis代替session

将登录信息存到redis,这样请求每个服务器的资源时,只需要访问redis服务器根据key获取信息,就可以判断用户是否登录了

1
2
3
4
5
//        获取code,并保存session
String code = userService.getCode(phone);
// 保存session到redis,key是前缀加电话号码的形式,加前缀是为了防止不同业务之间可能造成覆盖,
// 过期时间还有前缀都是使用常量,很优雅
redisTemplate.opsForValue().set(LOGIN_CODE_KEY + phone,code,LOGIN_CODE_TTL, TimeUnit.MINUTES);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
    @Override
public Result login(LoginFormDTO loginForm, HttpSession session) {

/**
* @Description:
* @Params: [loginForm, session]
* 前端传入的登录信息,还有服务端保存的session
* @Return java.lang.String
* 返回信息
*/
//// 获取session并校验手机号
// String phone = loginForm.getPhone();
// // 获取session中的code
// Object code = session.getAttribute(phone);
// 获取手机号
String phone = loginForm.getPhone();
String code = redisTemplate.opsForValue().get(LOGIN_CODE_KEY + phone);

// 通过radis获取code
if(code == null){
return Result.fail("手机号不一致");
}
// 获取前端输入的验证码
String inCode = loginForm.getCode();

if(!inCode.equals(code)){
return Result.fail("验证码有误");
}
// 判断用户是否存在
LambdaQueryWrapper<User> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(User::getPhone,phone);
User one = iUserService.getOne(queryWrapper);

if(one == null){
// 不存在则创建用户
User user = new User();
user.setPhone(phone);
// 设置昵称
user.setNickName(USER_NICK_NAME_PREFIX + RandomUtil.randomString(10));
iUserService.save(user);

}else{
// do nothing
}
// 保存session
User u = iUserService.getOne(queryWrapper);

// UserDTO userDTO = new UserDTO(u.getId(), u.getNickName(), u.getIcon());
UserDTO userDTO = BeanUtil.copyProperties(u,UserDTO.class);
// session.setAttribute("user",userDTO);

// 生成随机token,作为令牌,还有redis中的key
String token = UUID.randomUUID().toString(true);
String tokenkey = LOGIN_USER_KEY + token; //也使用常量拼装的方式

// 将对象转为map格式
Map<String, Object> map = BeanUtil.beanToMap(userDTO);
// 将map中的值全变string,否则在使用stringredis存储时会报类型转换错误
map.forEach((key, value) -> {
if (null != value) map.put(key, String.valueOf(value));
});
// 存储到redis
redisTemplate.opsForHash().putAll(tokenkey,map);
// 设置有效期
redisTemplate.expire(tokenkey,LOGIN_USER_TTL, TimeUnit.MINUTES);

// token返回前端,作为令牌
return Result.ok(token);
}

这里有个坑,由于我们用的是StringRedisTemplate,在存储非string时会自动转换,这时可能会出现转换错误(Long无法转换为String),因此需要提前对map处理一下

1
2
3
4
//      将map中的值全变string,否则在使用stringredis存储时会报类型转换错误
map.forEach((key, value) -> {
if (null != value) map.put(key, String.valueOf(value));
});

拦截器登录验证

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//        获取token
String token = request.getHeader("authorization");

if(StrUtil.isBlank(token)){
// token不存在,说明未登录
throw new RuntimeException("未携带token");
}
// 基于token,获取redis中的数据
String key = LOGIN_USER_KEY + token;
Map<Object, Object> user = redisTemplate.opsForHash().entries(key);


// 转换map为对象,然后存储
UserDTO userDTO = BeanUtil.fillBeanWithMap(user, new UserDTO(), false);
// 存在则保存到用户进程
log.debug(userDTO.toString());
UserHolder.saveUser(userDTO);
// 重置有效期
redisTemplate.expire(key,36000L, TimeUnit.MINUTES);

优化拦截器

目前我们的拦截器只对非一些需要登录校验的网页进行拦截,同时延长他们token的有效时间,但是如果出现用户一直访问一些不需要校验的,那就一直没有进行拦截并延长有效期。其实延长token的有效期应该是用户每次访问都要延长,不论访问的是什么资源,因此我们需要再加一个拦截器,用来拦截全部请求,并在这个拦截器里面延长token有效期,而在login拦截器中只进行登录校验。

拦截器一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
public class RefreshTokenInterceptor implements HandlerInterceptor {

@Autowired
private StringRedisTemplate redisTemplate;
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
// 如果请求不是映射到方法直接通过
if(!(handler instanceof HandlerMethod)){
return true;
}
// 获取token
String token = request.getHeader("authorization");

if(StrUtil.isBlank(token)){
// token不存在,说明未登录
// throw new RuntimeException("未携带token");
return true;
}
// 基于token,获取redis中的数据
String key = LOGIN_USER_KEY + token;
Map<Object, Object> user = redisTemplate.opsForHash().entries(key);

if(user.isEmpty()){
// 不存在,直接拦截
// throw new RuntimeException("请先登录!");
return true;
}else{
// 转换map为对象,然后存储
UserDTO userDTO = BeanUtil.fillBeanWithMap(user, new UserDTO(), false);
// 存在则保存到用户进程
log.debug(userDTO.toString());
UserHolder.saveUser(userDTO);
// 重置有效期
redisTemplate.expire(key,36000L, TimeUnit.MINUTES);
return true;
}
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
UserHolder.removeUser();
}
}

拦截器二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Slf4j
public class LoginInterceptor implements HandlerInterceptor {


//
// public LoginInterceptor(StringRedisTemplate redisTemplate) {
// this.redisTemplate = redisTemplate;
// }

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

// 如果请求不是映射到方法直接通过
if(!(handler instanceof HandlerMethod)){
return true;
}

if(UserHolder.getUser() == null){
throw new RuntimeException("请先登录");
}else{
return true;
}
}

@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {

UserHolder.removeUser();

}
}

新加入的拦截器没有拦截功能,但通过逻辑判断出未登录,则不会向UserHandle中添加用户进程,这样如果用户要非法访问一些需要登录验证得地址,那么会在第二个拦截器被拦下,如果用户本身就要访问一些不需要验证的,那么第一个拦截器放行后,并不会进入第二个拦截器(因为第二个拦截器规定了拦截地址)

image-20240303135930012

加入redis缓存

手动实现缓存无非就是以下几点步骤:

  1. 根据key查询缓存
  2. 如果缓存中存在数据,则直接返回
  3. 不存在则在数据库中查询
  4. 查询不到就报错
  5. 查询到信息则保存到redis
  6. 返回数据

店铺信息缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
    public Shop queryById(Long id) {

String key = CACHE_SHOP_KEY + id;
// 查询店铺缓存
Map<Object, Object> map = redisTemplate.opsForHash().entries(key);

if(!map.isEmpty()){
// 命中则直接返回
Shop shop = BeanUtil.fillBeanWithMap(map, new Shop(), false);
return shop;
}
// 未命中,则查询数据库
Shop shop = getById(id);

if(shop == null){
// 数据在数据库中不存在,则返回错误
throw new RuntimeException("店铺信息有误");
}
// 将数据保存到缓存
Map<String, Object> mapshop = BeanUtil.beanToMap(shop);
mapshop.forEach((k,value) ->{
if (null != value) mapshop.put(k, String.valueOf(value));
});
redisTemplate.opsForHash().putAll(key,mapshop);
redisTemplate.expire(key,CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 返回
return shop;
}

实现缓存与数据库一致

简单实现:
在进行更新数据库操作时,在数据库更新后就删除缓存中的数据,这样在下次访问时会将新的数据放入缓存。

1
2
3
4
5
6
7
8
9
10
@Override
@Transactional
public void update(Shop shop) {

updateById(shop);
Long id = shop.getId();

redisTemplate.delete(CACHE_SHOP_KEY + id);

}

缓存热点问题

缓存穿透

当用户查询一个不会存在的数据,比如null,那该请求一定会绕过redis然后直接访问数据库,当这样的请求足够多时,可能会导致数据库崩溃。

解决方案一:缓存空对象

即当在数据库中查找一个为null的对象时,数据库返回一个空对象,然后将此空对象存入缓存,这样短期内再收到类似请求时,就会在缓存中命中。

优点:实现简单,维护方便

缺点:额外内存消耗,可能造成短期的不一致

解决方案二:布隆过滤

即在客户端和缓存之间加一层过滤器,用以拦截数据库中不存在的数据,它是基于某种哈希算法将数据库中的数据存在与否存储中。

优点:内存占用少,没有多余key

缺点:实现较为复杂,且存在误判

缓存雪崩

缓存雪崩指的是在短时间内,大量的缓存失效或者是redis宕机,导致大量的请求直接到达数据库,带来巨大压力

解决方案:

  1. 给不同的key添加不同的有效时间
  2. 使用redis集群
  3. 缓存业务降级策略:就是当发生事故时,可以限制请求数量或者干脆拒绝请求,以此来保护数据库。
  4. 多级缓存

缓存击穿

又称热点Key问题,是一个被高并发访问并且缓存重建业务较复杂的key失效了。

方案一:互斥锁

image-20240303190741313

即当发生缓存未命中的情况时,第一个发现的进程请求到互斥锁,然后进行缓存重建,在这个过程中如果有其他请求到来也想重建进程,由于互斥锁的原因,后来的进程只能进行循环等待。

优点:省内存,保证一致性,实现简单

缺点:线程等待,性能受影响,有死锁风险

方案二:逻辑过期

image-20240303191038492

首先这种方法不会出现缓存击穿的问题,因为缓存不会自动销毁,所以除了第一次访问,其他一定会命中。当请求时发现需要的数据已经过期,那么该进程会获得互斥锁并调用一个单独的进程来更新缓存,这样自己也无需等待,直接返回旧的数据。(类似于抢票时,虽然用户还能看到有票,但是实际上已经没票了,用户看到的是旧数据)

优点:性能好

缺点:不保证一致性,有额外内存消耗,实现较复杂

缓存击穿案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
    public Shop hcjc(Long id){

String key = CACHE_SHOP_KEY + id;
String lock_key = LOCK_SHOP_KEY + id;
boolean t = true;

try{
while(t){
// 死循环等待
// 查询店铺缓存
Map<Object, Object> map = redisTemplate.opsForHash().entries(key);

// if(map == null){
// throw new RuntimeException("命中空数据");
// }


if(!map.isEmpty()){
// 命中则直接返回
Shop shop = BeanUtil.fillBeanWithMap(map, new Shop(), false);
return shop;
}
// 未命中,则获取互斥锁

boolean tag = tryLock(lock_key);

if(!tag){
// 获取互斥锁失败,休眠一段时间,循环等待
Thread.sleep(50);
}else{
t = false;
}
}
// 二次验证
// 查询店铺缓存
Map<Object, Object> map = redisTemplate.opsForHash().entries(key);

// if(map.isEmpty()){
// throw new RuntimeException("命中空数据");
// }

if(!map.isEmpty()){
// 命中则直接返回
Shop shop = BeanUtil.fillBeanWithMap(map, new Shop(), false);
return shop;
}

Shop shop = getById(id);

if(shop == null){
// 数据在数据库中不存在,则返回错误
redisTemplate.opsForValue().set(key,null,CACHE_NULL_TTL,TimeUnit.MINUTES);

throw new RuntimeException("店铺信息有误");
}
// 将数据保存到缓存
Map<String, Object> mapshop = BeanUtil.beanToMap(shop);
mapshop.forEach((k,value) ->{
if (null != value) mapshop.put(k, String.valueOf(value));
});
redisTemplate.opsForHash().putAll(key,mapshop);
redisTemplate.expire(key,CACHE_SHOP_TTL, TimeUnit.MINUTES);
// 返回
return shop;
}catch (Exception e){
throw new RuntimeException(e);
}finally {
unLock(lock_key);
}

}

秒杀业务

唯一ID

Redis自增

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Component
public class RedisIdWorker {

@Resource
private StringRedisTemplate stringRedisTemplate;
// 开始时间戳
private static final long BEGIN_TIMESTAMP = 1640995200L;

// 序列号位数
private static final int COUNT_BITS = 32;



public long nextId(String keyPrefix){
// 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 生成序列号
// 获取当前日期,作为序列号的一部分
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 自增长
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 拼接返回
// 采用时间戳左移序列号位数,然后用序列号和时间戳或运算
return timestamp << COUNT_BITS | count;
}


}

实现基本下单功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
    @Resource
private ISeckillVoucherService iSeckillVoucherService;

@Resource
private RedisIdWorker redisIdWorker;

@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
/**
* @Description: 优惠卷秒杀
* @Params: [voucherId]
* @Return com.hmdp.dto.Result
*/

SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
if(voucher == null){
throw new RuntimeException("优惠券不存在!");
}

// 判断是否处于秒杀时间段
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀未开始");
}
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已结束");
}
// 判断是否售空
if(voucher.getStock() < 1){
return Result.fail("优惠劵不足");
}
// 优惠券数量减1
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).update();

if(!success){
// 减1失败
return Result.fail("优惠券不足");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 获得唯一ID
long order_id = redisIdWorker.nextId("order");
voucherOrder.setId(order_id); // 订单id来自idWorker生成唯一ID
voucherOrder.setVoucherId(voucherId); // 优惠券id来自传入的id
voucherOrder.setUserId(UserHolder.getUser().getId()); // userid来自用户进程
// 保存order
save(voucherOrder);

return Result.ok(order_id);

}

解决超卖问题

乐观锁:在更新时判断是否有线程也修改了数据

方法一:版本号法

给数据库中的数据添加版本字段,要修改数据库时,先查询版本号,然后在修改数据库时,对比此时的版本号和数据库里的版本号是否一致,一致再进行修改。

方法二:CAS法

利用库存数量作为版本号,只要在每次更新时判断stock是否大于0即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
    @Override
@Transactional
public Result seckillVoucher(Long voucherId) {
/**
* @Description: 优惠卷秒杀
* @Params: [voucherId]
* @Return com.hmdp.dto.Result
*/

SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
if(voucher == null){
throw new RuntimeException("优惠券不存在!");
}
Integer stock = voucher.getStock();
// 判断是否处于秒杀时间段
if(voucher.getBeginTime().isAfter(LocalDateTime.now())){
return Result.fail("秒杀未开始");
}
if(voucher.getEndTime().isBefore(LocalDateTime.now())){
return Result.fail("秒杀已结束");
}
// 判断是否售空
if(stock < 1){
return Result.fail("优惠劵不足");
}
// 优惠券数量减1
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();

if(!success){
// 减1失败
return Result.fail("优惠券不足");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 获得唯一ID
long order_id = redisIdWorker.nextId("order");
voucherOrder.setId(order_id); // 订单id来自idWorker生成唯一ID
voucherOrder.setVoucherId(voucherId); // 优惠券id来自传入的id
voucherOrder.setUserId(UserHolder.getUser().getId()); // userid来自用户进程
// 保存order
save(voucherOrder);

return Result.ok(order_id);

}

单机限制购买

实现一人一单,最简单的思路是每次下单之前先判断订单表里是否已经存在该用户的订单,但是这样无法解决并发问题。

乐观锁无法判断该在何时进行判断是否加锁,只能选择悲观锁,即在进行下单操作时,就根据登录用户的id进行加锁。

1
2
3
4
5
Long id = UserHolder.getUser().getId();
synchronized (id.toString().intern()){
IVoucherOrderService o = (IVoucherOrderService) AopContext.currentProxy();
return o.createVoucherOrder(voucherId);
}

synchronized (id.toString().intern()) 根据用户id作为加锁的参数,此处有细节,不能直接调用toString,因为toString的底层在每次访问id.toString都是创建一个新的String对象,而intern()方法会直接从常量池里取值,而不是创建一个新String。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    @Transactional
public Result createVoucherOrder(Long voucherId) {

Long id = UserHolder.getUser().getId();
int count = query().eq("user_id", id).eq("voucher_id", voucherId).count();
if(count > 0){
return Result.fail("请勿重复下单");
}
// 优惠券数量减1
boolean success = iSeckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock",0)
.update();

if(!success){
// 减1失败
return Result.fail("优惠券不足");
}
// 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 获得唯一ID
long order_id = redisIdWorker.nextId("order");
voucherOrder.setId(order_id); // 订单id来自idWorker生成唯一ID
voucherOrder.setVoucherId(voucherId); // 优惠券id来自传入的id
voucherOrder.setUserId(id); // userid来自用户进程
// 保存order
save(voucherOrder);

return Result.ok(order_id);
}

把下单业务封装成一个方法,方便进行加锁

1
2
3
4
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
1
@EnableAspectJAutoProxy(exposeProxy = true)

导入新的依赖,并在启动类默认启动

分布式锁

分布式锁满足在分布式系统或者集群系统下多进程可见且互斥的锁

主流做法是利用redis的setnx命令,setnx作用简单说就是当key不存在时才可以set成功,若key已存在则失败,这样在并发请求锁时,只有第一个请求的才能成功,释放锁时只需要删除这条key即可,安全性可以通过设置其过期时间来保证安全

setnx [key] [value]

set [key] [value] EX [seconds] NX //原子操作,设置锁的同时设置时间,防止因为宕机导致锁不释放

成功加锁返回1,失败返回0

分布式锁实现多线程限制购买

锁接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ILock {
/**
* @Description: 此乃获取分布式锁的接口
* @Params:
* @Return
*/

// 获取锁
boolean tryLock(long timeSec);

// 释放锁
void unLock();
}

分布式锁类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class SimpleRedisLock implements ILock {


private StringRedisTemplate stringRedisTemplate;
// 锁前缀
// 锁名
private static final String LOCK_PREFIX = "Lock:";
private String name;

public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

@Override
public boolean tryLock(long timeSec) {
// 获取当前进程id
long id = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + name, id + "", timeSec, TimeUnit.SECONDS);

return BooleanUtil.isTrue(success);
}

@Override
public void unLock() {
stringRedisTemplate.delete(LOCK_PREFIX + name);
}
}

修改原来的加锁逻辑,改用分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
//        获取锁
boolean isLock = lock.tryLock(120);
if(!isLock){
// 取锁失败
return Result.fail("服务器错误,请重试");
}
try{
IVoucherOrderService o = (IVoucherOrderService) AopContext.currentProxy();
return o.createVoucherOrder(voucherId);
}finally {
// 万一有异常,可以自动释放锁
lock.unLock();
}

从代码可以看出,如果获取锁失败,会返回服务器错误,只要返回这条信息,就可以确认我们的分布式锁功能是正常的。

分布式锁测试

首先在启动两个服务端,记得解决一下端口冲突

image-20240306112545976

利用apifox并发执行两条下单操作,用户的token保持一致代表同一用户

image-20240306112654582

image-20240306112713310

运行测试

image-20240306112915074

可以看到一个成功一个失败,失败请求的信息为

image-20240306113013190

成功请求自然就是订单号了

image-20240306113040465

数据库的订单也只增加了一条,测试成功

image-20240306113122958

解决锁误删问题

解决方法是每把锁里面的value都添加一段uuid,然后在删除前判断是否是自己的锁

1
2
private static final String ID_PREFIX = UUID.randomUUID().toString(true);
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(LOCK_PREFIX + name, id, timeSec, TimeUnit.SECONDS);
1
2
3
4
5
6
7
8
9
public void unLock() {
// 获取当前进程id
String id = ID_PREFIX + Thread.currentThread().getId();
String s = stringRedisTemplate.opsForValue().get(LOCK_PREFIX + name);
if(id.equals(s)){
stringRedisTemplate.delete(LOCK_PREFIX + name);
}

}

解决删锁操作非原子性问题(lua脚本)

之前的释放锁逻辑不安全,如果发生阻塞,可能会导致误删,因此最好是将解锁操作整合成一个原子操作

redis执行一个lua脚本

脚本内容:

1
2
3
4
5
6
7
8
-- 获取锁中的value
local id = redis.call('get',KEYS[1])
-- 比较该id和传入id是否一致,一致则删除
if(id == ARGV[1]) then
-- 释放锁
return redis.call('del',KEYS[1])
end
return 0

改进代码:

1
2
3
4
5
6
7
8
//    redis脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static{
// 静态代码块,随类一起加载执行,防止浪费IO资源
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("delLock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
    public void unLock() {

// 调用redis脚本
stringRedisTemplate.execute(
/**
* @Description: 执行redis脚本
* @Params: [脚本对象,keys(Collection),args]
* @Return void
*/
UNLOCK_SCRIPT,
Collections.singletonList(LOCK_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}

Redisson

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。

简而言之,redisson中提供了很多实用的方法,可以简化开发。

引入

pom:

1
2
3
4
5
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.19.1</version>
</dependency>

配置Redisson

1
2
3
4
5
6
7
8
9
10
11
12
13
//Redisson 配置
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient(){
// 配置
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
// 创建对象
return Redisson.create(config);
}
}

利用Redisson获取锁

1
2
3
        RLock lock = redissonClient.getLock("lock:order:" + id);
// 获取锁
boolean isLock = lock.tryLock();

redissonClient.getLock 里面是锁的名字

lock.tryLock() 参数分别可以是:锁的最大等待时间(如果获取不到锁,期间会重试,锁的最大持续时间(默认30),持续时间类型(默认秒))

方便快捷地实现线程安全

秒杀业务优化

目前我们的秒杀业务由于整体是串行模式,其中判断用户是否有下单资格这一部分其实耗时比较少,用时比较长的是之后的数据库操作,包括加锁。

我们可以思考,如果将后面的数据库操作拆分出来,单独利用一个线程来实现,这样当我们判断用户有下单资格时,直接“通知”该线程让他执行,这样性能就提高了

将优惠券信息存入redis

如果想进一步提高判断用户下单资格这一步的效率,那么也应该减少数据库搜索,如果是进行redis缓存操作,那么效率比起数据库,肯定是要高很多的

新增优惠券的同时,将优惠券信息加入缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
    @Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);

// 优惠劵信息保存到缓存,方便之后进行资格验证
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(),voucher.getStock().toString());
}

redis消息队列实现异步秒杀

基于Stream的消息队列,消费者组

创建消费者组:
image-20240306201108622
key:队列名称
groupName:消费者组名称
ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
MKSTREAM:队列不存在时自动创建队列

删除指定的消费者组

1
XGROUP DESTORY key groupName

给指定的消费者组添加消费者

1
XGROUP CREATECONSUMER key groupname consumername

删除消费者组中的指定消费者

1
XGROUP DELCONSUMER key groupname consumername

从消费者组读取消息:

1
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
  • group:消费组名称
  • consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
  • count:本次查询的最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无需手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:

“>”:从下一个未消费的消息开始
其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

代码实现

创建线程池,用于存放处理消息的线程

1
2
3
4
5
6
7
//    线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

消息处理线程,线程里死循环读取消息队列,因为会堵塞,所以不必担心资源消耗

逻辑步骤

  1. stringRedisTemplate.opsForStream().read 读取消息队列中的消息
  2. 判断消息是否为空,为空则说明处理完毕
  3. 不为空则获取value: read.get(0) .getValue();
  4. 调用BeanUtil.fillBeanWithMap将value封装为对象,此处为订单对象
  5. 订单对象保存数据库
  6. 调用opsForStream().acknowledge 确认消息,将消息从队列中取出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//    订单处理线程
private class VoucherOrderHandler implements Runnable{

private String queueName = "streams.orders";
@Override
public void run() {

while(true){
try{
// 获取消息队列中的订单信息
List<MapRecord<String, Object, Object>> read = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 判断是否获取成功
if(read == null || read.isEmpty()){
// 获取失败,说明没待处理消息了,继续下次循环
continue;
}

// 获取成功执行下单业务
MapRecord<String, Object, Object> record = read.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 下单
handleVoucherOrder(voucherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e){
throw new RuntimeException("处理错误订单");
}
}
}
}

这样就实现了异步更新

探店部分

探店笔记展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
    @Resource
private IUserService userService;

@Override
public Result queryBlogById(Long id) {
/**
* @Description: 根据id查询blog
* @Params: [id]
* @Return com.hmdp.dto.Result
*/

Blog blog = getById(id);
Long userId = blog.getUserId();
// 查询博文发布者的信息
User user = userService.getById(userId);

blog.setName(user.getNickName());
blog.setIcon(user.getIcon());

return Result.ok(blog);
}

在Blog另外封装了三个属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 用户图标
*/
@TableField(exist = false)
private String icon;
/**
* 用户姓名
*/
@TableField(exist = false)
private String name;
/**
* 是否点赞过了
*/
@TableField(exist = false)
private Boolean isLike;

用来在前端展示发布者的相关信息

点赞功能

基础的逻辑就是,如果用户没有点赞这篇文章,那就执行点赞业务,如果已经点过那就执行取消点赞,这两个业务时两个相反的过程。其中要实现的重点就是怎么判断用户已经点赞过了。

此处使用redis缓存,利用set结构,以blogid为key,用户id为value,这样每次只需要根据blogid判断用户是否在其中,即可判断是否点赞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
   public void likeBlog(Long id) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
Long userId = user.getId();
// 利用redis判断是否已点赞
String key = RedisConstants.BLOG_LIKED_KEY + id;
// Boolean b = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
// boolean isLiked = BooleanUtil.isTrue(b);
boolean isLiked = isLiked(id);

if(!isLiked){
// 未点赞,则更新数据库
boolean success = update().setSql("liked = liked + 1").eq("id", id).update();
if(success){
// 更新redis
stringRedisTemplate.opsForSet().add(key,userId.toString());
}
}else{
// 已点赞则取消点赞
// 更新数据库
boolean success = update().setSql("liked = liked - 1").eq("id",id).update();
if(success){
// 移除redis set
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}

}

}

@Override
public boolean isLiked(Long blogId) {

Long userId = UserHolder.getUser().getId();

Boolean b = stringRedisTemplate.opsForSet().isMember(RedisConstants.BLOG_LIKED_KEY + blogId, userId.toString());

return BooleanUtil.isTrue(b);
}

利用sortedSet存储点赞信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
    @Override
public void likeBlog(Long id) {
// 获取登录用户
UserDTO user = UserHolder.getUser();
Long userId = user.getId();
// 利用redis判断是否已点赞
String key = RedisConstants.BLOG_LIKED_KEY + id;
// Boolean b = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
// boolean isLiked = BooleanUtil.isTrue(b);
boolean isLiked = isLiked(id);

if(!isLiked){
// 未点赞,则更新数据库
boolean success = update().setSql("liked = liked + 1").eq("id", id).update();
if(success){
// 更新redis
stringRedisTemplate.opsForZSet().add(key,userId.toString(),System.currentTimeMillis());
}
}else{
// 已点赞则取消点赞
// 更新数据库
boolean success = update().setSql("liked = liked - 1").eq("id",id).update();
if(success){
// 移除redis set
stringRedisTemplate.opsForSet().remove(key,userId.toString());
}

}

}

@Override
public boolean isLiked(Long blogId) {

Long userId = UserHolder.getUser().getId();

Double score = stringRedisTemplate.opsForZSet().score(RedisConstants.BLOG_LIKED_KEY + blogId, userId.toString());

return score != null;
}

获取点赞列表

  1. 利用range取前5条用户id
  2. 把取出来的用户id的string集合变为long集合
  3. 调用listbyids获得用户集合
  4. 将用户集合转为userdto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
    @Override
public Result getLikeList(Long id) {

// 根据blogid获得点赞用户的id集合
String key = RedisConstants.BLOG_LIKED_KEY + id;
Set<String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);

if(range == null || range.isEmpty()){
return Result.ok(Collections.emptyList());
}

// 获取点赞用户前5
List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());
List<User> users = userService.listByIds(ids);
// 将用户集合转为userdto集合
List<UserDTO> list = users.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());

return Result.ok(list);
}

关注取关与判断是否关注

这一部分有点简单,不做解释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RestController
@RequestMapping("/follow")
public class FollowController {

@Resource
private IFollowService iFollowService;

@PutMapping("/{id}/{isFollow}")
public Result follow(@PathVariable("id")Long followed,@PathVariable("isFollow") boolean isFollow ){
iFollowService.follow(followed,isFollow);
return Result.ok();
}

@GetMapping("/or/not/{id}")
public Result isFollow(@PathVariable("id")Long writerId){
return Result.ok(iFollowService.isFollow(writerId));

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {

@Override
public void follow(Long followed, boolean isFollow) {
UserDTO user = UserHolder.getUser();
if(user == null){
throw new RuntimeException("请先登录!");
}
Long userId = user.getId();
// 关注
if(isFollow){
Follow follow = new Follow();
follow.setUserId(userId);
follow.setFollowUserId(followed);
// 保存
save(follow);
}else{
// 取关
QueryWrapper<Follow> followQueryWrapper = new QueryWrapper<>();
followQueryWrapper.eq("user_id",userId);
followQueryWrapper.eq("follow_user_id",followed);

remove(followQueryWrapper);
}
}

@Override
public boolean isFollow(Long writerId) {
UserDTO user = UserHolder.getUser();
if(user == null){
return false;
}
Long userId = user.getId();

QueryWrapper<Follow> followQueryWrapper = new QueryWrapper<>();
followQueryWrapper.eq("user_id",userId);
followQueryWrapper.eq("follow_user_id",writerId);

Follow one = getOne(followQueryWrapper);
return one != null;
}
}

获取他人主页信息

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/{id}")
public Result getOtherInfo(@PathVariable("id") Long otherId){
User user = userService.getById(otherId);
if(user == null){
return Result.fail("用户不存在");
}

UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);

return Result.ok(userDTO);
}