贵州省建设厅的网站,网站开发细节,网站建设属于什么开票类目,wordpress站点取名文章目录概要效果解释状态流转说明设计AI任务实体类AI任务状态枚举AI模型枚举基础实体类简单字典接口工厂策略模式 接口设计AiJobProcessorAiJobProcessorFactory观察者模式AI任务相关的EventMyEventListenerMyEventPubLisherRedissonConfig定时任务实现ReplicateJobProcessorR…
文章目录概要效果解释状态流转说明设计AI任务实体类AI任务状态枚举AI模型枚举基础实体类简单字典接口工厂策略模式 接口设计AiJobProcessorAiJobProcessorFactory观察者模式AI任务相关的EventMyEventListenerMyEventPubLisherRedissonConfig定时任务实现ReplicateJobProcessorReplicateApiOkHttpClientUtil 通用万能版新建任务参考定时任务实现IAiJobServiceAiJobServiceImpl整体业务流程总结概要
我发现，无论在什么项目中，都几乎难以避免对接三方API的任务或本地长时间运行的服务。因此有必要设计一套稳定、并发高且扩展性高的方案，来完成这些任务的状态监听和进度更新。经过多年的探索，我设计了一套适用于三方API任务状态更新的解决方案。
效果 解释
效果是用 WebSocket 通信来实现的，这里就不侧重讲了；本文章注重后端的设计，包您满意。
状态
- DRAFT(0, 草稿, 0)
- SUBMITTED(1, 已提交, 16.67)
- QUEUED(2, 排队中, 33.33)
- PROCESSING(3, 生成中, 50.00)
- GENERATED(4, 已生成, 66.67)
- MIGRATING(5, 迁移中, 83.33)
- SUCCESS(6, 成功, 100.00)
- FAILED(7, 失败, 0)
- TIMEOUT(8, 超时, 0)
- CANCELED(9, 取消, 0)
我发现这些状态必不可少，几乎可以概括所有三方任务的状态。为什么要有“迁移”？这也是必须的：三方给的文件、图片、视频等毕竟存放在三方，并不可靠，要迁移到自己的 OSS 或存储系统中去。
流转说明
- 正常流程（进度递增）：草稿(0) → 已提交(1) → 排队中(2) → 生成中(3) → 已生成(4) → 迁移中(5) → 成功(6)
- 异常终止流程：
  - 取消(CANCELED)：可从任意非终止状态主动触发
  - 超时(TIMEOUT)：提交/排队/处理/迁移阶段超时时触发
  - 失败(FAILED)：提交/处理/迁移阶段执行异常时触发
