行唐县网站建设,iis配置wordpress,家庭宽带做私人网站,wordpress数据库类型目录 延迟任务和定时任务
使用Redis设计延迟队列原理
点评项目中选用list和zset两种数据结构进行实现
如何缓解Redis内存的压力同时保证Redis中任务能够被正确消费不丢失
系统流程设计
使用Feign实现微服务间的任务消费以及文章自动审核
系统微服务功能介绍
提交文章-审核文章执行流程
Redis中SET NX实现分布式锁 延迟任务和定时任务
定时任务 有固定周期有明确的触发事件。
延迟任务 没有固定的开始时间它常常是由一个事件触发的而在这个事件触发之后的一段时间内触发另一个事件任务可以立即执行也可以延迟一段时间后执行。参考如下 延迟任务的实现常常基于一个延迟队列延迟队列的实现方案有 DelayQueue、RebbitMQ、Redis中基于Zset数据结构的实现。【本篇文章主要介绍项目中使用到的Redis实现的延迟队列后续会将其他方法实现的延迟队列逐步完善总结】
使用Redis设计延迟队列原理 Redis的基本数据结构中的Zset内部可以根据给定的权重对元素进行排序随后使用 stringRedisTemplate.opsForZSet().rangeByScore(key, min, max)对指定的Key寻找score在min-max间的元素。在向Zset中插入元素的时候可以将优先级设置为socre如果将时间作为优先级实现延迟队列可以在插入元素同时获取当前系统时间作为socre如果需要指定5min后执行则将当前系统获取的时间5min作为对应元素的socre值。实现基于Redis作为延迟队列。
点评项目中选用list和zset两种数据结构进行实现 常规需求下基于Redis实现的延迟队列只需要根据zset设置对应元素的score即可实现如果进一步考虑数据量非常大的情况下此时时间复杂度比较高。在zset中分别使用zadd(.)以及zrange(.)的时间复杂度分别为 ZADD时间复杂度O(M*logN)M成功添加的元素数N是有序集合的基数。ZRANGE按照从低到高的顺序获取指定排名范围内的成员。时间复杂度O(log(N)M)其中 N 是有序集合的基数M 是指定排名范围内的成员数量。 选用list和zset相结合的方式实现延迟队列list中存储当前需要执行的任务zset中存储需要延迟未来执行的任务此时向list的一端存储元素并从list的另一端取出元素不仅可以保证任务消费的有序性同时list中存储以及获取元素的时间复杂度均为O(1)在数据量大的情况下性能更优。
如何缓解Redis内存的压力同时保证Redis中任务能够被正确消费不丢失 Redis是基于内存的数据库有一定的存储容量可以采用RedisMySQL相结合的方式。
每次到达一个新的任务需要延迟消费时首先将对应任务存储到MySQL数据中其次将其根据消费时间立马消费、延迟消费存储到对应list或zset中。在任务被消费时首先从Redis的list中获取元素进行消费并将任务从Redis中删除同时将对应的任务从MySQL数据库中进行删除避免重复消费。任务需要消费时首先将其存储到MySQL中随后将对应时间范围内比如小于当前时间5min存入到Redis中时间大于规定范围的存储到MySQL数据库中并且每消费一条Redis中的任务同时将MySQL中对应的任务清理。所以MySQL中存储的任务均是未消费的任务使用定时任务从MySQL中提取任务并加载到Redis中进行消费此操作必须先将Redis中的任务全部清空避免相同的任务再次加载到Redis中被重复消费。zset中存储的任务借助Spring Task框架提供的定时任务功能按照一定时间间隔自动根据score提取对应范围的任务并将其加载到list中进行消费。
系统流程设计 使用Feign实现微服务间的任务消费以及文章自动审核
系统微服务功能介绍 ①feign微服务定义feign远程调用的接口。②article微服务app端数据存储以及实现feign中定义的保存文章配置相关接口。③schedule微服务消息队列微服务实现任务MySQL的记录以及Redis中任务的消费。同时实现feign中定义的调用延迟队列的接口。④wemedia微服务浏览器端/管理端实现用于实现保存自媒体文章调用sehedule微服务实现任务延迟消费以及调用article微服务实现文章自动审核后保存app端文章相关信息。
为什么需要将延迟队列相关实现单独防止在一个微服务schedule中 提高复用性如果将延迟队列实现防止在wemedia微服务中直接进行调用可以省去不必要的远程调用过程或者MQ实现。同时出现如果其他微服务也需要使用到Redis实现的消息队列此时需要重新实现所以将其抽取为一个单独的微服务提高复用性。
提交文章-审核文章执行流程 可以参考SpringCloud Feign实现微服务间的远程调用黑马头条Day04-CSDN博客 的了解Feign的远程调用的简单原理。
自媒体发布文章远程调用消息队列微服务将任务存入消息延迟队列。自媒体微服务通过远程调用定时拉取消息队列中的任务进行文章审核。自媒体微服务审核完文章后调用app端相关微服务将文章相关html页面对应的url路径等信息存入到文章相关数据表。
上图中有两个地方并没有画出
延迟队列微服务定期从zset中根据score范围取数据并放进list中进行消费。延迟队列微服务定期从MySQL数据库中加载未消费的任务到延迟队列。
贴两个小代码 /*** 定时刷新数据从ZSet到list中*/Scheduled(cron 0 */1 * * * ?)public void refresh(){// 添加分布式锁String token cacheService.tryLock(FUTURE_TASK_SYNC, 1000 * 30);if(StringUtils.isNotBlank(token)){log.info(启动定时刷新任务当前时间为{}, System.currentTimeMillis() / 1000);// 获取所有未来数据的集合的keySetString fututrKeys cacheService.scan(ScheduleConstants.FUTURE *);for (String fututrKey : fututrKeys) {// 根据futureKey计算topicKeyString topicKey ScheduleConstants.TOPIC fututrKey.split(ScheduleConstants.FUTURE)[1];// 获取该组key下需要消费的数据SetString tasks cacheService.zRangeByScore(fututrKey, 0, System.currentTimeMillis());// 将需要消费的任务添加list中if(!tasks.isEmpty()){cacheService.refreshWithPipeline(fututrKey, topicKey, tasks);log.info(成功的将{}对应的数据刷新到{}中, fututrKey, topicKey);}}}}/*** 定时加载数据库中的数据到Redis中*/PostConstruct // 开启即加载Scheduled(cron 0 */5 * * * ?)public void reloadData(){// 清理缓存中的数据clearCache();// 查询数据库中数据根据执行时间小于当前时间5min// 获取5分钟后的时间Calendar calendar Calendar.getInstance();calendar.add(Calendar.MINUTE, 5);ListTaskinfo taskinfos taskinfoMapper.selectList(Wrappers.TaskinfolambdaQuery().lt(Taskinfo::getExecuteTime, calendar.getTime()));if(taskinfos ! null taskinfos.size() 0){// 将查询的数据添加到缓存中for (Taskinfo taskinfo : taskinfos) {Task task new Task();BeanUtils.copyProperties(taskinfo, task);task.setExecuteTime(taskinfo.getExecuteTime().getTime());addTaskToRedis(task);log.info(添加任务到Redis中{}, task);}}}/*** 清理缓存中的数据*/private void clearCache() {SetString topicKey cacheService.scan(ScheduleConstants.TOPIC *);SetString futureKey cacheService.scan(ScheduleConstants.FUTURE *);cacheService.delete(topicKey);cacheService.delete(futureKey);}
Redis中SET NX实现分布式锁 为什么需要分布式锁控制分布式系统有序的去对共享资源进行操作通过互斥来保证数据的一致性。考虑以下场景如果两个延迟队列微服务同时从zset中刷新未来要执行的任务到list中由于两个微服务设置的定时时间都一样此时会出现共享变量的重复操作。 使用Redis实现的分布式锁保证同一时刻只有一个微服务操作共享资源。 sexnx SET if Not eXists 命令在指定的 key 不存在时为 key 设置指定的值。 这种加锁的思路是如果 key 不存在则为 key 设置 value如果 key 已存在则 SETNX 命令不做任何操作 客户端A请求服务器设置key的值如果设置成功就表示加锁成功 客户端B也去请求服务器设置key的值如果返回失败那么就代表加锁失败 客户端A执行代码完成删除锁 客户端B在等待一段时间后再去请求设置key的值设置成功 客户端B执行代码完成删除锁 可以参考Redission实现的分布式锁Redis分布式锁实现-CSDN博客。
暂时写到这里有时间再补.....