目录
day07-积分系统
1积分获取规则和产品原型
2签到功能
3查询今日积分情况
4面试
day08-排行榜功能
1生成实时榜单(查询积分榜)
2怎么配置的动态表名及怎么发挥的作用?
3海量数据存储策略
4定时任务生成榜单表
5定时持久化任务
6清理积分明细
7面试题
day07-积分系统
1积分获取规则和产品原型
积分获取规则
签到规则
连续7天奖励10分 连续14天 奖励20 连续28天奖励40分, 每月签到进度当月第一天重置
学习规则
每学习一小节,积分+10,每天获得上限50分
交互规则(有效交互数据参与积分规则,无效数据会被删除)
写评价 积分+10
写问答 积分+5 每日获得上限为20分
写笔记 积分+3 每次被采集+2 每日获得上限为20分
综上,我们要实现的接口有:
业务
编号
接口简述
签到
1
签到
2
查询本月签到记录
积分
3
新增积分记录
4
查询今日积分情况
排行榜
5
查询本赛季的积分排行榜
6
查询赛季列表
7
查询历史赛季积分排行榜
2签到功能
思路:获取用户id及当前时间年月和日的信息->保存签到记录到redis中->去查询历史连续签到天数(位图bitMap)->统计得分并异步发送保存积分明细记录->封装vo并返回
//签到功能接口 public SignResultVO addSignRecords() { //1.获取登录用户 Long user = UserContext.getUser(); //2.签到 //2.1获取当前日期作为key LocalDate now = LocalDate.now(); //todo :yyyyMM? String format = now.format(DateTimeFormatter.ofPattern(":yyyyMM")); String key = RedisConstants.SIGN_RECORD_KEY_PREFIX + user + format; //2.2使用bitmap尝试进行签到 //得到今日在bitmap的坐标(偏移量) //todo 为什么需要-1? int offset = now.getDayOfMonth() - 1; Boolean signSuccess = redisTemplate.opsForValue().setBit(key, offset, true); log.info("用户{}(id)于{}签到[{}]", user, format, signSuccess?"失败":"成功"); if(signSuccess){ throw new BizIllegalException("签到失败,当日已签到"); } //3.获取连续签到天数 int signDays = getSignedDays(now.getDayOfMonth(), key); //4.计算当日连续签到奖励(每七天叠加一次积分,每次10分) int signReward = signDays / 7 * 10; //5.签到结果 SignResultVO signResultVO = SignResultVO.builder() .signDays(signDays) .signPoints(signReward) .build(); // 6.保存积分明细记录,发送MQ mqHelper.send( MqConstants.Exchange.LEARNING_EXCHANGE, MqConstants.Key.SIGN_IN, //todo SignInMessage? SignInMessage.of(user, signReward + 1));// 签到积分是奖励积分+基本得分 return signResultVO; }
3查询今日积分情况
思路:根据用户id和当前时间去查询积分记录->封装成map<类型,积分记录>->封装vo返回
//查询我的今日积分 public List<PointsStatisticsVO> queryMyPointsToday() { //1.获取用户 Long user = UserContext.getUser(); //2.获取当天开始时间和结束时间 LocalDateTime now = LocalDateTime.now(); LocalDateTime startTime = DateUtils.getDayStartTime(now); LocalDateTime endTime = DateUtils.getDayEndTime(now); //3.查询积分记录 QueryWrapper<PointsRecord> wrapper = new QueryWrapper<>(); wrapper.select("user_id, type, SUM(points) AS points"); wrapper.eq("user_id", user); wrapper.between("create_time", startTime, endTime); wrapper.groupBy("user_id", "type"); List<PointsRecord> records = list(wrapper); Map<PointsRecordType, PointsRecord> map = null; if(CollUtils.isNotEmpty(records)){ map = records.stream().collect(Collectors.toMap(PointsRecord::getType, c -> c)); } ArrayList<PointsStatisticsVO> list = new ArrayList<>(); //4.遍历几种枚举值,组装VO todo 怎么遍历几种枚举值? for (PointsRecordType type : PointsRecordType.values()) { PointsStatisticsVO vo = new PointsStatisticsVO(); //5.如果该类型积分有记录,则取出积分值,否则默认0 if(map!= null && map.containsKey(type)){ vo.setPoints(map.get(type).getPoints()); }else{ vo.setPoints(0); } vo.setType(type.getDesc()); vo.setMaxPoints(type.getMaxPoints()); list.add(vo); } return list; }
4面试
面试官:你项目中使用过Redis的那些数据结构啊?
答:很多,比如String、Hash、Set、SortedSet、BitMap等
面试官追问:能不能具体说说使用的场景?
答:比如很多的缓存,我们就使用了String结构来存储。还有点赞功能,我们用了Set结构和SortedSet结构。签到功能,我们用了BitMap结构。
就拿签到来说吧。因为签到数据量非常大嘛,而BitMap则是用bit位来表示签到数据,31bit位就能表示1个月的签到记录,非常节省空间,而且查询效率也比较高。
面试官追问:你使用Redis保存签到记录,那如果Redis宕机怎么办?
答:对于Redis的高可用数据安全问题,有很多种方案。
比如:我们可以给Redis添加数据持久化机制,比如使用AOF持久化。这样宕机后也丢失的数据量不多,可以接受。
或者呢,我们可以搭建Redis主从集群,再结合Redis哨兵。主节点会把数据持续的同步给从节点,宕机后也会有哨兵重新选主,基本不用担心数据丢失问题。
当然,如果对于数据的安全性要求非常高。肯定还是要用传统数据库来实现的。但是为了解决签到数据量较大的问题,我们可能就需要对数据做分表处理了。或者及时将历史数据存档。
总的来说,签到数据使用Redis的BitMap无论是安全性还是数据内存占用情况,都是可以接受的。但是具体是选择Redis还是数据库方案,最终还是要看公司的要求来选择。
day08-排行榜功能
1生成实时榜单(查询积分榜)
添加积分记录时候就进行统计
// 4.更新总积分到Redis String key = RedisConstants.POINTS_BOARD_KEY_PREFIX + now.format(DateUtils.POINTS_BOARD_SUFFIX_FORMATTER); redisTemplate.opsForZSet().incrementScore(key, userId.toString(), realPoints);
首先我们分析一下请求参数:
榜单数据非常多,不可能一次性查询出来,因此这里一定是分页查询(滚动分页),需要分页参数。
由于要查询历史榜单需要知道赛季,因此参数中需要指定赛季id。当赛季id为空,我们认定是查询当前赛季。这样就可以把两个接口合二为一。
然后是返回值,无论是历史榜单还是当前榜单,结构都一样。分为两部分:
当前用户的积分和排名。当前用户不一定上榜,因此需要单独查询
榜单数据。就是N个用户的积分、排名形成的集合。
思路:根据赛季id判断出是当前赛季还是历史赛季->先查个人信息(分当前赛季(当前的话从redis中找)还是历史赛季)->查赛季排行榜(分当前赛季还是历史赛季)->封装成VO并返回
//分页查询指定赛季的积分排行榜 public PointsBoardVO queryPointsBoardBySeason(PointsBoardQuery query) { //1.获取当前用户 Long user = UserContext.getUser(); //2.是否为历史赛季查询 Long seasonId = query.getSeason(); boolean isCurrent = seasonId == null || seasonId == 0; //3.查询本人的赛季排名和分数 PointsBoard pointsBoard = !isCurrent ? queryMyHistoryPoints(seasonId, user) : queryMyCurrentPoints(user); List<PointsBoard> pointsBoards = null; if(!isCurrent){ //4.分页查询历史赛季排行榜,fromDB pointsBoards = queryHistoryPoints(seasonId, query.getPageNo(), query.getPageSize()); }else{ //5.分页查询本赛季排行榜,fromRedis pointsBoards = queryCurrentPoints(query.getPageNo(), query.getPageSize(), false); } //6.封装List //6.1查询赛季排行榜成员名称 List<Long> uids = pointsBoards.stream().map(PointsBoard::getUserId).collect(Collectors.toList()); List<UserDTO> userDTOS = userClient.queryUserByIds(uids); Map<Long, String> userMap = userDTOS.stream().collect(Collectors.toMap(UserDTO::getId, UserDTO::getName)); //6.2将复制pointsBoard到PointsBoardItemVO List<PointsBoardItemVO> vos = pointsBoards.stream().map(i -> { PointsBoardItemVO itemVO = new PointsBoardItemVO(); //设置远程查询名称 Long userId = i.getUserId(); if (userMap != null && userMap.get(userId) != null) { itemVO.setName(userMap.get(userId)); } itemVO.setPoints(i.getPoints()); itemVO.setRank(i.getRank()); return itemVO; }).collect(Collectors.toList()); //6.封装vo PointsBoardVO vo = new PointsBoardVO(); //个人分数 vo.setPoints(pointsBoard.getPoints()); //个人排名 vo.setRank(pointsBoard.getRank()); vo.setBoardList(vos); return vo; }
2怎么配置的动态表名及怎么发挥的作用?
动态表名的配置与作用机制
整个机制由3个组件协同工作,核心思路是:在 SQL 执行前,通过 MybatisPlus 拦截器将原始表名
points_board替换为带赛季后缀的动态表名(如points_board_1、points_board_2)。
TableInfoContext— 动态表名的"中转站"public class TableInfoContext { private static final ThreadLocal<String> TL = new ThreadLocal<>(); public static void setInfo(String info) { TL.set(info); } // 存入动态表名 public static String getInfo() { return TL.get(); } // 取出动态表名 public static void remove() { TL.remove(); } // 清理,防止内存泄漏 }
- 基于
ThreadLocal,保证线程安全,每个线程互不干扰。- 作用:临时保存当前线程要使用的动态表名(如
"points_board_3")
MybatisConfiguration— 拦截器注册(配置动态表名替换规则)@Bean public DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor() { Map<String, TableNameHandler> map = new HashMap<>(1); // key="points_board":原始表名 // value=TableNameHandler:替换逻辑 map.put("points_board", (sql, tableName) -> TableInfoContext.getInfo() == null ? tableName : TableInfoContext.getInfo() ); return new DynamicTableNameInnerInterceptor(map); }关键逻辑:
map的key是原始表名"points_board"(与@TableName("points_board")对应)。map的value是一个TableNameHandler,定义替换规则:
- 如果
TableInfoContext中有值 → 用动态表名(如points_board_3)- 如果
TableInfoContext中没值 → 保持原表名points_board不变拦截器何时生效?
MyBatis 的设计是:所有 SQL 执行前都必须经过拦截器链。DynamicTableNameInnerInterceptor会在 SQL 执行前扫描 SQL 中的表名,如果发现表名匹配了 map 中的 key(即points_board),就调用对应的TableNameHandler来替换表名。/** * MP动态计算表名拦截器,在执行SQL之前,根据表名的不同,动态的替换表名 */ @Configuration public class MybatisConfiguration { //todo 它怎么知道何时替换表明的? MyBatis 设计如此,所有 SQL 必须经过拦截器链 @Bean public DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor() { // 准备一个Map,用于存储TableNameHandler Map<String, TableNameHandler> map = new HashMap<>(1); // 存入一个TableNameHandler,用来替换points_board表名称 // 替换方式,就是从TableInfoContext中读取保存好的动态表名,如果没有,则返回原来的表名 map.put("points_board", (sql, tableName) -> TableInfoContext.getInfo() == null ? tableName : TableInfoContext.getInfo()); return new DynamicTableNameInnerInterceptor(map); } }3️⃣ 业务代码中的使用(以你贴的代码为例)
private PointsBoard queryMyHistoryPoints(Long seasonId, Long userId) { // 1. 拼接动态表名,如 seasonId=3 → "points_board_3" String tableName = LearningConstants.POINTS_BOARD_TABLE_PREFIX + seasonId; // 2. 将动态表名存入 ThreadLocal TableInfoContext.setInfo(tableName); try { // 3. 执行查询 → SQL经过拦截器,表名被替换为 points_board_3 // 原始SQL: SELECT * FROM points_board WHERE user_id = ? // 替换后: SELECT * FROM points_board_3 WHERE user_id = ? PointsBoard one = lambdaQuery() .eq(PointsBoard::getUserId, userId) .one(); return one; } finally { // 4. 清理 ThreadLocal,防止线程复用导致表名错乱 TableInfoContext.remove(); } }🔗 完整调用链路图
业务代码设置动态表名
↓
TableInfoContext.setInfo("points_board_3") ← 存入ThreadLocal
↓
lambdaQuery().eq(...).one() ← 触发SQL执行
↓
DynamicTableNameInnerInterceptor 拦截 ← MyBatis拦截器链
↓
发现SQL中表名是 "points_board" ← 匹配map的key
↓
调用TableNameHandler ← 执行替换逻辑
↓
从TableInfoContext.getInfo()取出 "points_board_3"
↓
SQL中 points_board → points_board_3 ← 表名替换完成
↓
执行替换后的SQL: SELECT * FROM points_board_3 WHERE user_id = ?
↓
finally: TableInfoContext.remove() ← 清理ThreadLocal
3海量数据存储策略
水平分表和分区的区别?
- 水平分表:是业务层 / 中间件层面的拆分,表名不同,是物理上独立的多张表。
- 分区:是数据库内核层面的拆分,逻辑上还是一张表,底层被拆成多个分区文件。
维度 水平分表 分区(Partition) 谁来做的? 开发者 / 中间件(如 ShardingSphere) 数据库本身(MySQL 原生支持) 表名变化? 是,会变成 points_board_1、points_board_2等多张表否,逻辑上还是一张表,对应用户透明 数据隔离? 物理上完全隔离,是独立的表 逻辑上一张表,底层是多个分区文件 应用层是否感知? 是,需要动态切换表名(如你代码里的 TableInfoContext)否,SQL 写法和单表一样,无需修改 适用场景 数据量极大,单机数据库性能瓶颈,需要跨库 单库内数据量大,按时间、范围等规则做冷热数据分离
4定时任务生成榜单表
// 将上个赛季的积分排行表创建出来 // @Scheduled(cron = "0 0 3 1 * ?") // 每月1号,凌晨3点执行 //todo @XxlJob后面括号里面是什么? @XxlJob("createTableJob") public void createPointsBoardTableOfLastSeason(){ log.debug("定时任务启动->创建上个赛季积分排行表"); // 1.获取上月时间 LocalDateTime time = LocalDateTime.now().minusMonths(1); // 2.在赛季表中查询上赛季id Integer season = seasonService.querySeasonByTime(time); if (season == null) { // 赛季不存在 return; } // 3.创建赛季排行表 pointsBoardService.createPointsBoardTableBySeason(season); }下面代码是在common配置的Xxl-Job
package com.tianji.common.autoconfigure.xxljob; import com.tianji.common.utils.StringUtils; import com.xxl.job.core.executor.impl.XxlJobSpringExecutor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * XXL-Job 分布式任务调度自动配置类 * <p> * 该类负责在 Spring Boot 应用中自动配置 XXL-Job 执行器,使应用能够: * 1. 注册到 XXL-Job 调度中心 * 2. 接收并执行定时任务 * 3. 上报任务执行结果和日志 * </p> * <p> * 配置方式: * 在 application.yml 中配置 xxl.job 相关属性: * <pre> * xxl: * job: * admin: * addresses: http://127.0.0.1:8080/xxl-job-admin # 调度中心地址 * executor: * app-name: my-service # 执行器名称 * ip: # 执行器IP(可选) * port: 9999 # 执行器端口 * log-path: /data/applogs/xxl-job/jobhandler # 日志路径 * log-retention-days: 30 # 日志保留天数 * access-token: # 访问令牌(可选) * </pre> * </p> */ @Slf4j @Configuration // 标识这是一个配置类,Spring容器启动时会加载 @ConditionalOnClass(XxlJobSpringExecutor.class) // 只有当类路径中存在 XxlJobSpringExecutor 时才生效 @EnableConfigurationProperties(XxlJobProperties.class) // 启用配置属性绑定,从配置文件读取 xxl.job.* 配置 public class XxlJobConfig { /** * 创建并配置 XXL-Job 执行器 Bean * <p> * 该方法会创建一个 XxlJobSpringExecutor 实例,并从配置文件中读取各项参数进行初始化。 * 执行器启动后会: * 1. 向调度中心注册自己 * 2. 开启端口监听,接收调度中心的任务触发请求 * 3. 启动日志清理线程,定期清理过期日志 * </p> * * @param prop XXL-Job 配置属性对象,包含调度中心、执行器等配置信息 * @return XxlJobSpringExecutor 配置好的执行器实例,Spring 会自动管理其生命周期 */ @Bean public XxlJobSpringExecutor xxlJobExecutor(XxlJobProperties prop) { log.info(">>>>>>>>>>> xxl-job config init."); // 创建 XXL-Job Spring 执行器实例 XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); // ========== 配置调度中心地址 ========== // 调度中心是 XXL-Job 的管理后台,负责任务的调度和触发 XxlJobProperties.Admin admin = prop.getAdmin(); if (admin != null && StringUtils.isNotEmpty(admin.getAddress())) { // 设置调度中心地址,例如:http://127.0.0.1:8080/xxl-job-admin // 可以配置多个地址,用逗号分隔,实现高可用 xxlJobSpringExecutor.setAdminAddresses(admin.getAddress()); } // ========== 配置执行器参数 ========== // 执行器是当前应用的 XXL-Job 客户端,负责接收和执行任务 XxlJobProperties.Executor executor = prop.getExecutor(); if (executor != null) { // 设置执行器名称(必填) // 作用:在调度中心标识这个执行器,一个服务可以有多个执行器实例 if (executor.getAppName() != null) xxlJobSpringExecutor.setAppname(executor.getAppName()); // 设置执行器 IP(可选) // 如果不设置,XXL-Job 会自动获取本机 IP // 在多网卡或 Docker 环境下可能需要手动指定 if (executor.getIp() != null) xxlJobSpringExecutor.setIp(executor.getIp()); // 设置执行器端口(可选) // 默认端口是 9999,调度中心通过这个端口触发任务 // 如果同一台机器部署多个实例,需要设置不同端口 if (executor.getPort() != null) xxlJobSpringExecutor.setPort(executor.getPort()); // 设置任务日志存储路径 // XXL-Job 会将每个任务的执行日志保存到该目录 // 格式:{logPath}/{appname}/{date}/{jobId}.log if (executor.getLogPath() != null) xxlJobSpringExecutor.setLogPath(executor.getLogPath()); // 设置日志保留天数 // 超过该天数的日志会被自动清理,避免磁盘空间占用过大 // -1 表示永不清理(不推荐) if (executor.getLogRetentionDays() != null) xxlJobSpringExecutor.setLogRetentionDays(executor.getLogRetentionDays()); } // ========== 配置访问令牌(可选) ========== // 用于调度中心和执行器之间的安全认证 // 如果配置了 accessToken,调度中心和执行器必须使用相同的 token 才能通信 // 增强安全性,防止未授权的调度中心连接 if (prop.getAccessToken() != null) xxlJobSpringExecutor.setAccessToken(prop.getAccessToken()); log.info(">>>>>>>>>>> xxl-job config end."); // 返回配置好的执行器,Spring 会在应用启动时自动调用 start() 方法启动执行器 return xxlJobSpringExecutor; } }
5定时持久化任务
思路:使用@XxlJob 启用分片处理(提升效率)->然后根据时间找到上月的赛季id并且从
@XxlJob中找出当前分片的起始位置(页码)以及处理数据总量(跨度)->然后根据页码和每页大小去redis中查出赛季榜的信息->根据动态表明持久化到数据库中
// 从Redis中将上赛季积分排行表的数据同步到数据库(XxlJob分片广播执行) /** * 如果不进行分片操作,不管是轮询还是其它方式都只会让一台机器执行同步数据库的操作 * 所以需要分片执行,假如有20条数据,两台实例,每台实例一次插入5条数据 * 两台实例各自都会被调度两次,这样并发量就会提高 */ @XxlJob("savePointsBoard2DB") public void savePointsBoard2DB(){ log.debug("定时任务启动->从Redis中将上赛季积分排行表的数据同步到数据库"); // 1. 获取上月时间 LocalDateTime now = LocalDateTime.now().minusMonths(1); // 2. 从数据库points_season_board表获取上赛季id Integer season = seasonService.querySeasonByTime(now); // 3. 分页获取该赛季的积分排行信息 int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal(); //分页页码,由于分片广播,页码就是当前实例的坐标0加1; int pageNo = shardIndex + 1; //每一页查找并保存5条到数据库 int pageSize = 5; //何时停止? while(true){ log.info("处理第{}页数据", pageNo); // 4. 查询该赛季的积分排行信息 List<PointsBoard> pointsBoards = pointsBoardService.queryCurrentPoints(pageNo, pageSize, true); if(CollUtils.isEmpty(pointsBoards)){ break; } // 4. 持久化该赛季信息到数据库 List<PointsBoard> collect = pointsBoards.stream().map(i -> { i.setId(Long.valueOf(i.getRank())); //在数据库中,id字段就为rank字段 i.setRank(null); //数据库表中没有该字段,需要清空 return i; }).collect(Collectors.toList()); // save之前执行动态替换表名todo 动态表明是怎么操作的? DynamicTableNameInnerInterceptor // 将新表名存入threadLocal中,等待动态替换 // 一定要把setInfo放在所有crud操作前面 String tableName = POINTS_BOARD_TABLE_PREFIX + season; TableInfoContext.setInfo(tableName); log.info("待插入的动态表名为{}", tableName); pointsBoardService.saveBatch(collect); //第一轮结束,开始第二轮,页码为 当前页码 + 页跨度 pageNo += shardTotal; } // 5. 清空threadLocal中的数据 TableInfoContext.remove(); }
6清理积分明细
积分明细数据比积分榜单数据量更大,全部放到一张表中不太合适。建议按照赛季的日期对积分明细数据做水平拆分:
当前赛季的数据依然保存在points_record表不变
每个历史赛季的积分明细需要从points_record表迁移到一张独立的表中
表名称规则points_record_xx,这里的xx就是赛季id
通过一个定时任务在每月初完成数据迁移。
组件 状态 说明 PointsRecordMigrationHandler✅ 定时任务主体逻辑完整 @XxlJob("migratePointsRecord")✅ XxlJob 定时任务注册 createPointsRecordTable✅ Mapper 建表 SQL 已写好 queryRecordsBeforeTime✅ 分批查询历史数据已实现 saveBatch+removeByIds✅ 迁移插入 + 主表删除已实现 TableInfoContext.setInfo/remove✅ 动态表名设置和清理已配对
PointsRecordMigrationHandler 问:它的思路?根据时间找到赛季ID->根据id构造分表->从主库中查询需要迁移的积分明细数据->迁移到构造的分表中->删除主表中的已经迁移的数据(根据ids进行删除)
package com.tianji.learning.task; import com.tianji.common.utils.CollUtils; import com.tianji.learning.domain.po.PointsRecord; import com.tianji.learning.service.IPointsBoardSeasonService; import com.tianji.learning.service.IPointsRecordService; import com.tianji.learning.utils.TableInfoContext; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.util.List; import java.util.stream.Collectors; import static com.tianji.learning.constants.LearningConstants.POINTS_BOARD_TABLE_PREFIX; /** * 积分明细分表迁移定时任务处理器 * <p> * 功能:每月初将上月(历史赛季)的积分明细数据从 points_record 表迁移到分表 points_record_{seasonId} * </p> * <p> * 迁移策略: * 1. 当前赛季的数据保留在 points_record 主表中 * 2. 历史赛季的数据迁移到独立分表,便于管理和查询 * 3. 采用分批迁移,避免一次性操作大量数据导致超时 * </p> */ @Component @RequiredArgsConstructor @Slf4j public class PointsRecordMigrationHandler { private final IPointsBoardSeasonService seasonService; private final IPointsRecordService pointsRecordService; /** * 积分明细分表迁移任务 * <p> * 执行时机:每月初(建议配置为每月2号凌晨3点执行,避开1号凌晨的榜单持久化任务) * Cron表达式示例:0 0 3 2 * ? * </p> */ @XxlJob("migratePointsRecord") public void migratePointsRecordToShardTable() { log.info("========== 积分明细分表迁移任务开始 =========="); try { // 1. 获取上月时间(需要迁移的数据截止时间) LocalDateTime lastMonthEnd = LocalDateTime.now().minusMonths(1); // 设置为上月最后一天的23:59:59 lastMonthEnd = lastMonthEnd.withDayOfMonth(lastMonthEnd.toLocalDate().lengthOfMonth()) .withHour(23).withMinute(59).withSecond(59); log.info("需要迁移 {} 之前的积分记录", lastMonthEnd); // 2. 查询上赛季ID Integer season = seasonService.querySeasonByTime(lastMonthEnd); if (season == null) { log.warn("上月对应的赛季不存在,跳过迁移"); return; } log.info("上月对应赛季ID: {}", season); // 3. 构造分表名称 String tableName = "points_record_" + season; log.info("目标分表名称: {}", tableName); // 4. 创建分表(如果不存在) try { pointsRecordService.createPointsRecordTable(tableName); log.info("分表 {} 创建成功(或已存在)", tableName); } catch (Exception e) { log.error("创建分表 {} 失败: {}", tableName, e.getMessage()); // 表可能已存在,继续执行 } // 5. 分批迁移数据 int batchSize = 500; // 每批迁移500条 int offset = 0; int totalMigrated = 0; while (true) { log.info("正在迁移第 {} 批数据,偏移量: {}", (offset / batchSize + 1), offset); // 5.1 查询待迁移的数据 List<PointsRecord> records = pointsRecordService.queryRecordsBeforeTime( lastMonthEnd, offset, batchSize ); if (CollUtils.isEmpty(records)) { log.info("没有更多数据需要迁移,退出循环"); break; // 没有更多数据 } log.info("本批查询到 {} 条记录", records.size()); // 5.2 设置动态表名(指向分表) TableInfoContext.setInfo(tableName); try { // 5.3 批量插入到分表 pointsRecordService.saveBatch(records); log.info("成功插入 {} 条记录到分表", records.size()); // 5.4 从主表删除已迁移的数据 List<Long> ids = records.stream() .map(PointsRecord::getId) .collect(Collectors.toList()); boolean removed = pointsRecordService.removeByIds(ids); if (removed) { log.info("从主表删除 {} 条已迁移记录", ids.size()); } else { log.warn("从主表删除记录失败,IDs: {}", ids); } totalMigrated += records.size(); } finally { // 5.5 清理 ThreadLocal(必须!防止内存泄漏和表名错乱) TableInfoContext.remove(); } // 5.6 更新偏移量,处理下一批 offset += batchSize; // 可选:每批之间短暂休眠,降低数据库压力 try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("迁移任务被中断"); break; } } log.info("========== 积分明细分表迁移任务完成 =========="); log.info("总共迁移 {} 条记录到分表 {}", totalMigrated, tableName); } catch (Exception e) { log.error("积分明细分表迁移任务执行失败", e); throw new RuntimeException("迁移任务失败: " + e.getMessage(), e); } } }
7面试题
面试官:你在项目中负责积分排行榜功能,说说看你们排行榜怎么设计实现的?
答:我们的排行榜功能分为两部分:一个是当前赛季排行榜,一个是历史排行榜。
因为我们的产品设计是每个月为一个赛季,月初清零积分记录,这样学员就有持续的动力去学习。这就有了赛季的概念,因此也就有了当前赛季榜单和历史榜单的区分,其实现思路也不一样。
首先说当前赛季榜单,我们采用了Redis的SortedSet来实现。member是用户id,score就是当月积分总值。每当用户产生积分行为的时候,获取积分时,就会更新score值。这样Redis就会自动形成榜单了。非常方便且高效。
然后再说历史榜单,历史榜单肯定是保存到数据库了。不过由于数据过多,所以需要对数据做水平拆分,我们目前的思路是按照赛季来拆分,也就是每一个赛季的榜单单独一张表。这样做有几个好处:
拆分数据时比较自然,无需做额外处理
查询数据时往往都是按照赛季来查询,这样一次只需要查一张表,不存在跨表查询问题
因此我们就不需要用到分库分表的插件了,直接在业务层利用MybatisPlus就可以实现动态表名,动态插入了。简单高效。
我们会利用一个定时任务在每月初生成上赛季的榜单表,然后再用一个定时任务读取Redis中的上赛季榜单数据,持久化到数据库中。最后再有一个定时任务清理Redis中的历史数据。
这里要说明一下,这里三个任务是有关联的,之所以让任务分开定义,是为了避免任务耦合。这样在部分任务失败时,可以单独重试,无需所有任务从头重试。
当然,最终我们肯定要确保这三个任务的执行顺序,一定是依次执行的。
面试官追问:你们使用Redis的SortedSet来保存榜单数据,如果用户量非常多怎么办?
首先Redis的SortedSet底层利用了跳表机制,性能还是非常不错的。即便有百万级别的用户量,利用SortedSet也没什么问题,性能上也能得到保证。在我们的项目用户量下,完全足够。
当系统用户量规模达到数千万,乃至数亿时,我们可以采用分治的思想,将用户数据按照积分范围划分为多个桶。
然后为每个桶创建一个SortedSet类型的key,这样就可以将数据分散,减少单个KEY的数据规模了。
而要计算排名时,只需要按照范围查询出用户积分所在的桶,再累加分值范围比他高的桶的用户数量即可。依然非常简单、高效。
面试官追问:你们使用历史榜单采用的定时任务框架是哪个?处理数百万的榜单数据时任务是如何分片的?你们是如何确保多个任务依次执行的呢?
答:我们采用的是XXL-JOB框架。
XXL-JOB自带任务分片广播机制,每一个任务执行器都能通过API得到自己的分片编号、总分片数量。在做榜单数据批处理时,我们是按照分页查询的方式:
每个执行器的读取的起始页都是自己的分片编号+1,例如第一个执行器,其起始页就是1,第二个执行器,其起始页就是2,以此类推
然后不是逐页查询,而是有一个页的跨度,跨度值就是分片总数量。例如分了3片,那么跨度就是3
此时,第一个分片处理的数据就是第1、4、7、10、13等几页数据,第二个分片处理的就是第2、5、8、11、14等页的数据,第三个分片处理的就是第3、6、9、12、15等页的数据。
这样就能确保所有数据都会被处理,而且每一个执行器都执行的是不同的数据了。
最后,要确保多个任务的执行顺序,可以利用XXL-JOB中的子任务功能。比如有任务A、B、C,要按照字母顺序依次执行,我们就可以将C设置为B的子任务,再将B设置为A的子任务。然后给A设置一个触发器。
这样,当A触发时,就会依次执行这三个任务了。