- 终态节点：成功(6)、失败(7)、超时(8)、取消(9)为最终状态，无后续流转
设计
AI任务实体类
package com.cc672cc.entity.tb;import com.gitee.sunchenbin.mybatis.actable.annotation.*;
import com.gitee.sunchenbin.mybatis.actable.constants.MySqlTypeConstant;
import com.cc672cc.entity.BaseEntity;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import tk.mybatis.mapper.annotation.KeySql;
import javax.persistence.Id;
import javax.persistence.Table;
import java.math.BigDecimal;
import java.util.Date;/*** AI任务表*/
Data
Table(name tb_ai_job)
TableComment(AI任务表)
Schema(description AI任务表)
public class AiJob extends BaseEntity {/*** ID*/IdKeySql(useGeneratedKeys true)IsAutoIncrementColumn(name id, type MySqlTypeConstant.BIGINT, isKey true, isNull false, comment ID)Schema(description ID)private Long id;/*** 用户id*/Column(name user_id, type MySqlTypeConstant.BIGINT, comment 用户id)Schema(description 用户id)private Long userId;/*** 照片点评分析编码*/Column(name photo_review_analysis_code, type MySqlTypeConstant.VARCHAR, length 10, comment 照片点评分析编码)Schema(description 照片点评分析编码)private String photoReviewAnalysisCode;/*** 类型* 生文本 生图片 生视频*/Column(name type, type MySqlTypeConstant.VARCHAR, length 10, comment 类型生文本/生图片/生视频)Schema(description 类型)private String type;/*** 动作*/Column(name action, type MySqlTypeConstant.VARCHAR, length 20, comment 动作)Schema(description 动作)private String action;/*** 编码*/Column(name code, type MySqlTypeConstant.VARCHAR, length 10, comment 编码)Schema(description 编码)private String code;/*** 渠道*/Column(name channel, type MySqlTypeConstant.VARCHAR, length 20, comment 渠道)Schema(description 渠道)private String channel;/*** 平台*/Column(name platform, type MySqlTypeConstant.VARCHAR, length 20, comment 平台)Schema(description 平台)private String platform;/*** 模型*/Column(name model, type MySqlTypeConstant.VARCHAR, length 50, comment 模型)Schema(description 模型)private String model;/*** 是否异步* 0否1是*/Column(name asyn, type MySqlTypeConstant.TINYINT, length 1, comment 是否异步0否1是)Schema(description 是否异步)private Boolean asyn;/*** 模型版本*/Column(name model_version, type MySqlTypeConstant.VARCHAR, length 50, comment 模型版本)Schema(description 模型版本)private String modelVersion;/*** 模型id*/Column(name model_id, type MySqlTypeConstant.VARCHAR, length 50, comment 模型id)Schema(description 模型id)private String modelId;/*** 模型名称*/Column(name model_name, type MySqlTypeConstant.VARCHAR, length 50, comment 模型名称)Schema(description 模型名称)private String modelName;/*** 输出数量*/Column(name output_count, type MySqlTypeConstant.INT, comment 输出数量, defaultValue 1)Schema(description 输出数量)private Integer 
outputCount 1;/*** 创建日期*/Column(name create_date, type MySqlTypeConstant.VARCHAR, length 10, comment 创建日期)Schema(description 创建日期)private String createDate;/*** 请求时间*/Column(name req_time, type MySqlTypeConstant.DATETIME, comment 请求时间)Schema(description 请求时间)private Date reqTime;/*** 响应时间*/Column(name resp_time, type MySqlTypeConstant.DATETIME, comment 响应时间)Schema(description 响应时间)private Date respTime;/*** 耗时* 单位s*/Column(name cost_time, type MySqlTypeConstant.BIGINT, comment 耗时单位s)Schema(description 耗时 单位s)private Long costTime;/*** 三方id*/Column(name out_id, type MySqlTypeConstant.VARCHAR, length 100, comment 三方id)Schema(description 三方id)private String outId;/*** 请参json*/Column(name req_json, type MySqlTypeConstant.TEXT, comment 请参json)Schema(description 请参json)private String reqJson;/*** 反参json*/Column(name resp_json, type MySqlTypeConstant.TEXT, comment 反参json)Schema(description 反参json)private String respJson;/*** 任务状态(0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消)*/Column(name job_status, type MySqlTypeConstant.INT, comment 任务状态(0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消))Schema(description 任务状态(0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消))private Integer jobStatus;/*** 单元进度* 对应的是每个任务阶段的进度*/Column(name unit_progress, type MySqlTypeConstant.DECIMAL, length 5, decimalLength 2, comment 单元进度, defaultValue 0)Schema(description 单元进度)private BigDecimal unitProgress;/*** 任务状态描述*/Column(name job_status_desc, type MySqlTypeConstant.VARCHAR, length 255, comment 任务状态描述)Schema(description 图片状态描述)private String jobStatusDesc;/*** 整体进度*/Column(name overall_progress, type MySqlTypeConstant.DECIMAL, length 5, decimalLength 2, comment 整体进度, defaultValue 0)Schema(description 整体进度)private BigDecimal overallProgress;}AI任务状态枚举
package com.cc672cc.enums.dict;import com.cc672cc.enums.IDict;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;/*** AI任务状态枚举** author CC* date 2019/5/5 14:34**/
Getter
Schema(description AI任务状态 0草稿 1已提交 2排队中 3生成中 4已生成 5迁移中 6成功 7失败 8超时 9取消)
public enum AiJobStatusEnum implements IDict {DRAFT(0, 草稿, BigDecimal.ZERO),SUBMITTED(1, 已提交, new BigDecimal(16.67)),QUEUED(2, 排队中, new BigDecimal(33.33)),PROCESSING(3, 生成中, new BigDecimal(50.00)),GENERATED(4, 已生成, new BigDecimal(66.67)),MIGRATING(5, 迁移中, new BigDecimal(83.33)),SUCCESS(6, 成功, new BigDecimal(100.00)),FAILED(7, 失败, BigDecimal.ZERO),TIMEOUT(8, 超时, BigDecimal.ZERO),CANCELED(9, 取消, BigDecimal.ZERO);private Integer code;private String description;/*** 进度*/private BigDecimal progress;public static MapString, String cdMap;public static MapInteger, AiJobStatusEnum map;AiJobStatusEnum(int code, String description, BigDecimal progress) {this.code code;this.description description;this.progress progress;}Overridepublic MapString, String dictMap() {if (cdMap null) {cdMap new LinkedHashMap();AiJobStatusEnum[] values values();for (AiJobStatusEnum value : values) {cdMap.put(String.valueOf(value.getCode()), value.getDescription());}}return cdMap;}public static MapInteger, AiJobStatusEnum getMap() {if (map null) {map new LinkedHashMap();AiJobStatusEnum[] values values();for (AiJobStatusEnum value : values) {map.put(value.getCode(), value);}}return map;}public static BigDecimal getProgress(Integer code) {return getMap().get(code).getProgress();}
}
AI模型枚举
package com.cc672cc.enums.dict;import com.cc672cc.enums.IDict;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter;import java.util.LinkedHashMap;
import java.util.Map;/*** AI模型枚举** author CC* date 2019/5/5 14:34**/
Getter
Schema(description AI模型枚举)
public enum AiModelEnum implements IDict {TTAPI_MIDJOURNEY_V7(ttapi_midjourney_v7, TTAPI_MIDJOURNEY_V7, TTApi Midjourney V7, Midjourney, 7.0, TTapi的Midjourney的第七版本, TTApi, TTApi, true, 1),REPLICATE_IMAGEUPSCALE_V1(replicate_imageupscale_v1, REPLICATE_IMAGEUPSCALE_V1, Replicate ImageUpscale V1, ImageUpscale, 1.0, Replicate的图片放大的第一版本, Replicate, Replicate, true, 2),PHOTOREVIEW_ANALYSIS_V1(photoReview_analysis_v1, PHOTOREVIEW_ANALYSIS_V1, PhotoReview Analysis V1, PhotoReviewAnalysis, 1.0, 照片点评的第一版本, PhotoReview, PhotoReview, true, 1);/*** 模型id*/private String modelId;/*** 模型编码* 自定义的编码 格式 {平台}_{模型}_{版本}*/private String modelCode;private String modelName;private String model;private String modelVersion;private String modelDesc;/*** 平台*/private String platform;/*** 渠道* 指哪个公司的 集团下面的*/private String channel;/*** 是否异步*/private Boolean asyn;/*** 单位积分* 就是每个输出的积分*/private Integer unitPoint;public static MapString, AiModelEnum map;public MapString, String inmap;AiModelEnum(String modelId, String modelCode, String modelName, String model, String modelVersion, String modelDesc, String platform, String channel, Boolean asyn, Integer unitPoint) {this.modelId modelId;this.modelCode modelCode;this.modelName modelName;this.model model;this.modelVersion modelVersion;this.modelDesc modelDesc;this.platform platform;this.channel channel;this.asyn asyn;this.unitPoint unitPoint;}Overridepublic MapString, String dictMap() {if (inmap null) {inmap new LinkedHashMap();AiModelEnum[] values values();for (AiModelEnum value : values) {inmap.put(value.getModelId(), value.getModelName());}}return inmap;}public static MapString, AiModelEnum getMap() {if (map null) {map new LinkedHashMap();AiModelEnum[] values values();for (AiModelEnum value : values) {map.put(value.getModelId(), value);}}return map;}}
基础实体类
package com.cc672cc.entity;import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableLogic;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.gitee.sunchenbin.mybatis.actable.annotation.Column;
import com.gitee.sunchenbin.mybatis.actable.annotation.ColumnComment;
import com.gitee.sunchenbin.mybatis.actable.constants.MySqlTypeConstant;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;import java.util.Date;Data
Schema(description 基础实体类)
public class BaseEntity {/*** 状态(0禁止1正常)*/Column(type MySqlTypeConstant.INT, length 1, defaultValue 1)Schema(description 状态(0禁止1正常))ColumnComment(状态(0禁止1正常))TableField(fill FieldFill.INSERT)private Integer status;/*** 删除状态(0否1是)*/TableLogicSchema(description 删除状态(0否1是))Column(type MySqlTypeConstant.INT, length 1, defaultValue 0)ColumnComment(删除状态(0否1是))TableField(fill FieldFill.INSERT)private Integer del;/*** 排序权重*/Schema(description 排序权重)Column(type MySqlTypeConstant.INT, length 4, defaultValue 0)ColumnComment(排序权重)private Integer sort;/*** 创建人*/Column(type MySqlTypeConstant.VARCHAR, length 64)ColumnComment(创建人)TableField(fill FieldFill.INSERT)private String createBy;/*** 创建时间*/JsonFormat(pattern yyyy-MM-dd HH:mm:ss)Schema(description 创建时间)Column(type MySqlTypeConstant.DATETIME, defaultValue CURRENT_TIMESTAMP)ColumnComment(创建时间)private Date createTime;/*** 更新人*/Column(type MySqlTypeConstant.VARCHAR, length 64)ColumnComment(更新人)TableField(fill FieldFill.UPDATE)private String updateBy;/*** 更新时间*/JsonFormat(pattern yyyy-MM-dd HH:mm:ss)Schema(description 更新时间)Column(type MySqlTypeConstant.DATETIME, defaultValue NULL ON UPDATE CURRENT_TIMESTAMP)ColumnComment(更新时间)private Date updateTime;/*** 版本*/Column(type MySqlTypeConstant.VARCHAR, length 10, defaultValue v1)TableField(fill FieldFill.INSERT)ColumnComment(版本)private String version;
}
简单字典接口
/**
 * Simple dictionary contract: exposes a set of values as a key-to-label map.
 */
