企业项目实战问题
# 全局ID生成策略
在应用默认ID生成策略 , 不难发现 , 他们的规律太明显 (容易泄露信息)
在分布式系统下全局ID需要满足以下特性 :
- 唯一性
- 高性能
- 安全性
- 递增性
- 高可用
自定义全局ID
利用Redis递增特性 , 自定义个全局ID , 二进制展示形式
64位 => 1(符号位) + 31(时间戳) +32(序列号)
代码示例 点击展开
@Component
public class RedisIdWorker {
private StringRedisTemplate redisTemplate;
public static final long BEGIN_TIMESTAMP = 1672531200;
public static final long COUNT_BITS = 32;
public RedisIdWorker(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public long nextId(String keyPrefix) {
// 1. 生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2. 获取序列号
// 2.1 日期
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2 自增长
long count = redisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3. 拼接
return timestamp << COUNT_BITS | count;
}
}
# 高并发锁机制
高并发场景我们也不难发现 , 就是在 双十一 , 春运抢票 , 等等... 在巨大流量冲击下 , 数据很有可能会被击穿导致负数等异常现象 , 因此系统需要运用一些技术手段抵抗这大量请求冲击
我们可以通过锁机制来控制数据击穿问题 , 我们分别了解两个锁机制 :
- 悲观锁 在数据进行操作时 , 会进行先加锁 , 执行完后释放锁 , 供给其他线程使用 , 在一个锁执行的过程 , 其他线程则需要等待获取锁
- 乐观锁 不会进行加锁 , 而是在更新数据时检查是否被其他线程修改过 , 从而判断是否进行更改
区别 :
悲观锁 | 乐观锁 | |
---|---|---|
操作 | 加锁 | 不加锁 |
检验 | 业务逻辑层 | SQL操作层 |
# 一人一单业务
有些情况难免会被黄牛买爆(炒卖) , 因此需要避免一人多买抢购现象 !
需求 : 秒杀业务 , 一人仅一单功能
假设 : 有一个业务抢优惠券 , 需要传入用户ID 实现抢券
示例 : 分别展示应用优化过程
正常业务
正常业务 点击展开
// 优惠券订单业务实现类
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// 优惠券信息Service
@Resource
private ISeckillVoucherService seckillVoucherService;
// Id生成
@Resource
private RedisIdWorker redisIdWorker;
@Override
@Transactional(rollbackFor = {RuntimeException.class})
public VoucherOrder seckillVoucher(Long voucherId) {
//1. 查询优惠券
SeckillVoucher sv = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = sv.getBeginTime();
LocalDateTime endTime = sv.getEndTime();
Integer stock = sv.getStock();
//2. 秒杀是否时间范围内
LocalDateTime now = LocalDateTime.now();
if (beginTime != null && endTime != null) {
if (now.isBefore(beginTime) && now.isAfter(endTime)) {
throw new BusinessException("未在活动时间内!");
}
}
//3. 库存收充足
if (stock < 1) throw new BusinessException("券已经抢没了!");
//4. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if (!success) throw new BusinessException("券已经抢没啦!");
//5. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// UserHolder当前请求用户信息
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return voucherOrder;
}
}
# 初步优化
思路 : 通过SQL查询中的订单是否存在 , 如果存在则不给予领取 , 否则通过
实现 :
初步优化 点击展开
// 优惠券订单业务实现类
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
// 优惠券信息Service
@Resource
private ISeckillVoucherService seckillVoucherService;
// Id生成
@Resource
private RedisIdWorker redisIdWorker;
@Override
@Transactional(rollbackFor = {RuntimeException.class})
public VoucherOrder seckillVoucher(Long voucherId) {
//1. 查询优惠券
SeckillVoucher sv = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = sv.getBeginTime();
LocalDateTime endTime = sv.getEndTime();
Integer stock = sv.getStock();
//2. 秒杀是否时间范围内
LocalDateTime now = LocalDateTime.now();
if (beginTime != null && endTime != null) {
if (now.isBefore(beginTime) && now.isAfter(endTime)) {
throw new BusinessException("未在活动时间内!");
}
}
//3. 库存收充足
if (stock < 1) throw new BusinessException("券已经抢没了!");
//4. 一人一单逻辑
Long userId = UserHolder.getUser().getId();
// 根据库 用户id和优惠券id 数据判断是否可进行领取优惠券
int count = this.query()
.eq("user_id", userId)
.eq("voucher_id", voucherId).count();
if (count > 0) throw new BusinessException("一人仅限购买一张券!");
//5. 扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if (!success) throw new BusinessException("券已经抢没啦!");
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return voucherOrder;
}
}
问题 :
也有难免的情况 , 防君子不防小人~
当一个小人在同一时间向发送多个抢券请求 , 卡出多张优惠券 . 就是通过卡 数据库同一时间内 , 多条线程查询为订单不存在的情况 !
在以上代码的第四步逻辑开始 , 以时间线分析业务过程 : (不同线程可以理解为多个请求)
时间线 | 线程A | 线程B |
---|---|---|
T1 | 查库 | |
T2 | 查库 | |
T3 | 没有领取记录 | |
T4 | 没有领取记录 | |
T5 | 扣减库存 | 扣减库存 |
# 加锁优化
思路 : 通过封装方法添加 synchronized
锁 , 根据 用户Id 加锁
实现 :
加锁优化 点击展开
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public VoucherOrder seckillVoucher(Long voucherId) {
//1. 查询优惠券
SeckillVoucher sv = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = sv.getBeginTime();
LocalDateTime endTime = sv.getEndTime();
Integer stock = sv.getStock();
//2. 秒杀是否时间范围内
LocalDateTime now = LocalDateTime.now();
if (beginTime != null && endTime != null) {
if (now.isBefore(beginTime) && now.isAfter(endTime)) {
throw new BusinessException("未在活动时间内!");
}
}
//3. 库存收充足
if (stock < 1) throw new BusinessException("券已经抢没了!");
return createVoucherOrder(voucherId);
}
@Transactional(rollbackFor = {RuntimeException.class})
public VoucherOrder createVoucherOrder(Long voucherId) {
// 通过ID判断锁指定对象 , intern()保证常量池中拿到的数据是一直的
Long loginUserId = UserHolder.getUser().getId();
synchronized (loginUserId.toString().intern()) {
//4. 一人一单逻辑
Long userId = UserHolder.getUser().getId();
// 根据库 用户id和优惠券id 数据判断是否可进行领取优惠券
int count = this.query()
.eq("user_id", userId)
.eq("voucher_id", voucherId).count();
if (count > 0) throw new BusinessException("一人仅限购买一张券!");
//5. 扣减库存 (需要在数据库层进行--操作,高并发可能会错乱)
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if (!success) throw new BusinessException("券已经抢没啦!");
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(loginUserId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return voucherOrder;
}
}
}
问题 :
一人多买的问题得到根治 , 但我们可以发现 事务的提交和锁的释放 , 他们并非是我们理想中的 先提交事务后释放锁 的一个过程 . synchronized
锁在方法中就已经释放 , 而事务侦测的是整个方法 , 因此 其他线程很有可能会卡在 以上代码的33行中 Long loginUserId = UserHolder.getUser().getId();
, 仍然存在隐患
# 事务优化
思路 : 将 synchronized
锁 提取到封装的方法外 , 通过代理对象进行对其方法调用 , 从而满足 先提交事务后释放锁的目的 !
实现 :
实现前提 点击展开
代理依赖
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
启动类 添加注解 暴露代理
@EnableAspectJAutoProxy(exposeProxy = true)
业务接口
目的 让事务生效 . 事务是基于接口实现生效的
public interface IVoucherOrderService extends IService<VoucherOrder> {
VoucherOrder seckillVoucher(Long voucherId);
VoucherOrder createVoucherOrder(Long voucherId);
}
业务实现类 点击展开
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Override
public VoucherOrder seckillVoucher(Long voucherId) {
//1. 查询优惠券
SeckillVoucher sv = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = sv.getBeginTime();
LocalDateTime endTime = sv.getEndTime();
Integer stock = sv.getStock();
//2. 秒杀是否时间范围内
LocalDateTime now = LocalDateTime.now();
if (beginTime != null && endTime != null) {
if (now.isBefore(beginTime) && now.isAfter(endTime)) {
throw new BusinessException("未在活动时间内!");
}
}
//3. 库存收充足
if (stock < 1) throw new BusinessException("券已经抢没了!");
// 通过ID判断锁指定对象 , intern()保证常量池中拿到的数据是一直的
Long loginUserId = UserHolder.getUser().getId();
synchronized (loginUserId.toString().intern()) {
// 获取 启动类暴露 的代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
// 基于接口调用(保证实物有效性)
return proxy.createVoucherOrder(voucherId);
}
}
@Override
@Transactional(rollbackFor = {RuntimeException.class})
public VoucherOrder createVoucherOrder(Long voucherId) {
//4. 一人一单逻辑
Long userId = UserHolder.getUser().getId();
// 根据库 用户id和优惠券id 数据判断是否可进行领取优惠券
int count = this.query()
.eq("user_id", userId)
.eq("voucher_id", voucherId).count();
if (count > 0) throw new BusinessException("一人仅限购买一张券!");
//5. 扣减库存 (需要在数据库层进行--操作,高并发可能会错乱)
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0)
.update();
if (!success) throw new BusinessException("券已经抢没啦!");
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return voucherOrder;
}
}
问题 :
我们已经在单机中解决了一人一单的线程安全问题 . 那我们进行将视角放大 , 可知真实场景 , 是以多台服务器集群的形式运作 . 那么在这一情况下可能会出现 , 一个人多发请求分布到多台主机上实行业务 , 结果仍然会出现一人多买的现象 , 从而违背需求!
锁失效的原因
多台主机不能同步JVM中的常量池数据 , 每台主机中代码虽然是一样的 , 但内部的常量池存储的数据不一致 , 因此锁仅限于在当前主机的并发请求有效 !
# 集群优化
思路 : 在JVM外部实现锁机制 , 通过Redis中的setNX命令加锁实现 , 并且设置过期时间(防止宕机强锁)
实现 :
集群优化 点击展开
分布式锁控制类
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate redisTemplate;
public static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
// 线程名称
String threadInfo = ID_PREFIX + Thread.currentThread().getId();
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadInfo, timeoutSec, TimeUnit.SECONDS);
// 拆箱 (防止null情况)
return BooleanUtil.isTrue(success);
}
@Override
public void unlock() {
// 线程名称
String key = KEY_PREFIX + name;
String threadInfo = ID_PREFIX + Thread.currentThread().getId();
String val = redisTemplate.opsForValue().get(key);
// 只能删除自己线程的锁(其他线程不能删除)
if (threadInfo.equals(val)) redisTemplate.delete(key);
}
}
业务分布式优化
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource(name = "stringRedisTemplate")
private StringRedisTemplate redisTemplate;
@Override
public VoucherOrder seckillVoucher(Long voucherId) {
//1. 查询优惠券
SeckillVoucher sv = seckillVoucherService.getById(voucherId);
LocalDateTime beginTime = sv.getBeginTime();
LocalDateTime endTime = sv.getEndTime();
Integer stock = sv.getStock();
//2. 秒杀是否时间范围内
LocalDateTime now = LocalDateTime.now();
if (beginTime != null && endTime != null) {
if (now.isBefore(beginTime) && now.isAfter(endTime)) {
throw new BusinessException("未在活动时间内!");
}
}
//3. 库存收充足
if (stock < 1) throw new BusinessException("券已经抢没了!");
//分布式锁
Long loginUserId = UserHolder.getUser().getId();
SimpleRedisLock lock = new SimpleRedisLock("voucherOrder:" + loginUserId, redisTemplate);
// 10min
boolean isLock = lock.tryLock(60 * 10);
if (!isLock) throw new BusinessException("一人仅限购买一张券!!");
try {
// 获取 启动类暴露 的代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
// 基于接口调用(保证实物有效性)
return proxy.createVoucherOrder(voucherId);
} finally {
lock.unlock();
}
}
@Override
@Transactional(rollbackFor = {RuntimeException.class})
public VoucherOrder createVoucherOrder(Long voucherId) {
//4. 一人一单逻辑
Long userId = UserHolder.getUser().getId();
// 根据库 用户id和优惠券id 数据判断是否可进行领取优惠券
int count = this.query()
.eq("user_id", userId)
.eq("voucher_id", voucherId).count();
if (count > 0) throw new BusinessException("一人仅限购买一张券!");
//5. 扣减库存 (需要在数据库层进行--操作,高并发可能会错乱)
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock", 0)
.update();
if (!success) throw new BusinessException("券已经抢没啦!");
//6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(userId);
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return voucherOrder;
}
}
以上方案虽说是完美中的完美 , 曾有很多企业优化至此 .
问题 :
举个较为极端的例子 , 在释放锁时 , 当判断if语句后出现阻塞的情况(true后阻塞) , 且阻塞时间远远超过了锁过期时间(锁过期) , 随后其他线程加锁进来时 , 可能会出现 当前线程阻塞竟然通了 , 那么即将 释放锁的操作 可对任意线程的锁进行释放 , 因此仍然会存在安全隐患
阻塞缘故
当JVM执行 Full CG
时 , 会阻塞所有代码(不会影响其他JVM) , 是JVM本身引起的阻塞
# JVM优化
思路 : 保证if语句和释放锁 的原子性 , 需要抛开 JVM层面 解决问题 , 往上层Redis的Lua脚本实现 , 一个脚本执行多条命令 , 并且保证这些命令的原子性
Lua脚本菜鸟教程 : https://www.runoob.com/lua/lua-tutorial.html (opens new window)
实现 :
JVM优化 点击展开
在资源目录创建 unlock.lua
脚本文件
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
分布式锁控制类
public class SimpleRedisLock implements ILock {
private StringRedisTemplate redisTemplate;
private String name;
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
public static final String KEY_PREFIX = "lock:";
// 提前加载lua文件 , 防止懒加载现象
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 指定资源路径
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 指定脚本返回类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
public SimpleRedisLock(String name, StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
this.name = name;
}
@Override
public boolean tryLock(long timeoutSec) {
// 线程名称
String threadInfo = ID_PREFIX + Thread.currentThread().getId();
Boolean success = redisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadInfo, timeoutSec, TimeUnit.SECONDS);
// 拆箱 (防止null情况)
return BooleanUtil.isTrue(success);
}
@Override
public void unlock() {
String key = KEY_PREFIX + name;
String threadInfo = ID_PREFIX + Thread.currentThread().getId();
// Lua脚本 (保证 if和释放锁 步骤的原子性)
redisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(key),
threadInfo
);
}
}
# 消息队列消锋
消息队列 请求高峰期 必能少的功能 , 一般高峰时期下 写数据 必然会涉及到数据库操作 , 那么可通过消息队列进行缓存处理的数据 , 将 高峰值的请求量 降低到 数据库能够承受的量 , 以免 库宕机现象 .
需求 : 解决高峰期 , 高请求 , 快响应 , 并且保证主机不能宕机
思路 : 通过Redis存 优惠券票数 与 Set存用户id(防止重复) , 通过lua脚本执行完后 即将 处理的数据放入消息队列中 , 异步处理这些数据
示例 :
# JVM实现
通过 BlockingQueue
类 实现消息队列功能
lua脚本
实现 :
JVM实现 点击展开
seckill.lua 脚本
---1. 参数
--- keys : []
--- argv: [优惠券id,用户id,订单id]
local voucherId = ARGV[1]
local userId = ARGV[2]
--- 2. Redis key标识
--- 库存key & 订单key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
--- 3. 校验
--- 3.1 判断库存信息
local n = tonumber(redis.call('get', stockKey))
--- 参数无效 || 库存不足
if (n == nil or n <= 0) then
return 1
end
--- 3.2 判断用户是否已经下单了(重复下单)
--- 用户id是否存在于set集合
if (redis.call('sismember', orderKey, userId) == 1) then
return 2
end
--- 4. 业务执行
--- 4.1 库存-1
redis.call("incrby", stockKey, -1)
--- 4.2 下单
redis.call("sadd", orderKey, userId)
return 0
业务代码
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private RedisIdWorker redisIdWorker;
@Resource(name = "stringRedisTemplate")
private StringRedisTemplate redisTemplate;
// 异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
// 消息队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
// 代理对象(内部线程使用)
IVoucherOrderService proxy;
// 类初始化运行线程
@PostConstruct
public void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 异步线程 执行SQL
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
try {
// 循环执行调用
while (true) {
// 防止过早执行无效代理类
if (proxy == null) continue;
VoucherOrder take = orderTasks.take();
handleVoucherOrder(take);
}
} catch (InterruptedException e) {
log.error("异步执行执行异常 => " + e);
}
}
// 创建订单
private void handleVoucherOrder(VoucherOrder voucherOrder) {
proxy.createVoucherOrder(voucherOrder);
}
}
// 加载lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 指定资源路径
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
// 指定脚本返回类型
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Long seckillVoucher(Long voucherId) {
// 1. 执行lua脚本 (判断是否有资格获取)
Long loginUserId = UserHolder.getUser().getId();
Long execute = redisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(),
loginUserId.toString()
);
// 2. 判断lua脚本结果
if (execute == null) throw new BusinessException("券已经抢没了");
int r = execute.intValue();
switch (r) {
case 0:
break;
case 1:
throw new BusinessException("券已经抢没了");
default:
throw new BusinessException("一人仅限购买一张券!!");
}
VoucherOrder voucherOrder = new VoucherOrder();
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(loginUserId);
voucherOrder.setVoucherId(voucherId);
// 代理本身对象(提供异步线程使用)
if (proxy == null)
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 添加 消息队列
orderTasks.add(voucherOrder);
return orderId;
}
// 创建业务执行的方法
private void createVoucherOrder(VoucherOrder voucherOrder) {
boolean save = this.save(voucherOrder);
if (!save) {
log.error("创建异常 => ({}:{})",
"优惠券" + voucherOrder.getVoucherId(), "用户" + voucherOrder.getUserId());
}
}
}
问题 : 当执行业务时JVM宕机了 , 那么内存中的消息队列的数据就会丢失 !!
# Redis消息队列优化
思路 : 通过Redis Steam
实现 消息队列功能 (传送门了解) , 实现耦合实现消息队列拆分
实现 :
Redis消息队列优化 点击展开
seckill.lua 脚本
---1. 参数
--- keys : []
--- argv: [优惠券id,用户id,订单id]
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
--- 2. Redis key标识
--- 库存key & 订单key
local stockKey = 'seckill:stock:' .. voucherId
local orderKey = 'seckill:order:' .. voucherId
--- 3. 校验
--- 3.1 判断库存信息
local n = tonumber(redis.call('get', stockKey))
--- 参数无效 || 库存不足
if (n == nil or n <= 0) then
return 1
end
--- 3.2 判断用户是否已经下单了(重复下单)
--- 用户id是否存在于set集合
if (redis.call('sismember', orderKey, userId) == 1) then
return 2
end
--- 4. 业务执行
--- 4.1 库存-1
redis.call("incrby", stockKey, -1)
--- 4.2 下单
redis.call("sadd", orderKey, userId)
--- 4.3 发送消息到队列中 , 等待处理数据
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId)
return 0
业务代码
@Service
@Slf4j
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource(name = "stringRedisTemplate")
private StringRedisTemplate redisTemplate;
@Resource
private RedissonClient redissonClient;
// 异步处理线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
// 消息队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
// 代理对象(内部线程使用)
IVoucherOrderService proxy;
@PostConstruct
public void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
// 异步线程 执行SQL
private class VoucherOrderHandler implements Runnable {
String queueName = "stream.orders";
@Override
public void run() {
try {
while (true) {
if (proxy == null) continue;
// 获取Redis队列消息
// 命令结构 : XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order >
List<MapRecord<String, Object, Object>> recordList = redisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty()
.count(1)
.block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
if (recordList == null || recordList.isEmpty()) continue;
// 取出消息
MapRecord<String, Object, Object> record = recordList.get(0);
Map<Object, Object> map = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
//VoucherOrder take = orderTasks.take();
// 执行业务
handleVoucherOrder(voucherOrder);
// ack确认消息
// 命令结构 : SACK stream.orders g1 id
redisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
}
} catch (Exception e) {
log.error("异步订单执行异常 => " + e);
handlePendingList();
}
}
// 异常消息处理
private void handlePendingList() {
try {
while (true) {
// 获取Redis队列消息
// 命令结构 : XREADGROUP GROUP g1 c1 COUNT 1 STREAMS streams.order 0
List<MapRecord<String, Object, Object>> recordList = redisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 无异常跳出
if (recordList == null || list().isEmpty()) break;
// 取出消息
MapRecord<String, Object, Object> record = recordList.get(0);
Map<Object, Object> map = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(map, new VoucherOrder(), true);
// 执行业务
handleVoucherOrder(voucherOrder);
// ack确认消息
// 命令结构 : SACK stream.orders g1 id
redisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
}
} catch (Exception e){
this.handlePendingList();
}
}
// 创建订单
private void handleVoucherOrder(VoucherOrder voucherOrder) {
proxy.createVoucherOrder(voucherOrder);
//5. 扣减库存 (需要在数据库层进行--操作,高并发可能会错乱)
seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0)
.update();
}
}
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 指定资源路径
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
// 指定脚本返回类型
SECKILL_SCRIPT.setResultType(Long.class);
}
@Override
public Long seckillVoucher(Long voucherId) {
// 1. 执行lua脚本 (判断是否有资格获取)
Long loginUserId = UserHolder.getUser().getId();
// 订单id
Long orderId = redisIdWorker.nextId("order");
Long execute = redisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(),
voucherId.toString(),
loginUserId.toString(),
orderId.toString()
);
// 2. 判断lua脚本结果
if (execute == null) throw new BusinessException("券已经抢没了");
int r = execute.intValue();
switch (r) {
case 0:
break;
case 1:
throw new BusinessException("券已经抢没了");
default:
throw new BusinessException("一人仅限购买一张券!!");
}
VoucherOrder voucherOrder = new VoucherOrder();
voucherOrder.setId(orderId);
voucherOrder.setUserId(loginUserId);
voucherOrder.setVoucherId(voucherId);
orderTasks.add(voucherOrder);
// 代理本身对象(提供异步线程使用)
if (proxy == null)
proxy = (IVoucherOrderService) AopContext.currentProxy();
return orderId;
}
@Override
@Transactional(rollbackFor = {RuntimeException.class})
public void createVoucherOrder(VoucherOrder voucherOrder) {
boolean save = this.save(voucherOrder);
if (!save) {
log.error("创建异常 => ({}:{})",
"优惠券" + voucherOrder.getVoucherId(), "用户" + voucherOrder.getUserId());
}
}
}
# 点赞功能
需求 : 一人只能点赞一次 , 再次点击取消
思路 : 通过Redis中的 Set
实现存id判断用户 点赞/取消点赞 , 从而实现点赞功能
假设 : 数据库有个字段记录点赞总数
实现 :
点赞功能 点击展开
点赞业务
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {
@Resource
private IUserService userService;
@Resource(name = "stringRedisTemplate")
private StringRedisTemplate redisTemplate;
@Override
public boolean likeBlog(Long id) {
// 获取当前用户
Long loginUserId = UserHolder.getUser().getId();
String key = BLOG_LIKED_KEY + id;
// 查指定 铺店Id 是否包含指定 用户Id
Boolean isMember = redisTemplate.opsForSet().isMember(key, loginUserId.toString());
boolean success;
// 点赞/取消点赞 功能 (包含取消/不包含点赞)
if (BooleanUtil.isTrue(isMember)) {
// sql逻辑去除
success = this.update().setSql("liked = liked - 1").eq("id", id).update();
// redis 去除
if (success) redisTemplate.opsForSet().remove(key, loginUserId.toString());
}else {
// sql逻辑添加
success = this.update().setSql("liked = liked + 1").eq("id", id).update();
// redis 添加
if (success) redisTemplate.opsForSet().add(key, loginUserId.toString());
}
return success;
}
// 判断用户是否已点赞
// 无返回值 , 直接在blog参数设置是否有点赞功能即可
private void isBlogLiked(Blog blog) {
// 获取当前用户
UserDTO user = UserHolder.getUser();
// 未登录(无需判断点赞状态)
if (user == null) return;
Long loginUserId = user.getId();
String key = BLOG_LIKED_KEY + blog.getId();
// 查指定 铺店Id 是否包含指定 用户Id
Boolean isMember = redisTemplate.opsForSet().isMember(key, loginUserId.toString());
blog.setIsLike(isMember);
}
}
# 排行榜优化
需求 : 获取 点赞排行榜前Top5的消息
思路 : 通过Redis中 SortedSet
实现 排行应用 , 缓存提取id后查库 , 并且采用拼接 ORDER BY FIELD
SQL约束 , 保证查询的排行榜顺序!
实现 :
排行榜优化 点击展开
业务代码
@Override
public List<UserDTO> queryBlogLikes(Long id) {
String key = BLOG_LIKED_KEY + id;
// 查top5
Set<String> top5 = redisTemplate.opsForZSet().range(key, 0, 4);
if (top5 == null) throw new BusinessException("数据异常");
String idStr = StrUtil.join(",", top5);
// 保证顺序的SQL查询 : WHERE id IN (5, 1) ORDER BY FIELD(id, 5, 1)
List<User> users = userService.query()
.in("id", top5)
.last("ORDER BY FIELD(id, "+idStr+")")
.list();
// 类型转换
return users.stream().map(user -> BeanUtil.copyProperties(user, UserDTO.class)).collect(Collectors.toList());
}
# 关注推拉
网上冲浪经常遇见的场景 , 用户关注的对象/朋友圈添加的好友 推送的消息 , 都可以收到消息...
实现方案有三种模式 :
- 拉模式 (读扩散) 消费者每次上线拉取生产者一次消息 , 退出舍弃
- 推模式 (写扩散) 生产者每次发布消息都会往消费者发送一次消息 ,
- 推拉结合 (读写混合) 对生产者和消费者进行划分 活跃/普通 两种类型 , 根据类型选择 推/拉 模式
模式区别 :
拉模式 | 推模式 | 推拉结合 | |
---|---|---|---|
写性能 | 低 | 高 | 中 |
读性能 | 高 | 低 | 中 |
资源占用 | 中 | 高 | 中 |
复杂度 | 中 | 低 | 高 |
应用场景 | 少应用 | 千万用户量一下 | 千万用户量以上 |
# 坐标附近
不难理解 , 就是手机定位
思路 : 通过手机定位获取经纬度信息 , 存储采用Redis的GEO数据结构 , 实现距离附近等功能
# 签到功能
问题 : 每天签到作为一次记录 , 如果在千万用户的场景下 , 可能会出现内存爆炸 !!
思路 : Redis提供的Bit位应用 , 通过记录 0未签到 ; 1签到 . 了解Redis的 Bit
提示
Redis中的bit 能够存储31位bit位 , 可以联想到每个月最多有31天 , 通过当中的31天和签到功能联系起来~
优缺点 :
- 将天数压缩为bit存储 , 节省资源损耗
- 支持千万用户数据签到应用
- 较高的复杂度 (bit位运算操作)
场景 :
- 获取用户最近连续的签到数
连续签到数 点击展开
业务代码
@Override
public int signCount() {
// 1. 获取当前用户
Long loginUserId = UserHolder.getUser().getId();
// 2. 获取当前日期
LocalDateTime now = LocalDateTime.now();
String keySuffix = now.format(DateTimeFormatter.ofPattern("yyyyMM"));
int dayOfMonth = now.getDayOfMonth() - 1;
// 3. 拼接key
String key = USER_SIGN_KEY + loginUserId + ":" + keySuffix;
// 4. 获取本月的十进制数据
// Redis命令 : BITFIELD key GET u[dayOfMonth] 0
List<Long> longs = redisTemplate.opsForValue().bitField(key, BitFieldSubCommands.create().get(BitFieldSubCommands.BitFieldType.unsigned(dayOfMonth)).valueAt(0)
);
if (longs == null || longs.isEmpty()) return 0;
Long num = longs.get(0);
if (num == null || num == 0) return 0;
// 5. 循环遍历bit位
int count = 0;
// 统计距离本月距离到今天连续签到数
while (true) {
// 让 num 与 1 做 与运算 判断 , 得到数据最后的bit位 , 通过判断bit可得知签到数据
// 与运算 (0 & 0 => 0 ; 1 & 1 => 0 ; 0 & 1 => 0)
if ((num & 1) == 0) break;
count++;
// num往右移一位 , 舍弃最后一位 bit位
num >>>= 1;
}
// 6. 与运算判断
return count;
}