北京网站建设推荐华网天下,网站改版301重定向,如何使用框架来建设网站,WordPress做搜索引擎Seata
分布式事务问题
单机单库没这个问题#xff0c;分布式之前从1: 1 - 1:N -N:N
分布式之后 单体应用被拆分成微服务应用#xff0c;原来的三个模块被拆分成三个独立的应用分别使用三个独立的数据源#xff0c;业务操作需要调用三个服务来完成。 此时每个服务…Seata
分布式事务问题
单机单库没这个问题分布式之前从1: 1 - 1:N -N:N
分布式之后 单体应用被拆分成微服务应用原来的三个模块被拆分成三个独立的应用分别使用三个独立的数据源业务操作需要调用三个服务来完成。 此时每个服务内部的数据一致性由本地事务来保证但是全局的数据一致健问题没法保证。
一句话一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用就会产生分布式事务问题
Seata简介
是什么
Seata是一款开源的分布式事务解决方案致力于在微服务架构下提供高性能和简单易用的分布式事务服务。
官网地址http://seata.io/zh-cn/
术语
一个典型的分布式事务过程
分布式事务处理过程的一ID三组件模型
一ID
Transaction lD XID 全局唯一的事务ID
三组件
事务协调者 Transaction CoordinatorTC 维护全局事务的运行状态负责协调并驱动全局事务的提交或回滚事务管理器 Transaction ManagerTM 控制全局事务的边界负责开启一个全局事务并最终发起全局提交或全局回滚的决议资源管理器 Resource ManagerRM 控制分支事务负责分支注册、状态汇报并接收事务协调器的指令驱动分支(本地事务的提交和回滚
处理过程 TM 向 TC 申请开启一个全局事务全局事务创建成功并生成一个全局唯一的XIDXID 在微服务调用链路的上下文中传播;RM 向 TC 注册分支事务将其纳入 XID 对应全局事务的管辖TM 向 TC 发起针对 XID 的全局提交或回滚决议TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求。
Seata-Server安装与使用
安装可参考这篇文章https://blog.csdn.net/Alaric_L/article/details/123935320
SEATA 的分布式交易解决方案 我们只需要使用一个 GlobalTransactional 注解在业务方法上 GlobalTransactionalpublic void purchase(String userId, String commodityCode, int orderCount) {......}订单/库存/账户业务数据库准备
以下演示都需要先启动Nacos后启动Seata保证两个都OK
分布式事务业务说明
这里我们会创建三个服务一个订单服务一个库存服务一个账户服务。
当用户下单时会在订单服务中创建一个订单 然后通过远程调用库存服务来扣减下单商品的库存 再通过远程调用账户服务来扣减用户账户里面的余额 最后在订单服务中修改订单状态为已完成。 该操作跨越三个数据库有两次远程调用很明显会有分布式事务问题。
创建业务数据库
#存储订单的数据库
CREATE DATABASE seata_order;
#存储库存的数据库
CREATE DATABASE seata_storage;
#存储账户信息的数据库
CREATE DATABASE seata_account;按照上述3库分别建对应业务表
#seata_order库下建t_order表CREATE TABLE t_order (
id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
user_id BIGINT(11) DEFAULT NULL COMMENT 用户id,
product_id BIGINT(11) DEFAULT NULL COMMENT 产品id,
countINT(11) DEFAULT NULL COMMENT 数量,
money DECIMAL(11,0) DEFAULT NULL COMMENT 金额,
statusINT(1)DEFAULT NULL COMMENT 订单状态:0:创建中;1:已完结
)ENGINEINNODB AUTO_INCREMENT7 DEFAULT CHARSETutf8;SELECT * FROM t_order;#seata_storage库下建t_storage表CREATE TABLE t_storage (
id BIGINT(11)NOT NULL AUTO_INCREMENT PRIMARY KEY,
product_id BIGINT(11) DEFAULT NULL COMMENT产品id,
totalINT(11) DEFAULT NULL COMMENT 总库存,
used INT(11) DEFAULT NULL COMMENT已用库存,
residueINT(11) DEFAULT NULL COMMENT剩余库存
)ENGINEINNODB AUTO_INCREMENT2 DEFAULT CHARSETutf8;#创建一条数据
INSERT INTO t_storage(id,product_id,total,used,residue) VALUES (1,1,100,0,100); SELECT * FROM t_storage;#seata_account库下建t_account表CREATE TABLE t_account (
idBIGINT(11)NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT id,
user_id BIGINT(11) DEFAULT NULL COMMENT用户id,
total DECIMAL(10,0) DEFAULT NULL COMMENT 总额度,
used DECIMAL(10,0)DEFAULT NULL COMMENT 已用余额,
residue DECIMAL(10,0) DEFAULT 0 COMMENT 剩余可用额度
)ENGINEINNODB AUTO_INCREMENT2 DEFAULT CHARSETutf8;#插入一条数据
INSERT INTO t_account(id,user_id,total,used,residue) VALUES (1,1,1000,0,1000);SELECT * FROM t_account;按照上述3库分别建对应的回滚日志表
CREATE TABLE undo_log (id bigint(20) NOT NULL AUTO_INCREMENT,branch_id bigint(20) NOT NULL,xid varchar(100) NOT NULL,context varchar(128) NOT NULL,rollback_info longblob NOT NULL,log_status int(11) NOT NULL,log_created datetime NOT NULL,log_modified datetime NOT NULL,ext varchar(100) DEFAULT NULL,PRIMARY KEY (id),UNIQUE KEY ux_undo_log (xid,branch_id)
) ENGINEInnoDB AUTO_INCREMENT1 DEFAULT CHARSETutf8;最终效果 订单/库存/账户业务微服务准备
业务需求
下订单 - 减库存 - 扣余额 - 改(订单)状态
新建订单Order-Module(seata-order-service2001)
POM
dependencies!--nacos--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-nacos-discovery/artifactId/dependency!--seata--dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-alibaba-seata/artifactIdexclusionsexclusionartifactIdseata-all/artifactIdgroupIdio.seata/groupId/exclusion/exclusions/dependencydependencygroupIdio.seata/groupIdartifactIdseata-all/artifactIdversion0.9.0/version/dependency!--feign--dependencygroupIdorg.springframework.cloud/groupIdartifactIdspring-cloud-starter-openfeign/artifactId/dependency!--web-actuator--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-actuator/artifactId/dependency!--mysql-druid--dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.37/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIddruid-spring-boot-starter/artifactIdversion1.1.10/version/dependencydependencygroupIdorg.mybatis.spring.boot/groupIdartifactIdmybatis-spring-boot-starter/artifactIdversion2.0.0/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdscopetest/scope/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependency/dependenciesYML
server:port: 2001spring:application:name: seata-order-servicecloud:alibaba:seata:#自定义事务组名称需要与seata-server中的对应tx-service-group: fsp_tx_groupnacos:discovery:server-addr: localhost:8848datasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/seata_orderusername: rootpassword: 123456feign:hystrix:enabled: falselogging:level:io:seata: infomybatis:mapperLocations: classpath:mapper/*.xml将 seata/conf/ 下的 file.conf 和 registry.cong 两个文件拷贝到 resource 目录下。
domain
Data
AllArgsConstructor
NoArgsConstructor
public class CommonResultT
{private Integer code;private String message;private T data;public CommonResult(Integer code, String message){this(code,message,null);}
}Data
AllArgsConstructor
NoArgsConstructor
public class Order
{private Long id;private Long userId;private Long productId;private Integer count;private BigDecimal money;private Integer status; //订单状态0创建中1已完结
}Dao接口及实现
Mapper
public interface OrderDao
{//1 新建订单void create(Order order);//2 修改订单状态从零改为1void update(Param(userId) Long userId,Param(status) Integer status);
}在resource目录下创建Mapper文件夹及OrderMapper.xml
?xml version1.0 encodingUTF-8 ?
!DOCTYPE mapper PUBLIC -//mybatis.org//DTD Mapper 3.0//EN http://mybatis.org/dtd/mybatis-3-mapper.dtd !--OrderDao的全类名--
mapper namespacecom.atguigu.springcloud.alibaba.dao.OrderDao!--与实体类Order做一个映射--resultMap idBaseResultMap typecom.atguigu.springcloud.alibaba.domain.Orderid columnid propertyid jdbcTypeBIGINT/result columnuser_id propertyuserId jdbcTypeBIGINT/result columnproduct_id propertyproductId jdbcTypeBIGINT/result columncount propertycount jdbcTypeINTEGER/result columnmoney propertymoney jdbcTypeDECIMAL/result columnstatus propertystatus jdbcTypeINTEGER//resultMapinsert idcreateinsert into t_order (id,user_id,product_id,count,money,status)values (null,#{userId},#{productId},#{count},#{money},0);/insertupdate idupdateupdate t_order set status 1where user_id#{userId} and status #{status};/update
/mapperService接口及实现
AccountService
FeignClient(value seata-account-service)
public interface AccountService
{PostMapping(value /account/decrease)CommonResult decrease(RequestParam(userId) Long userId, RequestParam(money) BigDecimal money);
}OrderService
public interface OrderService
{void create(Order order);
}StorageService
FeignClient(value seata-storage-service)
public interface StorageService
{PostMapping(value /storage/decrease)CommonResult decrease(RequestParam(productId) Long productId, RequestParam(count) Integer count);
}OrderServiceImpl
Service
Slf4j
public class OrderServiceImpl implements OrderService
{Resourceprivate OrderDao orderDao;Resourceprivate StorageService storageService;Resourceprivate AccountService accountService;/*** 创建订单-调用库存服务扣减库存-调用账户服务扣减账户余额-修改订单状态* 简单说下订单-扣库存-减余额-改状态*/Overridepublic void create(Order order){log.info(-----开始新建订单);//1 新建订单orderDao.create(order);//2 扣减库存log.info(-----订单微服务开始调用库存做扣减Count);storageService.decrease(order.getProductId(),order.getCount());log.info(-----订单微服务开始调用库存做扣减end);//3 扣减账户log.info(-----订单微服务开始调用账户做扣减Money);accountService.decrease(order.getUserId(),order.getMoney());log.info(-----订单微服务开始调用账户做扣减end);//4 修改订单状态从零到1,1代表已经完成log.info(-----修改订单状态开始);orderDao.update(order.getUserId(),0);log.info(-----修改订单状态结束);log.info(-----下订单结束了O(∩_∩)O哈哈~);}
}Controller
RestController
public class OrderController
{Resourceprivate OrderService orderService;GetMapping(/order/create)public CommonResult create(Order order){orderService.create(order);return new CommonResult(200,订单创建成功);}
}Config配置
Configuration
MapperScan({com.mzr.springcloud.dao})
public class MyBatisConfig {
}//使用Seata对数据源进行代理
Configuration
public class DataSourceProxyConfig {Value(${mybatis.mapperLocations})private String mapperLocations;BeanConfigurationProperties(prefix spring.datasource)public DataSource druidDataSource(){return new DruidDataSource();}Beanpublic DataSourceProxy dataSourceProxy(DataSource dataSource) {return new DataSourceProxy(dataSource);}Beanpublic SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {SqlSessionFactoryBean sqlSessionFactoryBean new SqlSessionFactoryBean();sqlSessionFactoryBean.setDataSource(dataSourceProxy);sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());return sqlSessionFactoryBean.getObject();}}主启动
EnableDiscoveryClient
EnableFeignClients
SpringBootApplication(exclude DataSourceAutoConfiguration.class)//取消数据源的自动创建
public class SeataOrderMainApp2001
{public static void main(String[] args){SpringApplication.run(SeataOrderMainApp2001.class, args);}
}新建库存Storage -Module(seata-storage-service2002)
与Order模块没有什么大的区别唯一区别是没有openFeign模块
新建账户Account-Module(seata-account-service2003)
与Order模块没有什么大的区别唯一区别是没有openFeign模块
Test
访问http://localhost:2001/order/create?userld1productld1count10money100进行测试
此时正常下单可以正常使用
模拟超时异常没加GlobalTransactional
countServiceImpl添加超时
public void decrease(Long userId, BigDecimal money) {LOGGER.info(-------account-service中扣减账户余额开始);//模拟超时异常全局事务回滚//暂停几秒钟线程try { TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); }accountDao.decrease(userId,money);LOGGER.info(-------account-service中扣减账户余额结束);}再次测试
数据库情况
发现订单及库存数据库数据已经发生变化事务并没有回滚
故障情况
当库存和账户金额扣减后订单状态并没有设置为已经完成没有从0改为1
而且由于feign的重试机制账户余额还有可能被多次扣减
超时异常添加GlobalTransactional
此时我们在程序的入口添加上GlobalTransactional
/*** 创建订单-调用库存服务扣减库存-调用账户服务扣减账户余额-修改订单状态* 简单说下订单-扣库存-减余额-改状态*/OverrideGlobalTransactional(name fsp-create-order,rollbackFor Exception.class)public void create(Order order){log.info(-----开始新建订单);//1 新建订单orderDao.create(order);//2 扣减库存log.info(-----订单微服务开始调用库存做扣减Count);storageService.decrease(order.getProductId(),order.getCount());log.info(-----订单微服务开始调用库存做扣减end);//3 扣减账户log.info(-----订单微服务开始调用账户做扣减Money);accountService.decrease(order.getUserId(),order.getMoney());log.info(-----订单微服务开始调用账户做扣减end);//4 修改订单状态从零到1,1代表已经完成log.info(-----修改订单状态开始);orderDao.update(order.getUserId(),0);log.info(-----修改订单状态结束);log.info(-----下订单结束了O(∩_∩)O哈哈~);}再次测试
程序依然报错但是数据库数据没有发生变化证实事务已经回滚
Seata之原理简介
2019年1月份蚂蚁金服和阿里巴巴共同开源的分布式事务解决方案
Simple Extensible Autonomous Transaction Architecture简单可扩展自治事务框架2020起始
结合上述案例理解三大组件
TCseata服务器 TMGlobalTransactional事务的发起方 RM事务的参与方三个数据库
执行流程
TM开启分布式事务(TM向TC注册全局事务记录)
按业务场景编排数据库、服务等事务内资源RM向TC汇报资源准备状态
TM结束分布式事务事务一阶段结束TM通知TC提交/回滚分布式事务)
TC汇总事务信息决定分布式事务是提交还是回滚
TC通知所有RM提交/回滚资源事务二阶段结束。
AT模式如何做到对业务的无侵入
两阶段提交协议的演变
一阶段业务数据和回滚日志记录在同一个本地事务中提交释放本地锁和连接资源。 二阶段
提交异步化非常快速地完成。回滚通过一阶段的回滚日志进行反向补偿。
一阶段加载
在一阶段Seata会拦截“业务SQL”
解析SQL语义找到“业务SQL”要更新的业务数据在业务数据被更新前 将其保存成“before image”,执行“业务SQL”更新业务数据在业务数据更新之后其保存成“after image”最后生成行锁
以上操作全部在一个数据库事务内完成这样保证了一阶段操作的原子性。 二阶段提交
二阶段如是顺利提交的话因为“业务SQL”在一阶段已经提交至数据库所以Seata框架只需将一阶段保存的快照数据和行锁删掉完成数据清理即可 二阶段回滚
二阶段如果是回滚的话Seata就需要回滚一阶段已经执行的“业务SQL”还原业务数据。
回滚方式便是用“before image”还原业务数据但在还原前要首先要校验脏写检验数据是否被动过对比“数据库当前业务数据”和“after image”如果两份数据完全一致就说明没有脏写可以还原业务数据如果不一致就说明有脏写出现脏写就需要转人工处理。