public interface IDict {

    /**
     * @return the dictionary entries as a key-to-label map
     */
    Map<String, String> dictMap();
}
工厂策略模式 接口设计
AiJobProcessor
package com.cc672cc.processor;import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.pojo.vo.reqvo.BeautifyPhotoReqVO;

/**
 * Strategy interface for one kind of AI job. Implementations are looked up by
 * the value returned from {@link #getSupportedModel()}.
 */
public interface AiJobProcessor {

    /**
     * Builds the request JSON for a job.
     *
     * @param reqVO               the incoming request
     * @param photoReviewAnalysis the photo-review analysis the job is based on
     * @return the request payload as a JSON string
     */
    String buildReqJson(BeautifyPhotoReqVO reqVO, PhotoReviewAnalysis photoReviewAnalysis);

    /**
     * Processes (submits) the job.
     *
     * @param aiJob the AI job
     * @return the resulting job status code
     */
    Integer process(AiJob aiJob);

    /**
     * Queries the current state of the job.
     *
     * @param aiJob the AI job
     * @return the resulting job status code
     */
    Integer query(AiJob aiJob);

    /**
     * Migrates the job's results.
     *
     * @param aiJobId   id of the AI job
     * @param jobStatus the job's current status code
     * @return the resulting job status code
     */
    Integer migrate(Long aiJobId, Integer jobStatus);

    /**
     * Returns the model type this processor supports; corresponds to the
     * {@code AiJob.model} field.
     *
     * @return the model type
     */
    String getSupportedModel();

    /**
     * @return the business timeout, in seconds
     */
    Long businessTimeoutS();

