网站建设 风险防控,深圳企业年报,昆明网红街,用群晖nas做网站文章目录 媒资管理6 视频处理6.1 需求6.1.1 总体需求6.7.3 FFmpeg 的基本使用6.7.4 视频处理工具类 6.2 分布式任务处理6.2.1 什么是分布式任务调度6.2.2 XXL-JOB介绍6.2.3 搭建XXL-JOB6.2.3.1 调度中心6.2.3.2 执行器6.2.3.3 执行任务 6.2.4 分片广播 6.3 技术方案6.3.1 作业分… 文章目录 媒资管理6 视频处理6.1 需求6.1.1 总体需求6.7.3 FFmpeg 的基本使用6.7.4 视频处理工具类 6.2 分布式任务处理6.2.1 什么是分布式任务调度6.2.2 XXL-JOB介绍6.2.3 搭建XXL-JOB6.2.3.1 调度中心6.2.3.2 执行器6.2.3.3 执行任务 6.2.4 分片广播 6.3 技术方案6.3.1 作业分片方案6.3.2 保证任务不重复执行6.3.3 视频处理方案 6.4 查询待处理任务6.4.1 需求分析6.4.2添加待处理任务6.4.3 查询待处理任务 6.5 开始执行任务6.5.1 分布式锁6.5.2 开启任务 6.6 更新任务状态6.7 视频处理6.8 测试6.8.1 基本测试6.8.3 抢占任务测试 6.9 其它问题6.9.1 任务补偿机制6.9.2 达到最大失败次数 7 绑定媒资7.1 需求分析7.1.1 业务流程7.1.2 数据模型 7.2 接口定义7.3 接口开发7.3.1 DAO开发7.3.2 Service开发7.3.3 接口层完善7.3.4 接口测试 7.4 实战 媒资管理 完成了实战部分 6 视频处理
6.1 需求
6.1.1 总体需求
视频上传成功需要对视频的格式进行转码处理比如avi转成mp4。如何用Java程序对视频进行处理呢当视频比较多的时候我们如何可以高效处理。 在一些云平台上对象存储产品就具有文件处理的功能如下图
所以一般做文件存储的服务都需要对文件进行处理例如对视频进行转码处理可能由于文件量较大需要使用多线程等技术进行高效处理。
6.7.2 什么是视频编码 视频上传成功后需要对视频进行转码处理。 什么是视频编码查阅百度百科如下
详情参考 详情 首先我们要分清文件格式和编码格式 文件格式是指.mp4、.avi、.rmvb等 这些不同扩展名的视频文件的文件格式 视频文件的内容主要包括视频和音频其文件格式是按照一 定的编码格式去编码并且按照该文件所规定的封装格式将视频、音频、字幕等信息封装在一起播放器会根据它们的封装格式去提取出编码然后由播放器解码最终播放音视频。 音视频编码格式通过音视频的压缩技术将视频格式转换成另一种视频格式通过视频编码实现流媒体的传输。比如一个.avi的视频文件原来的编码是a通过编码后编码格式变为b音频原来为c通过编码后变为d。
音视频编码格式各类繁多主要有几下几类
MPEG系列 由ISO[国际标准组织机构]下属的MPEG[运动图象专家组]开发 视频编码方面主要是Mpeg1vcd用的就是它、Mpeg2DVD使用、Mpeg4的DVDRIP使用的都是它的变种如divxxvid等、Mpeg4 AVC正热门音频编码方面主要是MPEG Audio Layer 1/2、MPEG Audio Layer 3大名鼎鼎的mp3、MPEG-2 AAC 、MPEG-4 AAC等等。注意DVD音频没有采用Mpeg的。 H.26X系列 由ITU[国际电传视讯联盟]主导侧重网络传输注意只是视频编码 包括H.261、H.262、H.263、H.263、H.263、H.264就是MPEG4 AVC-合作的结晶 目前最常用的编码标准是视频H.264音频AAC。
提问 H.264是编码格式还是文件格式 mp4是编码格式还是文件格式
6.7.3 FFmpeg 的基本使用
我们将视频录制完成后使用视频编码软件对视频进行编码本项目 使用FFmpeg对视频进行编码 。
FFmpeg被许多开源项目采用QQ影音、暴风影音、VLC等。 下载FFmpeg https://www.ffmpeg.org/download.html#build-windows 下载ffmpeg.zip,并将解压得到的exe文件加入环境变量。 测试是否正常cmd运行 ffmpeg -v
安装成功作下简单测试 将一个.avi文件转成mp4、mp3、gif等。 比如我们将nacos.avi文件转成mp4运行如下命令 D:\soft\ffmpeg\ffmpeg.exe -i 1.avi 1.mp4 可以将ffmpeg.exe配置到环境变量path中进入视频目录直接运行ffmpeg.exe -i 1.avi 1.mp4 转成mp3ffmpeg -i nacos.avi nacos.mp3 转成gifffmpeg -i nacos.avi nacos.gif 官方文档英文http://ffmpeg.org/ffmpeg.html
6.7.4 视频处理工具类
将课程资料目录中的util.zip解压将解压出的工具类拷贝至base工程。
其中Mp4VideoUtil类是用于将视频转为mp4格式是我们项目要使用的工具类。 下边看下这个类的代码并进行测试。 我们要通过ffmpeg对视频转码Java程序调用ffmpeg使用java.lang.ProcessBuilder去完成具体在Mp4VideoUtil类的63行下边进行简单的测试下边的代码运行本机安装的QQ软件。
Java
ProcessBuilder builder new ProcessBuilder();
builder.command(C:\\Program Files (x86)\\Tencent\\QQ\\Bin\\QQScLauncher.exe);
//将标准输入流和错误输入流合并通过标准输入流程读取信息
builder.redirectErrorStream(true);
Process p builder.start();对Mp4VideoUtil类需要学习使用方法下边代码将一个avi视频转为mp4视频如下
Java
public static void main(String[] args) throws IOException {//ffmpeg的路径String ffmpeg_path D:\\soft\\ffmpeg\\ffmpeg.exe;//ffmpeg的安装位置//源avi视频的路径String video_path D:\\develop\\bigfile_test\\nacos01.avi;//转换后mp4文件的名称String mp4_name nacos01.mp4;//转换后mp4文件的路径String mp4_path D:\\develop\\bigfile_test\\nacos01.mp4;//创建工具类对象Mp4VideoUtil videoUtil new Mp4VideoUtil(ffmpeg_path,video_path,mp4_name,mp4_path);//开始视频转换成功将返回successString s videoUtil.generateMp4();System.out.println(s);
}执行main方法最终在控制台输出 success 表示执行成功。
6.2 分布式任务处理
6.2.1 什么是分布式任务调度
如何去高效处理一批任务呢 1、多线程 多线程是充分利用单机的资源。 2、分布式加多线程 充分利用多台计算机每台计算机使用多线程处理。 方案2可扩展性更强。 方案2是一种分布式任务调度的处理方案。 什么是分布式任务调度 我们可以先思考一下下面业务场景的解决方案 每隔24小时执行数据备份任务。 12306网站会根据车次不同设置几个时间点分批次放票。 某财务系统需要在每天上午10点前结算前一天的账单数据统计汇总。 商品成功发货后需要向客户发送短信提醒。 类似的场景还有很多我们该如何实现 多线程方式实现 学过多线程的同学可能会想到我们可以开启一个线程每sleep一段时间就去检查是否已到预期执行时间。 以下代码简单实现了任务调度的功能
Java
public static void main(String[] args) { //任务执行间隔时间final long timeInterval 1000;Runnable runnable new Runnable() {public void run() {while (true) {//TODOsomethingtry {Thread.sleep(timeInterval);} catch (InterruptedException e) {e.printStackTrace();}}}};Thread thread new Thread(runnable);thread.start();
}上面的代码实现了按一定的间隔时间执行任务调度的功能。 Jdk也为我们提供了相关支持如Timer、ScheduledExecutor下边我们了解下。 Timer方式实现
Java
public static void main(String[] args){ Timer timer new Timer(); timer.schedule(new TimerTask(){Override public void run() { //TODOsomething} }, 1000, 2000); //1秒后开始调度每2秒执行一次
}Timer 的优点在于简单易用每个Timer对应一个线程因此可以同时启动多个Timer并行执行多个任务同一个Timer中的任务是串行执行。 ScheduledExecutor方式实现
Java
public static void main(String [] agrs){ScheduledExecutorService service Executors.newScheduledThreadPool(10);service.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {//TODOsomethingSystem.out.println(todo something);}}, 1,2, TimeUnit.SECONDS);
}Java 5 推出了基于线程池设计的 ScheduledExecutor其设计思想是每一个被调度的任务都会由线程池中一个线程去执行因此任务是并发执行的相互之间不会受到干扰。
Timer 和 ScheduledExecutor 都仅能提供基于开始时间与重复间隔的任务调度不能胜任更加复杂的调度需求。比如设置每月第一天凌晨1点执行任务、复杂调度任务的管理、任务间传递数据等等。第三方Quartz方式实现项目地址https://github.com/quartz-scheduler/quartz Quartz 是一个功能强大的任务调度框架它可以满足更多更复杂的调度需求Quartz 设计的核心类包括 Scheduler, Job 以及 Trigger。其中Job 负责定义需要执行的任务Trigger 负责设置调度策略Scheduler 将二者组装在一起并触发任务开始执行。Quartz支持简单的按时间间隔调度、还支持按日历调度方式通过设置CronTrigger表达式包括秒、分、时、日、月、周、年进行任务调度。 下边是一个例子代码
Java
public static void main(String [] agrs) throws SchedulerException {//创建一个SchedulerSchedulerFactory schedulerFactory new StdSchedulerFactory();Scheduler scheduler schedulerFactory.getScheduler();//创建JobDetailJobBuilder jobDetailBuilder JobBuilder.newJob(MyJob.class);jobDetailBuilder.withIdentity(jobName,jobGroupName);JobDetail jobDetail jobDetailBuilder.build();//创建触发的CronTrigger 支持按日历调度CronTrigger trigger TriggerBuilder.newTrigger().withIdentity(triggerName, triggerGroupName).startNow().withSchedule(CronScheduleBuilder.cronSchedule(0/2 * * * * ?)).build();scheduler.scheduleJob(jobDetail,trigger);scheduler.start();
}public class MyJob implements Job {Overridepublic void execute(JobExecutionContext jobExecutionContext){System.out.println(todo something);}
}通过以上内容我们学习了什么是任务调度任务调度所解决的问题以及任务调度的多种实现方式。 任务调度顾名思义就是对任务的调度它是指系统为了完成特定业务基于给定时间点给定时间间隔或者给定执行次数自动执行任务。 什么是分布式任务调度 通常任务调度的程序是集成在应用中的比如优惠卷服务中包括了定时发放优惠卷的的调度程序结算服务中包括了定期生成报表的任务调度程序由于采用分布式架构一个服务往往会部署多个冗余实例来运行我们的业务在这种分布式系统环境下运行任务调度我们称之为分布式任务调度如下图
分布式调度要实现的目标 不管是任务调度程序集成在应用程序中还是单独构建的任务调度系统如果采用分布式调度任务的方式就相当于将任务调度程序分布式构建这样就可以具有分布式系统的特点并且提高任务的调度处理能力 1、并行任务调度 并行任务调度实现靠多线程如果有大量任务需要调度此时光靠多线程就会有瓶颈了因为一台计算机CPU的处理能力是有限的。 如果将任务调度程序分布式部署每个结点还可以部署为集群这样就可以让多台计算机共同去完成任务调度我们可以将任务分割为若干个分片由不同的实例并行执行来提高任务调度的处理效率。 2、高可用 若某一个实例宕机不影响其他实例来执行任务。 3、弹性扩容 当集群中增加实例就可以提高并执行任务的处理效率。 4、任务管理与监测 对系统中存在的所有定时任务进行统一的管理及监测。让开发人员及运维人员能够时刻了解任务执行情况从而做出快速的应急处理响应。 5、避免任务重复执行 当任务调度以集群方式部署同一个任务调度可能会执行多次比如在上面提到的电商系统中到点发优惠券的例子就会发放多次优惠券对公司造成很多损失所以我们需要控制相同的任务在多个运行实例上只执行一次。
6.2.2 XXL-JOB介绍
XXL-JOB是一个轻量级分布式任务调度平台其核心设计目标是开发迅速、学习简单、轻量级、易扩展。现已开放源代码并接入多家公司线上产品线开箱即用。 官网https://www.xuxueli.com/xxl-job/ 文档https://www.xuxueli.com/xxl-job/#%E3%80%8A%E5%88%86%E5%B8%83%E5%BC%8F%E4%BB%BB%E5%8A%A1%E8%B0%83%E5%BA%A6%E5%B9%B3%E5%8F%B0XXL-JOB%E3%80%8B XXL-JOB主要有调度中心、执行器、任务
调度中心 负责管理调度信息按照调度配置发出调度请求自身不承担业务代码 主要职责为执行器管理、任务管理、监控运维、日志管理等 任务执行器 负责接收调度请求并执行任务逻辑 只要职责是注册服务、任务执行服务接收到任务后会放入线程池中的任务队列、执行结果上报、日志服务等 任务负责执行具体的业务处理。
调度中心与执行器之间的工作流程如下
执行流程 1.任务执行器根据配置的调度中心的地址自动注册到调度中心 2.达到任务触发条件调度中心下发任务 3.执行器基于线程池执行任务并把执行结果放入内存队列中、把执行日志写入日志文件中 4.执行器消费内存队列中的执行结果主动上报给调度中心 5.当用户在调度中心查看任务日志调度中心请求任务执行器任务执行器读取任务日志文件并返回日志详情
6.2.3 搭建XXL-JOB
6.2.3.1 调度中心
首先下载XXL-JOB GitHubhttps://github.com/xuxueli/xxl-job 码云https://gitee.com/xuxueli0323/xxl-job 项目使用2.3.1版本 https://github.com/xuxueli/xxl-job/releases/tag/2.3.1 也可从课程资料目录获取解压xxl-job-2.3.1.zip 使用IDEA打开解压后的目录
xxl-job-admin调度中心 xxl-job-core公共依赖 xxl-job-executor-samples执行器Sample示例选择合适的版本执行器可直接使用 xxl-job-executor-sample-springbootSpringboot版本通过Springboot管理执行器推荐这种方式 xxl-job-executor-sample-frameless无框架版本 doc :文档资料包含数据库脚本 在下发的虚拟机的MySQL中已经创建了xxl_job_2.3.1数据库
如下图
执行sh /data/soft/restart.sh自动启动xxl-job调度中心 访问http://192.168.101.65:8088/xxl-job-admin/ 账号和密码admin/123456 如果无法使用虚拟机运行xxl-job可以在本机idea运行xxl-job调度中心。
6.2.3.2 执行器
下边配置执行器执行器负责与调度中心通信接收调度中心发起的任务调度请求。 1、下边进入调度中心添加执行器 http://192.168.101.65:8088/xxl-job-admin/jobgroup
点击新增填写执行器信息appname是前边在nacos中配置xxl信息时指定的执行器的应用名。
添加成功 2、首先在媒资管理模块的service工程添加依赖在项目的父工程已约定了版本2.3.1
dependencygroupIdcom.xuxueli/groupIdartifactIdxxl-job-core/artifactId
/dependency3、在nacos下的media-service-dev.yaml下配置xxl-job xxl:job:admin: addresses: http://192.168.101.65:8088/xxl-job-adminexecutor:appname: media-process-serviceaddress: ip: port: 9999logpath: /data/applogs/xxl-job/jobhandlerlogretentiondays: 30accessToken: default_token注意配置中的appname这是执行器的应用名port是执行器启动的端口如果本地启动多个执行器注意端口不能重复。 4、配置xxl-job的执行器 将xxl-job示例工程下配置类拷贝到媒资管理的service工程下
拷贝至
到此完成媒资管理模块service工程配置xxl-job执行器在xxl-job调度中心添加执行器下边准备测试执行器与调度中心是否正常通信因为接口工程依赖了service工程所以启动媒资管理模块的接口工程。 启动后观察日志出现下边的日志表示执行器在调度中心注册成功
同时观察调度中心中的执行器界面
在线机器地址处已显示1个执行器。
6.2.3.3 执行任务
下边编写任务参考示例工程中任务类的编写方法如下图
在媒资服务service包下新建jobhandler存放任务类下边参考示例工程编写一个任务类
package com.xuecheng.media.service.jobhandler;import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.concurrent.TimeUnit;/*** description 测试执行器* author Mr.M* date 2022/9/13 20:32* version 1.0*/ComponentSlf4j
public class SampleJob {/*** 1、简单任务示例Bean模式*/XxlJob(testJob)public void testJob() throws Exception {log.info(开始执行.....);}}下边在调度中心添加任务进入任务管理
点击新增填写任务信息
注意红色标记处 调度类型 固定速度指按固定的间隔定时调度。 Cron通过Cron表达式实现更丰富的定时调度策略。 Cron表达式是一个字符串通过它可以定义调度策略格式如下 {秒数} {分钟} {小时} {日期} {月份} {星期} {年份(可为空)} xxl-job提供图形界面去配置
一些例子如下 30 10 1 * * ? 每天1点10分30秒触发 0/30 * * * * ? 每30秒触发一次
0/10 * * * ? 每10分钟触发一次 运行模式有BEAN和GLUEbean模式较常用就是在项目工程中编写执行器的任务代码GLUE是将任务代码编写在调度中心。 JobHandler即任务方法名填写任务方法上边XxlJob注解中的名称。 路由策略当执行器集群部署时调度中心向哪个执行器下发任务这里选择第一个表示只向第一个执行器下发任务路由策略的其它选项稍后在分片广播章节详细解释。 高级配置的其它配置项稍后在分片广播章节详细解释。
添加成功启动任务
通过调度日志查看任务执行情况 下边启动媒资管理的service工程启动执行器。 观察执行器方法的执行。
如果要停止任务需要在调度中心操作
任务跑一段时间注意清理日志
6.2.4 分片广播
掌握了xxl-job的基本使用下边思考如何进行分布式任务处理呢如下图我们会启动多个执行器组成一个集群去执行任务。
执行器在集群部署下调度中心有哪些调度策略呢 查看xxl-job官方文档阅读高级配置相关的内容 SQL
高级配置- 路由策略当执行器集群部署时提供丰富的路由策略包括FIRST第一个固定选择第一个机器LAST最后一个固定选择最后一个机器ROUND轮询RANDOM随机随机选择在线的机器CONSISTENT_HASH一致性HASH每个任务按照Hash算法固定选择某一台机器且所有任务均匀散列在不同机器上。LEAST_FREQUENTLY_USED最不经常使用使用频率最低的机器优先被选举LEAST_RECENTLY_USED最近最久未使用最久未使用的机器优先被选举FAILOVER故障转移按照顺序依次进行心跳检测第一个心跳检测成功的机器选定为目标执行器并发起调度BUSYOVER忙碌转移按照顺序依次进行空闲检测第一个空闲检测成功的机器选定为目标执行器并发起调度SHARDING_BROADCAST(分片广播)广播触发对应集群中所有机器执行一次任务同时系统自动传递分片参数可根据分片参数开发分片任务- 子任务每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取)当本任务执行结束并且执行成功时将会触发子任务ID所对应的任务的一次主动调度通过子任务可以实现一个任务执行完成去执行另一个任务。- 调度过期策略- 忽略调度过期后忽略过期的任务从当前时间开始重新计算下次触发时间- 立即执行一次调度过期后立即执行一次并从当前时间开始重新计算下次触发时间- 阻塞处理策略调度过于密集执行器来不及处理时的处理策略单机串行默认调度请求进入单机执行器后调度请求进入FIFO队列并以串行方式运行丢弃后续调度调度请求进入单机执行器后发现执行器存在运行的调度任务本次请求将会被丢弃并标记为失败覆盖之前调度调度请求进入单机执行器后发现执行器存在运行的调度任务将会终止运行中的调度任务并清空队列然后运行本地调度任务- 任务超时时间支持自定义任务超时时间任务运行超时将会主动中断任务- 失败重试次数支持自定义任务失败重试次数当任务失败时将会按照预设的失败重试次数主动进行重试下边要重点说的是分片广播策略分片是指是调度中心以执行器为维度进行分片将集群中的执行器标上序号0123…广播是指每次调度会向集群中的所有执行器发送任务调度请求中携带分片参数。 如下图
每个执行器收到调度请求同时接收分片参数。 xxl-job支持动态扩容执行器集群从而动态增加分片数量当有任务量增加可以部署更多的执行器到集群中调度中心会动态修改分片的数量。 作业分片适用哪些场景呢 •分片任务场景10个执行器的集群来处理10w条数据每台机器只需要处理1w条数据耗时降低10倍 •广播任务场景广播执行器同时运行shell脚本、广播集群节点进行缓存更新等。 所以广播分片方式不仅可以充分发挥每个执行器的能力并且根据分片参数可以控制任务是否执行最终灵活控制了执行器集群分布式处理任务。
使用说明 “分片广播” 和普通任务开发流程一致不同之处在于可以获取分片参数进行分片业务处理。 Java语言任务获取分片参数方式 BEAN、GLUE模式(Java)可参考Sample示例执行器中的示例任务ShardingJobHandler
Java
/*** 2、分片广播任务*/
XxlJob(shardingJobHandler)
public void shardingJobHandler() throws Exception {// 分片序号从0开始int shardIndex XxlJobHelper.getShardIndex();// 分片总数int shardTotal XxlJobHelper.getShardTotal();....下边测试作业分片 1、定义作业分片的任务方法
Java
/*** 2、分片广播任务*/XxlJob(shardingJobHandler)public void shardingJobHandler() throws Exception {// 分片参数int shardIndex XxlJobHelper.getShardIndex();int shardTotal XxlJobHelper.getShardTotal();log.info(分片参数当前分片序号 {}, 总分片数 {}, shardIndex, shardTotal);
log.info(开始执行第shardIndex批任务);}2、在调度中心添加任务
添加成功
启动任务观察日志
下边启动两个执行器实例观察每个实例的执行情况 首先在nacos中配置media-service的本地优先配置
#配置本地优先
spring:cloud:config:override-none: true将media-service启动两个实例 两个实例的在启动时注意端口不能冲突 实例1 在VM options处添加-Dserver.port63051 -Dxxl.job.executor.port9998 实例2 在VM options处添加-Dserver.port63050 -Dxxl.job.executor.port9999 例如
启动两个实例 观察任务调度中心稍等片刻执行器有两个
观察两个执行实例的日志
另一实例的日志如下
从日志可以看每个实例的分片序号不同。 如果其中一个执行器挂掉只剩下一个执行器在工作稍等片刻调用中心发现少了一个执行器将动态调整总分片数为1。 到此作业分片任务调试完成此时我们可以思考 当一次分片广播到来各执行器如何根据分片参数去分布式执行任务保证执行器之间执行的任务不重复呢
6.3 技术方案
6.3.1 作业分片方案
掌握了xxl-job的分片广播调度方式下边思考如何分布式去执行学成在线平台中的视频处理任务。 任务添加成功后对于要处理的任务会添加到待处理任务表中现在启动多个执行器实例去查询这些待处理任务此时如何保证多个执行器不会查询到重复的任务呢 XXL-JOB并不直接提供数据处理的功能它只会给执行器分配好分片序号在向执行器任务调度的同时下发分片总数以及分片序号等参数执行器收到这些参数根据自己的业务需求去利用这些参数。 下图表示了多个执行器获取视频处理任务的结构
每个执行器收到广播任务有两个参数分片总数、分片序号。每个执行从数据表取任务时可以让任务id 模上 分片总数如果等于分片序号则执行此任务。 上边两个执行器实例那么分片总数为2序号为0、1从任务1开始如下 1 % 2 1 执行器2执行 2 % 2 0 执行器1执行 3 % 2 1 执行器2执行 以此类推.
6.3.2 保证任务不重复执行
通过作业分片方案保证了执行器之间查询到不重复的任务如果一个执行器在处理一个视频还没有完成此时调度中心又一次请求调度为了不重复处理同一个视频该怎么办 首先配置调度过期策略 查看文档如下 - 调度过期策略调度中心错过调度时间的补偿处理策略包括忽略、立即补偿触发一次等 - 忽略调度过期后忽略过期的任务从当前时间开始重新计算下次触发时间 - 立即执行一次调度过期后立即执行一次并从当前时间开始重新计算下次触发时间 - 阻塞处理策略调度过于密集执行器来不及处理时的处理策略 这里我们选择忽略如果立即执行一次就可能重复执行相同的任务。
其次再看阻塞处理策略阻塞处理策略就是当前执行器正在执行任务还没有结束时调度中心进行任务调度此时该如何处理。 查看文档如下 单机串行默认调度请求进入单机执行器后调度请求进入FIFO队列并以串行方式运行 丢弃后续调度调度请求进入单机执行器后发现执行器存在运行的调度任务本次请求将会被丢弃并标记为失败 覆盖之前调度调度请求进入单机执行器后发现执行器存在运行的调度任务将会终止运行中的调度任务并清空队列然后运行本地调度任务 这里如果选择覆盖之前调度则可能重复执行任务这里选择 丢弃后续调度或单机串行方式来避免任务重复执行。 只做这些配置可以保证任务不会重复执行吗 做不到还需要保证任务处理的幂等性什么是任务的幂等性任务的幂等性是指对于数据的操作不论多少次操作的结果始终是一致的。在本项目中要实现的是不论多少次任务调度同一个视频只执行一次成功的转码。 什么是幂等性 它描述了一次和多次请求某一个资源对于资源本身应该具有同样的结果。 幂等性是为了解决重复提交问题比如恶意刷单重复支付等。 解决幂等性常用的方案 1数据库约束比如唯一索引主键。 2乐观锁常用于数据库更新数据时根据乐观锁状态去更新。 3唯一序列号操作传递一个唯一序列号操作时判断与该序列号相等则执行。 基于以上分析在执行器接收调度请求去执行视频处理任务时要实现视频处理的幂等性要有办法去判断该视频是否处理完成如果正在处理中或处理完则不再处理。这里我们在数据库视频处理表中添加处理状态字段视频处理完成更新状态为完成执行视频处理前判断状态是否完成如果完成则不再处理。
6.3.3 视频处理方案
确定了分片方案下边梳理整个视频上传及处理的业务流程。
上传视频成功向视频处理待处理表添加记录。 视频处理的详细流程如下
1、任务调度中心广播作业分片。 2、执行器收到广播作业分片从数据库读取待处理任务读取未处理及处理失败的任务。 3、执行器更新任务为处理中根据任务内容从MinIO下载要处理的文件。 4、执行器启动多线程去处理任务。 5、任务处理完成上传处理后的视频到MinIO。 6、将更新任务处理结果如果视频处理完成除了更新任务处理结果以外还要将文件的访问地址更新至任务处理表及文件表中最后将任务完成记录写入历史表。
6.4 查询待处理任务
6.4.1 需求分析
查询待处理任务只处理未提交及处理失败的任务任务处理失败后进行重试最多重试3次。 任务处理成功将待处理记录移动到历史任务表。 下图是待处理任务表
历史任务表与待处理任务表的结构相同。
6.4.2添加待处理任务
上传视频成功向视频处理待处理表添加记录暂时只添加对avi视频的处理记录。 根据MIME Type去判断是否是avi视频下边列出部分MIME Type
avi视频的MIME Type是video/x-msvideo 修改文件信息入库方法如下
Java
Transactional
public MediaFiles addMediaFilesToDb(Long companyId, String fileMd5, UploadFileParamsDto uploadFileParamsDto, String bucket, String objectName) {//从数据库查询文件MediaFiles mediaFiles mediaFilesMapper.selectById(fileMd5);if (mediaFiles null) {mediaFiles new MediaFiles();//拷贝基本信息BeanUtils.copyProperties(uploadFileParamsDto, mediaFiles);mediaFiles.setId(fileMd5);mediaFiles.setFileId(fileMd5);mediaFiles.setCompanyId(companyId);//媒体类型mediaFiles.setUrl(/ bucket / objectName);mediaFiles.setBucket(bucket);mediaFiles.setFilePath(objectName);mediaFiles.setCreateDate(LocalDateTime.now());mediaFiles.setAuditStatus(002003);mediaFiles.setStatus(1);//保存文件信息到文件表int insert mediaFilesMapper.insert(mediaFiles);if (insert 0) {log.error(保存文件信息到数据库失败,{}, mediaFiles.toString());XueChengPlusException.cast(保存文件信息失败);}//添加到待处理任务表addWaitingTask(mediaFiles);log.debug(保存文件信息到数据库成功,{}, mediaFiles.toString());}return mediaFiles;}/*** 添加待处理任务* param mediaFiles 媒资文件信息*/
private void addWaitingTask(MediaFiles mediaFiles){//文件名称String filename mediaFiles.getFilename();//文件扩展名String exension filename.substring(filename.lastIndexOf(.));//文件mimeTypeString mimeType getMimeType(exension);//如果是avi视频添加到视频待处理表if(mimeType.equals(video/x-msvideo)){MediaProcess mediaProcess new MediaProcess();BeanUtils.copyProperties(mediaFiles,mediaProcess);mediaProcess.setStatus(1);//未处理mediaProcess.setFailCount(0);//失败次数默认为0mediaProcessMapper.insert(mediaProcess);}
}6.4.3 查询待处理任务
如何保证查询到的待处理视频记录不重复 编写根据分片参数获取待处理任务的DAO方法定义DAO接口如下
Java
public interface MediaProcessMapper extends BaseMapperMediaProcess {/*** description 根据分片参数获取待处理任务* param shardTotal 分片总数* param shardindex 分片序号* param count 任务数* return java.util.Listcom.xuecheng.media.model.po.MediaProcess * author Mr.M* date 2022/9/14 8:54*/Select(select * from media_process t where t.id % #{shardTotal} #{shardIndex} and (t.status 1 or t.status 3) and t.fail_count 3 limit #{count})ListMediaProcess selectListByShardIndex(Param(shardTotal) int shardTotal,Param(shardIndex) int shardIndex,Param(count) int count);
}定义Service接口查询待处理
Java
package com.xuecheng.media.service;import com.xuecheng.base.model.PageParams;
import com.xuecheng.base.model.PageResult;
import com.xuecheng.base.model.RestResponse;
import com.xuecheng.media.model.dto.QueryMediaParamsDto;
import com.xuecheng.media.model.dto.UploadFileParamsDto;
import com.xuecheng.media.model.dto.UploadFileResultDto;
import com.xuecheng.media.model.po.MediaFiles;
import com.xuecheng.media.model.po.MediaProcess;
import org.springframework.transaction.annotation.Transactional;import java.io.File;
import java.util.List;/*** author Mr.M* version 1.0* description 媒资文件处理业务方法* date 2022/9/10 8:55*/
public interface MediaFileProcessService {/*** description 获取待处理任务* param shardIndex 分片序号* param shardTotal 分片总数* param count 获取记录数* return java.util.Listcom.xuecheng.media.model.po.MediaProcess* author Mr.M* date 2022/9/14 14:49*/public ListMediaProcess getMediaProcessList(int shardIndex,int shardTotal,int count);}service接口实现
Java
package com.xuecheng.media.service.impl;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.mapper.MediaProcessHistoryMapper;
import com.xuecheng.media.mapper.MediaProcessMapper;
import com.xuecheng.media.model.po.MediaFiles;
import com.xuecheng.media.model.po.MediaProcess;
import com.xuecheng.media.model.po.MediaProcessHistory;
import com.xuecheng.media.service.MediaFileProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.time.LocalDateTime;
import java.util.List;/*** description TODO* author Mr.M* date 2022/9/14 14:41* version 1.0*/
Slf4j
Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {AutowiredMediaFilesMapper mediaFilesMapper;AutowiredMediaProcessMapper mediaProcessMapper;Overridepublic ListMediaProcess getMediaProcessList(int shardIndex, int shardTotal, int count) {ListMediaProcess mediaProcesses mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);return mediaProcesses;}}6.5 开始执行任务
6.5.1 分布式锁
为了避免多线程去争抢同一个任务可以使用synchronized同步锁去解决如下代码
Java
synchronized(锁对象){执行任务...
}synchronized只能保证同一个虚拟机中多个线程去争抢锁。
如果是多个执行器分布式部署并不能保证同一个视频只有一个执行器去处理。 现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁虚拟机可以分布式部署锁也可以分布式部署如下图
虚拟机都去抢占同一个锁锁是一个单独的程序提供加锁、解锁服务。 该锁已不属于某个虚拟机而是分布式部署由多个虚拟机所共享这种锁叫分布式锁。 实现分布式锁的方案有很多常用的如下 1、基于数据库实现分布锁 利用数据库主键唯一性的特点或利用数据库唯一索引、行级锁的特点多个线程同时去更新相同的记录谁更新成功谁就抢到锁。 2、基于redis实现锁 redis提供了分布式锁的实现方案比如SETNX、set nx、redisson等。 拿SETNX举例说明SETNX命令的工作过程是去set一个不存在的key多个线程去设置同一个key只会有一个线程设置成功设置成功的的线程拿到锁。 3、使用zookeeper实现 zookeeper是一个分布式协调服务主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录多线程向zookeeper创建一个子目录(节点)只会有一个创建成功利用此特点可以实现分布式锁谁创建该结点成功谁就获得锁。
6.5.2 开启任务
下边基于数据库方式实现分布锁开始执行任务将任务执行状态更新为4表示任务执行中。 下边的sql语句可以实现更新操作
update media_process m set m.status4 where m.id?如果是多个线程去执行该sql都将会执行成功但需求是只能有一个线程抢到锁所以此sql无法满足需求。 下边使用乐观锁的方式实现更新操作
javaupdate media_process m set m.status4 where (m.status1 or m.status3) and m.fail_count3 and m.id?多个线程同时执行上边的sql只会有一个线程执行成功。 什么是乐观锁、悲观锁 synchronized是一种悲观锁在执行被synchronized包裹的代码时需要首先获取锁没有拿到锁则无法执行是总悲观的认为别的线程会去抢所以要悲观锁。 乐观锁的思想是它不认为会有线程去争抢尽管去执行如果没有执行成功就再去重试。 实现如下 1、定义mapper
Java
public interface MediaProcessMapper extends BaseMapperMediaProcess {/*** 开启一个任务* param id 任务id* return 更新记录数*/Update(update media_process m set m.status4 where (m.status1 or m.status3) and m.fail_count3 and m.id#{id})int startTask(Param(id) long id);}2、service方法
Java/*** 开启一个任务* param id 任务id* return true开启任务成功false开启任务失败*/
public boolean startTask(long id);//实现如下
public boolean startTask(long id) {int result mediaProcessMapper.startTask(id);return result0?false:true;
}6.6 更新任务状态
任务处理完成需要更新任务处理结果任务执行成功更新视频的URL、及任务处理结果将待处理任务记录删除同时向历史任务表添加记录。 在MediaFileProcessService接口添加方法
Java
/*** description 保存任务结果* param taskId 任务id* param status 任务状态* param fileId 文件id* param url url* param errorMsg 错误信息* return void* author Mr.M* date 2022/10/15 11:29*/
void saveProcessFinishStatus(Long taskId,String status,String fileId,String url,String errorMsg);service接口方法实现如下
Java
package com.xuecheng.media.service.impl;import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.xuecheng.media.mapper.MediaFilesMapper;
import com.xuecheng.media.mapper.MediaProcessHistoryMapper;
import com.xuecheng.media.mapper.MediaProcessMapper;
import com.xuecheng.media.model.po.MediaFiles;
import com.xuecheng.media.model.po.MediaProcess;
import com.xuecheng.media.model.po.MediaProcessHistory;
import com.xuecheng.media.service.MediaFileProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.AutoConfigureOrder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;import java.time.LocalDateTime;
import java.util.List;/*** description TODO* author Mr.M* date 2022/9/14 14:41* version 1.0*/
Slf4j
Service
public class MediaFileProcessServiceImpl implements MediaFileProcessService {AutowiredMediaFilesMapper mediaFilesMapper;AutowiredMediaProcessMapper mediaProcessMapper;AutowiredMediaProcessHistoryMapper mediaProcessHistoryMapper;Transactional
Override
public void saveProcessFinishStatus(Long taskId, String status, String fileId, String url, String errorMsg) {//查出任务如果不存在则直接返回MediaProcess mediaProcess mediaProcessMapper.selectById(taskId);if(mediaProcess null){return ;}//处理失败更新任务处理结果LambdaQueryWrapperMediaProcess queryWrapperById new LambdaQueryWrapperMediaProcess().eq(MediaProcess::getId, taskId);//处理失败if(status.equals(3)){MediaProcess mediaProcess_u new MediaProcess();mediaProcess_u.setStatus(3);mediaProcess_u.setErrormsg(errorMsg);mediaProcess_u.setFailCount(mediaProcess.getFailCount()1);mediaProcessMapper.update(mediaProcess_u,queryWrapperById);log.debug(更新任务处理状态为失败任务信息:{},mediaProcess_u);return ;}//任务处理成功MediaFiles mediaFiles mediaFilesMapper.selectById(fileId);if(mediaFiles!null){//更新媒资文件中的访问urlmediaFiles.setUrl(url);mediaFilesMapper.updateById(mediaFiles);}//处理成功更新url和状态mediaProcess.setUrl(url);mediaProcess.setStatus(2);mediaProcess.setFinishDate(LocalDateTime.now());mediaProcessMapper.updateById(mediaProcess);//添加到历史记录MediaProcessHistory mediaProcessHistory new MediaProcessHistory();BeanUtils.copyProperties(mediaProcess, mediaProcessHistory);mediaProcessHistoryMapper.insert(mediaProcessHistory);//删除mediaProcessmediaProcessMapper.deleteById(mediaProcess.getId());}Overridepublic ListMediaProcess getMediaProcessList(int shardIndex, int shardTotal, int count) {ListMediaProcess mediaProcesses mediaProcessMapper.selectListByShardIndex(shardTotal, shardIndex, count);return mediaProcesses;}}6.7 视频处理
视频采用并发处理每个视频使用一个线程去处理每次处理的视频数量不要超过cpu核心数。 所有视频处理完成结束本次执行为防止代码异常出现无限期等待则添加超时设置到达超时时间还没有处理完成仍结束任务。 定义任务类VideoTask 如下
Java
package com.xuecheng.media.service.jobhander;import com.xuecheng.base.utils.Mp4VideoUtil;
import com.xuecheng.media.model.po.MediaProcess;
import com.xuecheng.media.service.MediaFileProcessService;
import com.xuecheng.media.service.MediaFileService;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.*;/*** author Mr.M* version 1.0* description TODO* date 2022/10/15 11:58*/
Slf4j
Component
public class VideoTask {AutowiredMediaFileService mediaFileService;AutowiredMediaFileProcessService mediaFileProcessService;Value(${videoprocess.ffmpegpath})String ffmpegpath;XxlJob(videoJobHandler)public void videoJobHandler() throws Exception {// 分片参数int shardIndex XxlJobHelper.getShardIndex();int shardTotal XxlJobHelper.getShardTotal();ListMediaProcess mediaProcessList null;int size 0;try {//取出cpu核心数作为一次处理数据的条数int processors Runtime.getRuntime().availableProcessors();//一次处理视频数量不要超过cpu核心数mediaProcessList mediaFileProcessService.getMediaProcessList(shardIndex, shardTotal, processors);size mediaProcessList.size();log.debug(取出待处理视频任务{}条, size);if (size 0) {return;}} catch (Exception e) {e.printStackTrace();return;}//启动size个线程的线程池ExecutorService threadPool Executors.newFixedThreadPool(size);//计数器CountDownLatch countDownLatch new CountDownLatch(size);//将处理任务加入线程池mediaProcessList.forEach(mediaProcess - {threadPool.execute(() - {try {//任务idLong taskId mediaProcess.getId();//抢占任务boolean b mediaFileProcessService.startTask(taskId);if (!b) {return;}log.debug(开始执行任务:{}, mediaProcess);//下边是处理逻辑//桶String bucket mediaProcess.getBucket();//存储路径String filePath mediaProcess.getFilePath();//原始视频的md5值String fileId mediaProcess.getFileId();//原始文件名称String filename mediaProcess.getFilename();//将要处理的文件下载到服务器上File originalFile mediaFileService.downloadFileFromMinIO(mediaProcess.getBucket(), mediaProcess.getFilePath());if (originalFile null) {log.debug(下载待处理文件失败,originalFile:{}, mediaProcess.getBucket().concat(mediaProcess.getFilePath()));mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), 3, fileId, null, 下载待处理文件失败);return;}//处理结束的视频文件File mp4File null;try {mp4File File.createTempFile(mp4, .mp4);} catch (IOException e) {log.error(创建mp4临时文件失败);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), 3, fileId, null, 创建mp4临时文件失败);return;}//视频处理结果String result ;try {//开始处理视频Mp4VideoUtil videoUtil new Mp4VideoUtil(ffmpegpath, originalFile.getAbsolutePath(), mp4File.getName(), mp4File.getAbsolutePath());//开始视频转换成功将返回successresult videoUtil.generateMp4();} catch (Exception e) {e.printStackTrace();log.error(处理视频文件:{},出错:{}, mediaProcess.getFilePath(), e.getMessage());}if (!result.equals(success)) {//记录错误信息log.error(处理视频失败,视频地址:{},错误信息:{}, bucket filePath, result);mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), 3, fileId, null, result);return;}//将mp4上传至minio//mp4在minio的存储路径String objectName getFilePath(fileId, .mp4);//访问urlString url / bucket / objectName;try {mediaFileService.addMediaFilesToMinIO(mp4File.getAbsolutePath(), video/mp4, bucket, objectName);//将url存储至数据并更新状态为成功并将待处理视频记录删除存入历史mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), 2, fileId, url, null);} catch (Exception e) {log.error(上传视频失败或入库失败,视频地址:{},错误信息:{}, bucket objectName, e.getMessage());//最终还是失败了mediaFileProcessService.saveProcessFinishStatus(mediaProcess.getId(), 3, fileId, null, 处理后视频上传或入库失败);}}finally {countDownLatch.countDown();}});});//等待,给一个充裕的超时时间,防止无限等待到达超时时间还没有处理完成则结束任务countDownLatch.await(30, TimeUnit.MINUTES);}private String getFilePath(String fileMd5,String fileExt){return fileMd5.substring(0,1) / fileMd5.substring(1,2) / fileMd5 / fileMd5 fileExt;}}6.8 测试
6.8.1 基本测试
进入xxl-job调度中心添加执行器和视频处理任务 在xxl-job配置任务调度策略 1配置阻塞处理策略为丢弃后续调度。 2配置视频处理调度时间间隔不用根据视频处理时间去确定可以配置的小一些如5分钟即使到达调度时间如果视频没有处理完会丢弃调度请求。 配置完成开始测试视频处理 1、首先上传至少4个视频非mp4格式。 2、在xxl-job启动视频处理任务 3、观察媒资管理服务后台日志 7.8.2 失败测试 1、先停止调度中心的视频处理任务。 2、上传视频手动修改待处理任务表中file_path字段为一个不存在的文件地址 3、启动任务 观察任务处理失败后是否会重试并记录失败次数。
6.8.3 抢占任务测试
1、修改调度中心中视频处理任务的阻塞处理策略为“覆盖之间的调度”
2、在抢占任务代码处打断点并选择支持多线程方式
3、在抢占任务代码处的下边两行代码分别打上断点避免观察时代码继续执行。 4、启动任务 此时多个线程执行都停留在断点处
依次放行观察同一个任务只会被一个线程抢占成功。 6.9 其它问题
6.9.1 任务补偿机制
如果有线程抢占了某个视频的处理任务如果线程处理过程中挂掉了该视频的状态将会一直是处理中其它线程将无法处理这个问题需要用补偿机制。 单独启动一个任务找到待处理任务表中超过执行期限但仍在处理中的任务将任务的状态改为执行失败。 任务执行期限是处理一个视频的最大时间比如定为30分钟通过任务的启动时间去判断任务是否超过执行期限。 大家思考这个sql该如何实现 大家尝试自己实现此任务补偿机制。
6.9.2 达到最大失败次数
当任务达到最大失败次数时一般就说明程序处理此视频存在问题这种情况就需要人工处理在页面上会提示失败的信息人工可手动执行该视频进行处理或通过其它转码工具进行视频转码转码后直接上传mp4视频。
7 绑定媒资
7.1 需求分析
7.1.1 业务流程
到目前为止媒资管理已完成文件上传、视频处理、我的媒资功能等基本功能其它模块可以使用媒资文件本节要讲解课程计划绑定媒资文件。 如何将课程计划绑定媒资呢 首先进入课程计划界面然后选择要绑定的视频进行绑定即可。 具体的业务流程如下 1.教育机构用户进入课程管理页面并编辑某一个课程在课程大纲标签页的某一小节后可点击”添加视频“。
2.弹出添加视频对话框可通过视频关键字搜索已审核通过的视频媒资。 3.选择视频媒资点击提交按钮完成课程计划绑定媒资流程。 课程计划关联视频后如下图
点击已经绑定的视频名称即可解除绑定。
7.1.2 数据模型
课程计划绑定媒资文件后存储至课程计划绑定媒资表
7.2 接口定义
根据业务流程用户进入课程计划列表首先确定向哪个课程计划添加视频点击”添加视频“后用户选择视频选择视频点击提交前端以json格式请求以下参数 提交媒资文件id、文件名称、教学计划id 示例如下
JSON
{mediaId: 70a98b4a2fffc89e50b101f959cc33ca,fileName: 22-Hmily实现TCC事务-开发bank2的confirm方法.avi,teachplanId: 257
}此接口在内容管理模块提供。 在内容管理模块定义请求参数模型类型
JavaData
ApiModel(valueBindTeachplanMediaDto, description教学计划-媒资绑定提交数据)
public class BindTeachplanMediaDto {ApiModelProperty(value 媒资文件id, required true)
private String mediaId;ApiModelProperty(value 媒资文件名称, required true)
private String fileName;ApiModelProperty(value 课程计划标识, required true)private Long teachplanId;}在TeachplanController类中定义接口如下
Java
ApiOperation(value 课程计划和媒资信息绑定)
PostMapping(/teachplan/association/media)
public void associationMedia(RequestBody BindTeachplanMediaDto bindTeachplanMediaDto){}7.3 接口开发
7.3.1 DAO开发
对teachplanMedia表自动生成Mapper。
7.3.2 Service开发
根据需求定义service接口
Java
/*** description 教学计划绑定媒资* param bindTeachplanMediaDto* return com.xuecheng.content.model.po.TeachplanMedia* author Mr.M* date 2022/9/14 22:20
*/
public TeachplanMedia associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto);定义接口实现
JavaTransactionalOverride
public TeachplanMedia associationMedia(BindTeachplanMediaDto bindTeachplanMediaDto) {//教学计划idLong teachplanId bindTeachplanMediaDto.getTeachplanId();Teachplan teachplan teachplanMapper.selectById(teachplanId);if(teachplannull){XueChengPlusException.cast(教学计划不存在);}Integer grade teachplan.getGrade();if(grade!2){XueChengPlusException.cast(只允许第二级教学计划绑定媒资文件);}//课程idLong courseId teachplan.getCourseId();//先删除原来该教学计划绑定的媒资teachplanMediaMapper.delete(new LambdaQueryWrapperTeachplanMedia().eq(TeachplanMedia::getTeachplanId,teachplanId));//再添加教学计划与媒资的绑定关系TeachplanMedia teachplanMedia new TeachplanMedia();teachplanMedia.setCourseId(courseId);teachplanMedia.setTeachplanId(teachplanId);teachplanMedia.setMediaFilename(bindTeachplanMediaDto.getFileName());teachplanMedia.setMediaId(bindTeachplanMediaDto.getMediaId());teachplanMedia.setCreateDate(LocalDateTime.now());teachplanMediaMapper.insert(teachplanMedia);return teachplanMedia;
}7.3.3 接口层完善
完善接口层调用Service层的代码
Java
ApiOperation(value 课程计划和媒资信息绑定)
PostMapping(/teachplan/association/media)
void associationMedia(RequestBody BindTeachplanMediaDto bindTeachplanMediaDto){teachplanService.associationMedia(bindTeachplanMediaDto);
}7.3.4 接口测试
1、使用httpclient测试
Plain Text
### 课程计划绑定视频
POST {{media_host}}/media/teachplan/association/media
Content-Type: application/json{mediaId: ,fileName: ,teachplanId:
}2、前后端联调 此功能较为简单推荐直接前后端联调 向指定课程计划添加视频
7.4 实战
根据接口定义实现解除绑定功能。 点击已经绑定的视频名称即可解除绑定。
接口定义如下
Java
delete /teachplan/association/media/{teachPlanId}/{mediaId}在TeachplanController中定义接口
ApiOperation(课程计划解除媒资信息绑定)
DeleteMapping(/teachplan/association/media/{teachPlanId}/{mediaId})
public void unassociationMedia(PathVariable Long teachPlanId, PathVariable Long mediaId) {}根据需求定义Service接口
/** 解绑教学计划与媒资信息* param teachPlanId 教学计划id* param mediaId 媒资信息id*/
void unassociationMedia(Long teachPlanId, Long mediaId);定义接口实现
Override
public void unassociationMedia(Long teachPlanId, Long mediaId) {LambdaQueryWrapperTeachplanMedia queryWrapper new LambdaQueryWrapper();queryWrapper.eq(TeachplanMedia::getTeachplanId, teachPlanId).eq(TeachplanMedia::getMediaId, mediaId);teachplanMediaMapper.delete(queryWrapper);
}完善接口层调用service层的代码
ApiOperation(课程计划解除媒资信息绑定)
DeleteMapping(/teachplan/association/media/{teachPlanId}/{mediaId})
public void unassociationMedia(PathVariable Long teachPlanId, PathVariable String mediaId) {teachplanService.unassociationMedia(teachPlanId, mediaId);
}返回200状态码表示成功。
开发完成使用httpclient测试、前后端联调
### 课程计划接触视频绑定
DELETE {{media_host}}/media/teachplan/association/media/{teachPlanId}/{mediaId}