    /**
     * Unit-progress display flags (0 = hide, 1 = show).
     * NOTE(review): presumably indexed by job status code; confirm against callers.
     *
     * @return the flag array
     */
    Integer[] unitProgressShowFlag();
}
AiJobProcessorFactory
package com.cc672cc.processor;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** AI任务处理器工厂根据model动态获取处理器*/
Component
public class AiJobProcessorFactory {private final MapString, AiJobProcessor processorMap new HashMap();// 自动注入所有AiJobProcessor实现类Autowiredpublic AiJobProcessorFactory(ListAiJobProcessor processors) {for (AiJobProcessor processor : processors) {processorMap.put(processor.getSupportedModel(), processor);}}/*** 根据模型类型获取处理器* param model 模型类型AiJob.model* return 处理器实例* throws IllegalArgumentException 无对应处理器时抛出异常*/public AiJobProcessor getProcessor(String model) {AiJobProcessor processor processorMap.get(model);if (processor null) {throw new IllegalArgumentException(未找到模型[ model ]对应的处理器);}return processor;}
}观察者模式
AI任务相关的Event
public class AiJobMigrateEvent extends ApplicationEvent {private AiJob aiJob;public AiJobMigrateEvent(Object source) {super(source);}public AiJobMigrateEvent(Object source, AiJob aiJob) {super(source);this.aiJob aiJob;}public AiJob getAiJob() {return aiJob;}public void setAiJob(AiJob aiJob) {this.aiJob aiJob;}
}public class AiJobMsgEvent extends ApplicationEvent {private String msg;public AiJobMsgEvent(Object source) {super(source);}public AiJobMsgEvent(Object source, String msg) {super(source);this.msg msg;}public String getMsg() {return msg;}public void setMsg(String msg) {this.msg msg;}
}/*** AI任务需要立马提交事件*/
public class AiJobNeedSubmitRightNowEvent extends ApplicationEvent {private AiJob aiJob;public AiJobNeedSubmitRightNowEvent(Object source) {super(source);}public AiJobNeedSubmitRightNowEvent(Object source, AiJob aiJob) {super(source);this.aiJob aiJob;}public AiJob getAiJob() {return aiJob;}public void setAiJob(AiJob aiJob) {this.aiJob aiJob;}
}public class AiJobStatusRefreshEvent extends ApplicationEvent {private AiJob aiJob;public AiJobStatusRefreshEvent(Object source) {super(source);}public AiJobStatusRefreshEvent(Object source, AiJob aiJob) {super(source);this.aiJob aiJob;}public AiJob getAiJob() {return aiJob;}public void setAiJob(AiJob aiJob) {this.aiJob aiJob;}
}
MyEventListener
Component
Slf4j
public class MyEventListener {private static final String TIME_FORMAT yyyy-MM-dd HH:mm:ss;Autowiredprivate BusinessAsync businessAsync;/*** 统一监听** param applicationEvent*/EventListener(classes {LifyMsgEvent.class})public void listener(ApplicationEvent applicationEvent) {String simpleName applicationEvent.getClass().getSimpleName();log.info(***** listener reception time : {} , simpleName : {} ***** , context : {}, DateUtil.format(DateUtil.date(), TIME_FORMAT), simpleName, JSON.toJSONString(applicationEvent));}/*** param lifyMsgEvent*/EventListenerpublic void listener(LifyMsgEvent lifyMsgEvent) {businessAsync.saveChatContext(lifyMsgEvent);businessAsync.saveChatHistory(lifyMsgEvent);}/*** param photoReviewMsgEvent*/EventListenerpublic void listener(PhotoReviewMsgEvent photoReviewMsgEvent) {businessAsync.websocketMsg(photoReviewMsgEvent);}/*** param aiJobMsgEvent*/EventListenerpublic void listener(AiJobMsgEvent aiJobMsgEvent) {businessAsync.websocketMsg(aiJobMsgEvent);}/*** param taskMsgEvent*/EventListenerpublic void listener(TaskMsgEvent taskMsgEvent) {businessAsync.websocketMsg(taskMsgEvent);}/*** param aiJobNeedSubmitRightNowEvent*/EventListenerpublic void listener(AiJobNeedSubmitRightNowEvent aiJobNeedSubmitRightNowEvent) {businessAsync.submitAiJob(aiJobNeedSubmitRightNowEvent);}EventListenerpublic void listener(TaskNeedSubmitRightNowEvent taskNeedSubmitRightNowEvent) {businessAsync.submitTask(taskNeedSubmitRightNowEvent);}/*** param aiJobStatusRefreshEvent*/EventListenerpublic void listener(AiJobStatusRefreshEvent aiJobStatusRefreshEvent) {businessAsync.refreshAiJobStatusDetail(aiJobStatusRefreshEvent);}/*** param taskStatusRefreshEvent*/EventListenerpublic void listener(TaskStatusRefreshEvent taskStatusRefreshEvent) {businessAsync.refreshTaskStatusDetail(taskStatusRefreshEvent);}/*** param aiJobMigrateEvent*/EventListenerpublic void listener(AiJobMigrateEvent aiJobMigrateEvent) {businessAsync.migrateAiJobDetail(aiJobMigrateEvent);}/*** param 
taskMigrateEvent*/EventListenerpublic void listener(TaskMigrateEvent taskMigrateEvent) {businessAsync.migrateTaskDetail(taskMigrateEvent);}}MyEventPubLisher
/*** 我的事件发布器** author liaoqian* since 2024-01-24*/
Component
public class MyEventPubLisher {Autowiredprivate IRedisService redisService;Autowiredprivate ApplicationEventPublisher applicationEventPublisher;public void pushLifyMsgEvent(String msg) {applicationEventPublisher.publishEvent(new LifyMsgEvent(this, msg));}public void pushPhotoReviewMsgEvent(String msg) {applicationEventPublisher.publishEvent(new PhotoReviewMsgEvent(this, msg));}public void pushAiJobMsgEvent(String msg) {applicationEventPublisher.publishEvent(new AiJobMsgEvent(this, msg));}public void pushTaskMsgEvent(String msg) {applicationEventPublisher.publishEvent(new TaskMsgEvent(this, msg));}public void pushAiJobNeedSubmitRightNow(AiJob aiJob) {applicationEventPublisher.publishEvent(new AiJobNeedSubmitRightNowEvent(this, aiJob));}public void pushTaskNeedSubmitRightNow(Task task) {applicationEventPublisher.publishEvent(new TaskNeedSubmitRightNowEvent(this, task));}public void pushAiJobStatusRefreshEvent(AiJob aiJob) {String cacheAiJobTaskProcess RedisPreKey.CACHE_AI_JOB_PROCESS;String redisPreKey cacheAiJobTaskProcess aiJob.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new AiJobStatusRefreshEvent(this, aiJob));}}public void pushTaskStatusRefreshEvent(Task task) {String cacheTaskTaskProcess RedisPreKey.CACHE_TASK_PROCESS;String redisPreKey cacheTaskTaskProcess task.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new TaskStatusRefreshEvent(this, task));}}public void pushAiJobMigrateEvent(AiJob aiJob) {String cacheAiJobTaskProcess RedisPreKey.CACHE_AI_JOB_PROCESS;String redisPreKey cacheAiJobTaskProcess aiJob.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new AiJobMigrateEvent(this, aiJob));}}public void pushTaskMigrateEvent(Task task) {String cacheTaskTaskProcess RedisPreKey.CACHE_TASK_PROCESS;String redisPreKey cacheTaskTaskProcess task.getId();if (!redisService.hasKey(redisPreKey)) {applicationEventPublisher.publishEvent(new 
TaskMigrateEvent(this, task));}}
}RedissonConfig
/*** RedissonClient* 常用:分布式锁功能* author cc* date 2020/05/13*/
Configuration
public class RedissonConfig {Beanpublic RedissonClient redissonClient(RedisProperties prop) {String address redis://%s:%d;Config config new Config();config.useSingleServer().setPassword(prop.getPassword()).setAddress(String.format(address, prop.getHost(), prop.getPort())).setDatabase(0);return Redisson.create(config);}
}定时任务
package com.cc672cc.scheduler;import com.cc672cc.service.IAiJobService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import java.text.SimpleDateFormat;
import java.util.Date;/*** AI任务调度器*/
Slf4j
Component
public class AiJobScheduler {private SimpleDateFormat sdf new SimpleDateFormat(yyyy-MM-dd HH:mm:ss);Autowiredprivate IAiJobService aiJobService;/*** 每5秒* 刷新AI任务状态*/Scheduled(initialDelay 10000, fixedDelay 5000)public void refreshAiJobStatus() {log.info(refreshAiJobStatus start--{}, sdf.format(new Date()));int refreshCount aiJobService.refreshAiJobStatus();log.info(refreshAiJobStatus end refreshCount--{}, refreshCount);}/*** 每5秒* 迁移AI任务*/Scheduled(initialDelay 10000, fixedDelay 5000)public void migrateAiJob() {log.info(migrateAiJob start--{}, sdf.format(new Date()));int migrateCount aiJobService.migrateAiJob();log.info(migrateAiJob end migrateCount--{}, migrateCount);}
}
实现
这里拿其中的一个处理器 ReplicateJobProcessor 来举例，具体实现还得看自己的业务。
ReplicateJobProcessor
package com.cc672cc.processor.aijob;import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.cc672cc.common.utils.CodeUtil;
import com.cc672cc.common.utils.DateUtil;
import com.cc672cc.common.utils.MessageUtils;
import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoBeautify;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.enums.dict.AiJobStatusEnum;
import com.cc672cc.enums.dict.DelEnum;
import com.cc672cc.enums.dict.StatusEnum;
import com.cc672cc.pojo.vo.reqvo.BeautifyPhotoReqVO;
import com.cc672cc.pojo.vo.reqvo.ReplicateImageUpscaleBeautifyPhotoReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateCommonReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateImageUpscaleReqVO;
import com.cc672cc.pojo.vo.respvo.client.ReplicateCommonRespVO;
import com.cc672cc.processor.AiJobProcessor;
import com.cc672cc.service.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;Slf4j
Component
public class ReplicateJobProcessor implements AiJobProcessor {Autowiredprivate IReplicateService replicateService;AutowiredLazyprivate IAiJobService aiJobService;Autowiredprivate IQiNiuService qiNiuService;LazyAutowiredprivate IPhotoBeautifyService photoBeautifyService;private ListString enhanceModelList Arrays.asList(Standard V2, Low Resolution V2, CGI, High Fidelity V2, Text Refine);private ListInteger upscaleFactorList Arrays.asList(2, 4, 6);Overridepublic Integer[] unitProgressShowFlag() {return new Integer[]{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};}Overridepublic Long businessTimeoutS() {return 60 * 60 * 24L;}Overridepublic String buildReqJson(BeautifyPhotoReqVO reqVO, PhotoReviewAnalysis photoReviewAnalysis) {ReplicateImageUpscaleBeautifyPhotoReqVO detailReqVo JSON.parseObject(JSON.toJSONString(reqVO.getParams()), ReplicateImageUpscaleBeautifyPhotoReqVO.class);if (StringUtils.isBlank(detailReqVo.getEnhanceModel())) {throw new RuntimeException(增强模型不能为空);}if (!enhanceModelList.contains(detailReqVo.getEnhanceModel())) {throw new RuntimeException(增强模型错误);}if (detailReqVo.getUpscaleFactor() null) {throw new RuntimeException(放大倍数不能为空);}if (!upscaleFactorList.contains(detailReqVo.getUpscaleFactor())) {throw new RuntimeException(放大倍数错误);}ReplicateCommonReqVOReplicateImageUpscaleReqVO req new ReplicateCommonReqVO();ReplicateImageUpscaleReqVO input new ReplicateImageUpscaleReqVO();input.setImage(photoReviewAnalysis.getOriImgUrl());input.setEnhanceModel(detailReqVo.getEnhanceModel());input.setUpscaleFactor(detailReqVo.getUpscaleFactor() x);req.setInput(input);return JSON.toJSONString(req);}Overridepublic Integer process(AiJob aiJob) {String reqJson aiJob.getReqJson();if (StringUtils.isBlank(reqJson)) {aiJob.setJobStatusDesc(reqJson is blank);return AiJobStatusEnum.FAILED.getCode();}Type type new TypeReferenceReplicateCommonReqVOReplicateImageUpscaleReqVO() {}.getType();ReplicateCommonReqVOReplicateImageUpscaleReqVO reqVO JSON.parseObject(reqJson, 
type);ReplicateCommonRespVOString respVO null;String message ;try {respVO replicateService.imageUpscale(reqVO);message respVO ! null ? respVO.getError() : ;if (respVO ! null StringUtils.isNotBlank(respVO.getId())) {String jobId respVO.getId();aiJob.setOutId(jobId);return AiJobStatusEnum.SUBMITTED.getCode();}} catch (Exception e) {message MessageUtils.normalMaxLength(e.getMessage());}aiJob.setJobStatusDesc(message);return AiJobStatusEnum.FAILED.getCode();}Overridepublic Integer query(AiJob aiJob) {String jobId aiJob.getOutId();ReplicateCommonRespVOString ajax replicateService.query(jobId);Integer jobStatus aiJob.getJobStatus();String jobStatusDesc aiJob.getJobStatusDesc();String respJson aiJob.getRespJson();Date respTime aiJob.getRespTime() null ? new Date() : aiJob.getRespTime();Long costTime aiJob.getCostTime();Date reqTime aiJob.getReqTime();if (ajax ! null StringUtils.isNotBlank(ajax.getStatus())) {String message ajax.getError();String status ajax.getStatus();String data ajax.getOutput();respTime new Date();respJson JSON.toJSONString(ajax);if (processing.equals(status) AiJobStatusEnum.SUBMITTED.getCode().equals(jobStatus)) {jobStatus AiJobStatusEnum.QUEUED.getCode();} else if (processing.equals(status) AiJobStatusEnum.QUEUED.getCode().equals(jobStatus)) {jobStatus AiJobStatusEnum.PROCESSING.getCode();} else if (succeeded.equals(status) StringUtils.isNotBlank(data)) {jobStatus AiJobStatusEnum.GENERATED.getCode();}}costTime (respTime.getTime() - reqTime.getTime()) / 1000;if (costTime businessTimeoutS()) {jobStatus AiJobStatusEnum.TIMEOUT.getCode();jobStatusDesc 任务业务超时;}aiJob.setCostTime(costTime);aiJob.setRespTime(respTime);aiJob.setRespJson(respJson);aiJob.setJobStatus(jobStatus);aiJob.setJobStatusDesc(jobStatusDesc);aiJob.setUpdateTime(new Date());return jobStatus;}Overridepublic Integer migrate(Long aiJobId, Integer jobStatus) {Date now new Date();try {// 如果是已生成,则直接迁移if (AiJobStatusEnum.GENERATED.getCode().equals(jobStatus)) {jobStatus 
AiJobStatusEnum.MIGRATING.getCode();return jobStatus;}// 1. 查询AiJob信息AiJob aiJob aiJobService.selectOneById(aiJobId);if (aiJob null) {log.error(迁移任务失败未找到AiJob记录aiJobId{}, aiJobId);return jobStatus;}// 2. 解析响应JSONString respJson aiJob.getRespJson();if (StringUtils.isBlank(respJson)) {log.error(迁移任务失败AiJob[aiJobId{}]的respJson为空, aiJobId);return jobStatus;}Type type new TypeReferenceReplicateCommonRespVOString() {}.getType();ReplicateCommonRespVOString respVO;try {respVO JSON.parseObject(respJson, type);} catch (Exception e) {log.error(迁移任务失败AiJob[aiJobId{}]的respJson解析失败json{}, aiJobId, respJson, e);return jobStatus;}String data respVO.getOutput();if (data null) {log.error(迁移任务失败AiJob[aiJobId{}]的响应数据data为空, aiJobId);return jobStatus;}String mediaData data;ListString images new ArrayList();images.add(mediaData);// 3. 下载图片并创建PhotoBeautify列表ListPhotoBeautify addList new ArrayList();for (int i 0; i images.size(); i) {String originalImgUrl images.get(i);long start System.currentTimeMillis();try {// 下载网络图片到七牛云假设downloadWebFile已处理异常String imageUrl qiNiuService.downloadWebFile(originalImgUrl);PhotoBeautify photoBeautify new PhotoBeautify();photoBeautify.setAiJobId(aiJobId);photoBeautify.setStatus(StatusEnum.EFFECTIVE.getCode());photoBeautify.setDel(DelEnum.NOT_DELETED.getCode());photoBeautify.setCode(CodeUtil.getRandomCode(10));photoBeautify.setImgUrl(imageUrl);photoBeautify.setOriginalImgUrl(originalImgUrl);photoBeautify.setImgStatus(4);photoBeautify.setCreateDate(DateUtil.format(now, yyyy-MM-dd));photoBeautify.setCreateTime(now);photoBeautify.setPhotoReviewAnalysisCode(aiJob.getPhotoReviewAnalysisCode());photoBeautify.setModelName(aiJob.getModelName());long end System.currentTimeMillis();// 计算耗时 单位秒photoBeautify.setCostTime((end - start) / 1000);photoBeautify.setSort(i);addList.add(photoBeautify);} catch (IOException e) {log.error(迁移任务失败下载图片[url{}]失败aiJobId{}, originalImgUrl, aiJobId, e);// 可选择继续处理后续图片或直接返回失败根据业务需求return AiJobStatusEnum.FAILED.getCode();}}// 4. 
批量插入数据库boolean insertResult photoBeautifyService.batchAdd(addList);if (!insertResult) {log.error(迁移任务失败批量插入PhotoBeautify失败aiJobId{}, aiJobId);return AiJobStatusEnum.FAILED.getCode();}log.info(迁移任务成功aiJobId{}共迁移图片{}张, aiJobId, images.size());return AiJobStatusEnum.SUCCESS.getCode();} catch (Exception e) {log.error(迁移任务发生未知异常aiJobId{}, aiJobId, e);return jobStatus;}}Overridepublic String getSupportedModel() {return ImageUpscale;}
}
ReplicateApi
package com.cc672cc.client;import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.cc672cc.common.utils.OkHttpClientUtil;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateCommonReqVO;
import com.cc672cc.pojo.vo.reqvo.client.ReplicateImageUpscaleReqVO;import com.cc672cc.pojo.vo.respvo.client.ReplicateCommonRespVO;
import com.cc672cc.properties.ReplicateProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.lang.reflect.Type;import java.util.Map;Component
public class ReplicateApi {Autowiredprivate ReplicateProperties replicateProperties;public ReplicateCommonRespVOString imageUpscale(ReplicateCommonReqVOReplicateImageUpscaleReqVO req) {String uri /v1/models/topazlabs/image-upscale/predictions;MapString, String headers Map.of(Authorization, String.format(Bearer %s, replicateProperties.getAppKey()));MapString, Object body JSON.parseObject(JSON.toJSONString(req), Map.class);// 使用 TypeReference 传递完整泛型类型Type type new TypeReferenceReplicateCommonRespVOString() {}.getType();return OkHttpClientUtil.ajax(replicateProperties.getBaseUrl(), uri, POST, headers,OkHttpClientUtil.EMPTY_MAP, body, type);}public ReplicateCommonRespVOString query(String jobId) {String uri /v1/predictions/ jobId;MapString, String headers Map.of(Authorization, String.format(Bearer %s, replicateProperties.getAppKey()));Type type new TypeReferenceReplicateCommonRespVOString() {}.getType();return OkHttpClientUtil.ajax(replicateProperties.getBaseUrl(), uri, GET, headers, OkHttpClientUtil.EMPTY_MAP, OkHttpClientUtil.EMPTY_MAP, type);}}
OkHttpClientUtil 通用万能版
package com.cc672cc.common.utils;import cn.hutool.http.ContentType;
import cn.hutool.json.JSONObject;
import com.alibaba.fastjson.JSON;
import com.cc672cc.common.model.ReturnInfo;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.lang3.StringUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
import java.util.HashMap;import java.util.Map;
import java.util.concurrent.TimeUnit;/*** 简单封装的okhttpcient工具用于返回同一反参** author liaoqian* date 2024-01-17*/
Slf4j
SuppressWarnings(all)
public class OkHttpClientUtil {private static final String okHttpClientName okHttpClientUtil;public static final String METHOD_POST POST;public static final String METHOD_GET GET;public static final MapString, Object EMPTY_MAP new HashMap();/*** 连接超时时间**/private static final int CONNECT_TIMEOUT_SECONDS 60;/*** 读取返回信息超时时间**/private static final int READ_TIMEOUT_SECONDS 60;/*** 读取返回信息超时时间*/private static final int CALL_TIMEOUT_SECONDS 120;/*** 读取返回信息超时时间**/private static final int WRITE_TIMEOUT_SECONDS 300;private static OkHttpClient okHttpClient;static {if (okHttpClient null) {synchronized (OkHttpClientUtil.class) {if (okHttpClient null) {okHttpClient new OkHttpClient.Builder().connectTimeout(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS).readTimeout(READ_TIMEOUT_SECONDS, TimeUnit.SECONDS).callTimeout(CALL_TIMEOUT_SECONDS, TimeUnit.SECONDS).writeTimeout(WRITE_TIMEOUT_SECONDS, TimeUnit.SECONDS).retryOnConnectionFailure(true).build();}}}}/*** param host 请求host* param uri 请求uri* param method 请求方式* param headers 请求头* param paramsObj 请求参数* param bodyObj 请求体* return 返回JSONObject*/public static JSONObject ajax(String host, String uri, String method, MapString, String headers, Object paramsObj, Object bodyObj) {Map params JSON.parseObject(JSON.toJSONString(paramsObj), Map.class);Map body JSON.parseObject(JSON.toJSONString(bodyObj), Map.class);return ajax(host, uri, method, headers, params, body);}/*** param host 请求host* param uri 请求uri* param method 请求方式* param headers 请求头* param params 请求参数* param body 请求体* return 返回JSONObject*/public static JSONObject ajax(String host, String uri, String method, MapString, String headers, MapString, Object params, MapString, Object body) {Response response ajaxProcess(host, uri, method, headers, params, body);JSONObject jsonObject null;if (response ! 
null) {try (ResponseBody responseBody response.body()) {String respContentType response.header(Content-Type);if (StringUtils.isNotBlank(respContentType)) {if (text/event-stream.equals(respContentType)) {StringBuilder sb new StringBuilder();// 将这个response的内容转为字符串BufferedReader reader new BufferedReader(new InputStreamReader(responseBody.byteStream()));String line;while ((line reader.readLine()) ! null) {if (line.startsWith(event: text)) {sb.append(extractTextData(reader));}}ReturnInfo returnInfo new ReturnInfo(sb.toString());jsonObject new JSONObject(returnInfo);return jsonObject;}}String result responseBody.string();log.info(***** {} ajax result : {} *****, okHttpClientName, result);if (JSON.isValid(result)) {jsonObject new JSONObject(result);} else {ReturnInfo returnInfo new ReturnInfo(result);jsonObject new JSONObject(returnInfo);}} catch (Exception e) {log.error(***** {} ajaxProcess e : {} *****, okHttpClientName, e);}}return jsonObject;}private static String extractTextData(BufferedReader reader) throws IOException {StringBuilder sb new StringBuilder();String line;while ((line reader.readLine()) ! null !line.isEmpty()) {if (line.startsWith(data: )) {String substring line.substring(data: .length());substring substring.replace(\, );sb.append(substring);}}return sb.toString();}private static Response ajaxProcess(String host, String uri, String method, MapString, String headers, MapString, Object params, MapString, Object body) {OkHttpClient client okHttpClient;String url host uri;Request.Builder builder new Request.Builder();// 请求头处理if (headers ! null !headers.isEmpty()) {builder.headers(Headers.of(headers));}// 请求方式处理if (true) {if (params ! null !params.isEmpty()) {StringBuilder sb new StringBuilder();sb.append(?);params.entrySet().stream().forEach(e - {sb.append(e.getKey()).append().append(String.valueOf(e.getValue())).append();});sb.delete(sb.length() - 1, sb.length());url sb.toString();}builder.get();}if (METHOD_POST.equals(method.toUpperCase())) {if (body ! 
null) {builder.post(RequestBody.create(MediaType.parse(ContentType.JSON.toString()), JSON.toJSONString(body)));}}Request request builder.url(url).build();Response response null;try {response client.newCall(request).execute();} catch (IOException e) {log.error(***** {} ajaxProcess e : {} *****, okHttpClientName, e);}return response;}public static T T ajax(String host, String uri, String method, MapString, String headers, MapString, Object params, MapString, Object body, Type type) {JSONObject ajax ajax(host, uri, method, headers, params, body);if (ajax ! null) {return JSON.parseObject(ajax.toString(), type);} else {return null;}}}
新建任务参考
public AiJobStatusModel beautifyPhoto(BeautifyPhotoReqVO reqVO) {UserInfo userInfo userService.getCurLoginUser(true);String photoReviewAnalysisCode reqVO.getPhotoReviewAnalysisCode();PhotoReviewAnalysis photoReviewAnalysis photoReviewService.selectOneByCode(photoReviewAnalysisCode);if (photoReviewAnalysis null) {throw new BusinessException(ExceptionEnum.DATA_NOT_FOUND);}if (!userInfo.getId().equals(photoReviewAnalysis.getUserId())) {throw new BusinessException(ExceptionEnum.PERMISSION_DENIED);}Boolean beautifyAbility photoReviewAnalysis.getBeautifyAbility();if (Boolean.FALSE.equals(beautifyAbility)) {throw new RuntimeException(图片暂不支持美化,请查看【开启美化能力依据】);}MapString, AiModelEnum map AiModelEnum.getMap();AiModelEnum aiModelEnum map.get(reqVO.getModelId());if (aiModelEnum null) {throw new RuntimeException(模型不存在);}Long userId userInfo.getId();Date now new Date();String dateFormat DateUtil.format(now, yyyy-MM-dd);UserPointPerDayCheckModel checkModel userBenefitsService.checkPointLimitPerDay(userId, dateFormat);if (checkModel.getLimit()) {throw new RuntimeException(已到达今日次数上限,请明日再来试试吧);}String code CodeUtil.getRandomCode(10);AiJob aiJob new AiJob();aiJob.setStatus(StatusEnum.EFFECTIVE.getCode());aiJob.setDel(DelEnum.NOT_DELETED.getCode());aiJob.setJobStatus(AiJobStatusEnum.DRAFT.getCode());aiJob.setUnitProgress(BigDecimal.ZERO);aiJob.setOverallProgress(BigDecimal.ZERO);aiJob.setUserId(userInfo.getId());aiJob.setPhotoReviewAnalysisCode(photoReviewAnalysis.getCode());aiJob.setAction(reqVO.getAction());aiJob.setType(reqVO.getType());aiJob.setCode(code);aiJob.setModel(aiModelEnum.getModel());aiJob.setModelVersion(aiModelEnum.getModelVersion());aiJob.setModelId(aiModelEnum.getModelId());aiJob.setModelName(aiModelEnum.getModelName());aiJob.setOutputCount(reqVO.getOutputCount());aiJob.setPlatform(aiModelEnum.getPlatform());aiJob.setChannel(aiModelEnum.getChannel());aiJob.setAsyn(aiModelEnum.getAsyn());aiJob.setCreateDate(DateUtil.format(now, yyyy-MM-dd));aiJob.setReqTime(now);String 
reqJson processorFactory.getProcessor(aiModelEnum.getModel()).buildReqJson(reqVO, photoReviewAnalysis);aiJob.setReqJson(reqJson);aiJob.setCreateTime(now);Long id aiJobService.add(aiJob);if (id 0) {// 给个人账户加1userBenefitsService.addPointCountPerDay(userId, dateFormat, aiModelEnum, reqVO.getOutputCount());AiJobStatusModel res BeanHelper.copyProperties(aiJob, AiJobStatusModel.class);// 这里推送消息myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(res));// 立马去提交任务myEventPubLisher.pushAiJobNeedSubmitRightNow(aiJob);return res;}throw new RuntimeException(生成美化图片创建失败,请稍后再试);}定时任务实现
IAiJobService
package com.cc672cc.service;

import com.cc672cc.entity.tb.AiJob;

/**
 * Service operations for AI jobs: creation, submission, status refresh and result migration.
 */
public interface IAiJobService {

    /**
     * Adds a job.
     *
     * @param aiJob the job to insert
     * @return the job ID
     */
    Long add(AiJob aiJob);

    /**
     * Submits a job.
     *
     * @param submitAiJob the job to submit
     * @return an int result (NOTE(review): presumably the number of jobs submitted - confirm against the implementation)
     */
    int submitAiJob(AiJob submitAiJob);

    /**
     * Refreshes the status of AI jobs.
     *
     * @return the number of jobs refreshed
     */
    int refreshAiJobStatus();

    /**
     * Refreshes the status of a single AI job.
     *
     * @param aiJob the AI job
     * @return an int result (NOTE(review): presumably 1 when the job was updated, otherwise 0 - confirm)
     */
    int refreshAiJobStatusDetail(AiJob aiJob);

    /**
     * Migrates AI jobs.
     *
     * @return an int result (NOTE(review): presumably the number of jobs handled - confirm)
     */
    int migrateAiJob();

    /**
     * Migrates a single AI job.
     *
     * @param aiJob the AI job
     * @return an int result (NOTE(review): presumably 1 when the job was updated, otherwise 0 - confirm)
     */
    int migrateAiJobDetail(AiJob aiJob);

    /**
     * Looks up a job by its ID.
     *
     * @param aiJobId the job ID
     * @return the matching job
     */
    AiJob selectOneById(Long aiJobId);
}
AiJobServiceImpl
package com.cc672cc.service.impl;import com.alibaba.fastjson2.JSON;
import com.cc672cc.common.constants.RedisPreKey;
import com.cc672cc.common.model.AiJobStatusModel;
import com.cc672cc.common.utils.BeanHelper;
import com.cc672cc.dp.listenermode.publisher.MyEventPubLisher;
import com.cc672cc.entity.tb.AiJob;
import com.cc672cc.entity.tb.PhotoReviewAnalysis;
import com.cc672cc.enums.dict.AiJobStatusEnum;
import com.cc672cc.enums.dict.AiModelEnum;
import com.cc672cc.enums.dict.DelEnum;
import com.cc672cc.enums.dict.StatusEnum;
import com.cc672cc.mapper.AiJobMapper;
import com.cc672cc.processor.AiJobProcessorFactory;
import com.cc672cc.service.IAiJobService;
import com.cc672cc.service.IPhotoReviewService;
import com.cc672cc.service.IUserBenefitsService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import tk.mybatis.mapper.entity.Example;import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;Slf4j
Service
public class AiJobServiceImpl implements IAiJobService {Autowiredprivate AiJobMapper aiJobMapper;Autowiredprivate RedissonClient redissonClient;LazyAutowiredprivate AiJobProcessorFactory processorFactory;LazyAutowiredprivate IUserBenefitsService userBenefitsService;LazyAutowiredprivate IPhotoReviewService photoReviewService;LazyAutowiredprivate MyEventPubLisher myEventPubLisher;Overridepublic Long add(AiJob aiJob) {int insert aiJobMapper.insert(aiJob);return insert 0 ? aiJob.getId() : null;}Overridepublic int submitAiJob(AiJob submitAiJob) {int res 0;ListAiJob aiJobs new ArrayList();if (submitAiJob ! null) {aiJobs.add(submitAiJob);} else {Example example new Example(AiJob.class);Example.Criteria criteria example.createCriteria();criteria.andEqualTo(status, StatusEnum.EFFECTIVE.getCode());criteria.andEqualTo(del, DelEnum.NOT_DELETED.getCode());criteria.andIn(jobStatus, List.of(AiJobStatusEnum.DRAFT.getCode()));example.orderBy(reqTime).desc();aiJobs aiJobMapper.selectByExample(example);}String redisPreKey RedisPreKey.CACHE_AI_JOB_PROCESS;if (aiJobs ! 
null !aiJobs.isEmpty()) {for (AiJob aiJob : aiJobs) {String lock redisPreKey aiJob.getId();RLock rLock redissonClient.getLock(lock);try {boolean tryLock rLock.tryLock(5, 60, TimeUnit.SECONDS);if (!tryLock) {continue;}AiJob newAiJob aiJobMapper.selectByPrimaryKey(aiJob.getId());String model newAiJob.getModel();Integer aiJobStatus processorFactory.getProcessor(model).process(newAiJob);Integer[] unitProgressShowFlag processorFactory.getProcessor(model).unitProgressShowFlag();if (List.of(AiJobStatusEnum.SUBMITTED.getCode(),AiJobStatusEnum.QUEUED.getCode(),AiJobStatusEnum.PROCESSING.getCode(),AiJobStatusEnum.GENERATED.getCode(),AiJobStatusEnum.SUCCESS.getCode(),AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {newAiJob.setJobStatus(aiJobStatus);newAiJob.setOverallProgress(AiJobStatusEnum.getProgress(aiJobStatus));Integer showFlag unitProgressShowFlag[aiJobStatus];if (0 showFlag) {newAiJob.setUnitProgress(null);}int update aiJobMapper.updateByPrimaryKey(newAiJob);if (update 1) {AiJobStatusModel aiJobStatusModel BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));res;}}} catch (Exception e) {log.error(提交AI任务失败, e);} finally {if (rLock ! null rLock.isHeldByCurrentThread()) {rLock.unlock();}}}}return res;}Overridepublic int refreshAiJobStatus() {int res 0;Example example new Example(AiJob.class);Example.Criteria criteria example.createCriteria();criteria.andEqualTo(status, StatusEnum.EFFECTIVE.getCode());criteria.andEqualTo(del, DelEnum.NOT_DELETED.getCode());criteria.andIn(jobStatus, List.of(AiJobStatusEnum.SUBMITTED.getCode(),AiJobStatusEnum.QUEUED.getCode(),AiJobStatusEnum.PROCESSING.getCode()));example.orderBy(reqTime).desc();ListAiJob aiJobs aiJobMapper.selectByExample(example);if (aiJobs ! 
null !aiJobs.isEmpty()) {for (AiJob aiJob : aiJobs) {myEventPubLisher.pushAiJobStatusRefreshEvent(aiJob);res;}}return res;}Overridepublic int refreshAiJobStatusDetail(AiJob aiJob) {String redisPreKey RedisPreKey.CACHE_AI_JOB_PROCESS;if (aiJob ! null aiJob.getId() ! null) {myEventPubLisher.pushAiJobStatusRefreshEvent(aiJob);String lock redisPreKey aiJob.getId();RLock rLock redissonClient.getLock(lock);try {boolean tryLock rLock.tryLock(5, 30, TimeUnit.SECONDS);if (!tryLock) {return 0;}AiJob newAiJob aiJobMapper.selectByPrimaryKey(aiJob.getId());String model newAiJob.getModel();Integer aiJobStatus processorFactory.getProcessor(model).query(newAiJob);Integer[] unitProgressShowFlag processorFactory.getProcessor(model).unitProgressShowFlag();if (List.of(AiJobStatusEnum.QUEUED.getCode(),AiJobStatusEnum.PROCESSING.getCode(),AiJobStatusEnum.GENERATED.getCode(),AiJobStatusEnum.SUCCESS.getCode(),AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {newAiJob.setJobStatus(aiJobStatus);newAiJob.setOverallProgress(AiJobStatusEnum.getProgress(aiJobStatus));Integer showFlag unitProgressShowFlag[aiJobStatus];if (0 showFlag) {newAiJob.setUnitProgress(null);}int update aiJobMapper.updateByPrimaryKey(newAiJob);if (update 1) {AiJobStatusModel aiJobStatusModel BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));return 1;}}} catch (Exception e) {log.error(刷新AI任务失败, e);} finally {if (rLock ! 
null rLock.isHeldByCurrentThread()) {rLock.unlock();}}}return 0;}Overridepublic int migrateAiJob() {int res 0;Example example new Example(AiJob.class);Example.Criteria criteria example.createCriteria();criteria.andEqualTo(status, StatusEnum.EFFECTIVE.getCode());criteria.andEqualTo(del, DelEnum.NOT_DELETED.getCode());criteria.andIn(jobStatus, List.of(AiJobStatusEnum.GENERATED.getCode(),AiJobStatusEnum.MIGRATING.getCode()));example.orderBy(reqTime).desc();ListAiJob aiJobs aiJobMapper.selectByExample(example);if (aiJobs ! null !aiJobs.isEmpty()) {for (AiJob aiJob : aiJobs) {myEventPubLisher.pushAiJobMigrateEvent(aiJob);res;}}return res;}Overridepublic int migrateAiJobDetail(AiJob aiJob) {String redisPreKey RedisPreKey.CACHE_AI_JOB_PROCESS;if (aiJob ! null aiJob.getId() ! null) {String lock redisPreKey aiJob.getId();RLock rLock redissonClient.getLock(lock);try {boolean tryLock rLock.tryLock(5, 300, TimeUnit.SECONDS);if (!tryLock) {return 0;}AiJob newAiJob aiJobMapper.selectByPrimaryKey(aiJob.getId());String model newAiJob.getModel();Integer jobStatus newAiJob.getJobStatus();if (AiJobStatusEnum.SUCCESS.getCode().equals(jobStatus)) {AiJobStatusModel aiJobStatusModel BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));return 1;}Integer aiJobStatus processorFactory.getProcessor(model).migrate(newAiJob.getId(), jobStatus);Integer[] unitProgressShowFlag processorFactory.getProcessor(model).unitProgressShowFlag();if (List.of(AiJobStatusEnum.MIGRATING.getCode(),AiJobStatusEnum.SUCCESS.getCode(),AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {newAiJob.setJobStatus(aiJobStatus);newAiJob.setOverallProgress(AiJobStatusEnum.getProgress(aiJobStatus));Integer showFlag unitProgressShowFlag[aiJobStatus];if (0 showFlag) {newAiJob.setUnitProgress(null);}int update aiJobMapper.updateByPrimaryKey(newAiJob);if (update 1) 
{AiJobStatusModel aiJobStatusModel BeanHelper.copyProperties(newAiJob, AiJobStatusModel.class);myEventPubLisher.pushAiJobMsgEvent(JSON.toJSONString(aiJobStatusModel));return 1;}}if (List.of(AiJobStatusEnum.FAILED.getCode(),AiJobStatusEnum.CANCELED.getCode(),AiJobStatusEnum.TIMEOUT.getCode()).contains(aiJobStatus)) {Long userId newAiJob.getUserId();newAiJob.setJobStatusDesc(迁移照片过程失败,已返还每日积分);String modelId newAiJob.getModelId();AiModelEnum aiModelEnum AiModelEnum.getMap().get(modelId);Integer outputCount newAiJob.getOutputCount();userBenefitsService.subtractPointCountPerDay(userId, newAiJob.getCreateDate(), aiModelEnum, outputCount);String photoReviewAnalysisCode newAiJob.getPhotoReviewAnalysisCode();PhotoReviewAnalysis photoReviewAnalysis photoReviewService.selectOneByCode(photoReviewAnalysisCode);photoReviewAnalysis.setBeautifyImage(Boolean.FALSE);photoReviewAnalysis.setUpdateTime(new Date());photoReviewService.updateById(photoReviewAnalysis);}} catch (Exception e) {log.error(迁移AI任务失败, e);} finally {if (rLock ! null rLock.isHeldByCurrentThread()) {rLock.unlock();}}}return 0;}Overridepublic AiJob selectOneById(Long aiJobId) {return aiJobMapper.selectByPrimaryKey(aiJobId);}
}
整体业务流程
提交 AI 任务 → 定时任务刷新状态 → 定时任务迁移（配合 WebSocket 实时推送状态）
总结
该方案适用于所有需要对接三方异步 API 的场景，如 AI 生成、视频处理、数据分析等；通过标准化的状态管理与流程控制，解决了异步任务的复杂性与不可靠性问题。它可以通用于 95% 以上的场景，非常好用。