网络营销中网站建设的策略,动漫制作与设计专业,岳阳网站开发培训,小程序开发教程源码潮汐#x1f4dd;个人主页#xff1a;哈__
期待您的关注 一、ShardingSphere简介 ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈#xff0c;它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar#xff08;计划中#xff09;这3款相互独立的产品组成… 个人主页哈__
期待您的关注 一、ShardingSphere简介 ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar计划中这3款相互独立的产品组成。 他们均提供标准化的数据分片、分布式事务和数据库治理功能可适用于如Java同构、异构语言、容器、云原生等各种多样化的应用场景。 ShardingSphere定位为关系型数据库中间件旨在充分合理地在分布式的场景下利用关系型数据库的计算和存储能力而并非实现一个全新的关系型数据库。 它与NoSQL和NewSQL是并存而非互斥的关系。NoSQL和NewSQL作为新技术探索的前沿放眼未来拥抱变化是非常值得推荐的。反之也可以用另一种思路看待问题放眼未来关注不变的东西进而抓住事物本质。 关系型数据库当今依然占有巨大市场是各个公司核心业务的基石未来也难于撼动我们目前阶段更加关注在原有基础上的增量而非颠覆。----来自官方 1.Sharding-JDBC 定位为轻量级Java框架在Java的JDBC层提供的额外服务。 它使用客户端直连数据库以jar包形式提供服务无需额外部署和依赖可理解为增强版的JDBC驱动完全兼容JDBC和各种ORM框架。 适用于任何基于Java的ORM框架如JPA, Hibernate, Mybatis, Spring JDBC Template或直接使用JDBC。基于任何第三方的数据库连接池如DBCP, C3P0, BoneCP, Druid, HikariCP等。支持任意实现JDBC规范的数据库。目前支持MySQLOracleSQLServer和PostgreSQL。 2.Sharding-Proxy 定位为透明化的数据库代理端提供封装了数据库二进制协议的服务端版本用于完成对异构语言的支持。 目前先提供MySQL版本它可以使用任何兼容MySQL协议的访问客户端(如MySQL Command Client, MySQL Workbench等)操作数据对DBA更加友好。 向应用程序完全透明可直接当做MySQL使用。适用于任何兼容MySQL协议的客户端。 3.Sharding-SidecarTBD 定位为Kubernetes或Mesos的云原生数据库代理以DaemonSet的形式代理所有对数据库的访问。 通过无中心、零侵入的方案提供与数据库交互的的啮合层即Database Mesh又可称数据网格。 Database Mesh的关注重点在于如何将分布式的数据访问应用与数据库有机串联起来它更加关注的是交互是将杂乱无章的应用与数据库之间的交互有效的梳理。使用Database Mesh访问数据库的应用和数据库终将形成一个巨大的网格体系应用和数据库只需在网格体系中对号入座即可它们都是被啮合层所治理的对象。 二、为什么用到ShardingSphere 从性能方面来说由于关系型数据库大多采用B树类型的索引在数据量超过阈值的情况下索引深度的增加也将使得磁盘访问的IO次数增加进而导致查询性能的下降同时高并发访问请求也使得集中式数据库成为系统的最大瓶颈。 从可用性的方面来讲服务化的无状态型能够达到较小成本的随意扩容这必然导致系统的最终压力都落在数据库之上。而单一的数据节点或者简单的主从架构已经越来越难以承担。数据库的可用性已成为整个系统的关键。 从运维成本方面考虑当一个数据库实例中的数据达到阈值以上对于DBA的运维压力就会增大。数据备份和恢复的时间成本都将随着数据量的大小而愈发不可控。一般来讲单一数据库实例的数据的阈值在1TB之内是比较合理的范围。 在传统的关系型数据库无法满足互联网场景需要的情况下将数据存储至原生支持分布式的NoSQL的尝试越来越多。 但NoSQL对SQL的不兼容性以及生态圈的不完善使得它们在与关系型数据库的博弈中始终无法完成致命一击而关系型数据库的地位却依然不可撼动。 三、数据分片
水平分片又称为横向拆分。它不再将数据根据业务逻辑分类而是通过某个字段或某几个字段根据某种规则将数据分散至多个库或表中每个分片仅包含数据的一部分。 例如根据主键分片偶数主键的记录放入0库或表奇数主键的记录放入1库或表如下图所示。 简单的来说水平分片就是把一张大表的数据进行一个水平切割将切割出来的不同的部分添加到不同的表当中我们举这样的一个例子在一家银行当中最开始只开放了一个业务窗口因为一开始的业务量不大一个窗口足以解决这一天当中的所有问题但是由于业务员的出色的业务能力越来越多的人开始到这个银行办理业务了这时一个窗口就不够了需要多开几个窗口分担业务压力。我们这样设定一下一共开放5个窗口去哪个窗口取决于个人的身份证最后一位%5取余1如果是X那么就直接到1号窗口。
那么对于实际的业务来说我们也是如此一张订单表我们可以根据订单号进行取余操作分配表。
除了分表之外我们还可以分库具体的思想还是一致的。
四、SpringBoot整合ShardingSphere
1.创建我们的数据库ds0和ds1。分别创建我们的表格order0order1order2。两个数据库都运行一下
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS 0;-- ----------------------------
-- Table structure for t_order0
-- ----------------------------
DROP TABLE IF EXISTS t_order0;
CREATE TABLE t_order0 (order_id bigint(20) NOT NULL AUTO_INCREMENT,user_id int(11) NOT NULL,order_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,PRIMARY KEY (order_id) USING BTREE
) ENGINE InnoDB CHARACTER SET utf8 COLLATE utf8_bin ROW_FORMAT Compact;-- ----------------------------
-- Table structure for t_order1
-- ----------------------------
DROP TABLE IF EXISTS t_order1;
CREATE TABLE t_order1 (order_id bigint(20) NOT NULL AUTO_INCREMENT,user_id int(11) NOT NULL,order_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,PRIMARY KEY (order_id) USING BTREE
) ENGINE InnoDB CHARACTER SET utf8 COLLATE utf8_bin ROW_FORMAT Compact;-- ----------------------------
-- Table structure for t_order2
-- ----------------------------
DROP TABLE IF EXISTS t_order2;
CREATE TABLE t_order2 (order_id bigint(20) NOT NULL AUTO_INCREMENT,user_id int(11) NOT NULL,order_name varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL,PRIMARY KEY (order_id) USING BTREE
) ENGINE InnoDB CHARACTER SET utf8 COLLATE utf8_bin ROW_FORMAT Compact;SET FOREIGN_KEY_CHECKS 1;
2.引入依赖
这里的依赖是为了实现我的们的目标进行多线程分库分表插入。 dependencygroupIdorg.apache.shardingsphere/groupIdartifactIdshardingsphere-jdbc-core-spring-boot-starter/artifactIdversion5.0.0/version/dependencydependencygroupIdcom.baomidou/groupIdartifactIdmybatis-plus-boot-starter/artifactIdversion3.5.2/version/dependencydependencygroupIdcom.mysql/groupIdartifactIdmysql-connector-j/artifactId/dependencydependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactId/dependencydependencygroupIdlog4j/groupIdartifactIdlog4j/artifactIdversion1.2.17/version/dependencydependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.8.18/version/dependency
3.添加配置文件。创建application.yml
我来讲解一下这些配置文件都是干啥的都写到注释了。
spring:shardingsphere:props:#d打印Sql语句sql-show: truedatasource:#创建我们的ds0数据源ds0:#下边这些都是老套路了driver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://localhost:3306/ds0?useUnicodetruecharacterEncodingutf-8useSSLfalseserverTimezoneGMTallowPublicKeyRetrievaltruepassword: 2020type: com.zaxxer.hikari.HikariDataSourceusername: root#创建我们的ds1数据源ds1:#一样的老套路driver-class-name: com.mysql.cj.jdbc.Driverjdbc-url: jdbc:mysql://localhost:3306/ds1?useUnicodetruecharacterEncodingutf-8useSSLfalseserverTimezoneGMTallowPublicKeyRetrievaltruepassword: 2020type: com.zaxxer.hikari.HikariDataSourceusername: rootnames: ds0,ds1#这里就比较重要了这里是定义我们的分库分表的规则rules:sharding:#分片算法sharding-algorithms:#为分库定义一个算法 到底是如何分的库custom-db-inline:props:# 这里是具体的算法我们根据userId取余进行分库余数是几就分到ds几algorithm-expression: ds$-{user_id%2}type: INLINE# 如何分表custom-table-inline:props:# 根据orderId取余分表algorithm-expression: t_order$-{order_id%3}type: INLINEtables:# 这是我们的逻辑表 因为我们根本没有t_order这个表这是我们的t_order0 1 2抽象出来的t_order:# 这是我们的真实表actual-data-nodes: ds$-{0..1}.t_order$-{0..2}database-strategy:standard:# 分库算法的名称 也就是上边的sharding-algorithm-name: custom-db-inlinesharding-column: user_idtable-strategy:standard:# 分表算法名称sharding-algorithm-name: custom-table-inlinesharding-column: order_id
async:executor:thread:core_pool_size: 5max_pool_size: 20queue_capacity: 90000name:prefix: async-
mybatis-plus:global-config:db-config:id-type: assign_id
4.创建我们的框架结构 三层Order的代码如下。
// Order实体
Data
TableName(t_order)
SuppressWarnings(serial)
public class Order extends ModelOrder {TableId(type IdType.ASSIGN_ID)private Long orderId;private Integer userId;private String orderName;Overridepublic Serializable pkVal() {return this.orderId;}
}//mapper
Mapper
public interface OrderMapper extends BaseMapperOrder {
}//Order的service接口
public interface OrderService extends IServiceOrder {
}//接口实现
Service
public class OrderServiceImpl extends ServiceImplOrderMapper, Order implements OrderService {
}
ExecutorConfig配置我们的线程池。
Configuration
public class ExecutorConfig {Value(${async.executor.thread.core_pool_size})private int corePoolSize;Value(${async.executor.thread.max_pool_size})private int maxPoolSize;Value(${async.executor.thread.queue_capacity})private int queueCapacity;Value(${async.executor.thread.name.prefix})private String namePrefix;Bean(name asyncServiceExecutor)public Executor asyncServiceExecutor() {//在这里修改ThreadPoolTaskExecutor executor new ThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy当pool已经达到max size的时候如何处理新任务// CALLER_RUNS不在新线程中执行任务而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//执行初始化executor.initialize();return executor;}
} 创建AsyncService接口和实现类。
public interface AsyncService {void add(ListOrder orderList, CountDownLatch countDownLatch);
}
Service
Slf4j
public class AsyncServiceImpl implements AsyncService {Resourceprivate OrderServiceImpl orderService;Async(asyncServiceExecutor)Transactional(rollbackFor Exception.class)Overridepublic void add(ListOrder orderList, CountDownLatch countDownLatch) {try {log.debug(Thread.currentThread().getName()开始插入数据);orderService.saveBatch(orderList);log.debug(Thread.currentThread().getName()插入数据完成);}finally {countDownLatch.countDown();}}
} 要使用多线程异步调用要在启动程序上加上注解。
SpringBootApplication
EnableAsync
EnableTransactionManagement
public class ShardingSphereApplication {public static void main(String[] args) {SpringApplication.run(ShardingSphereApplication.class, args);}}
现在来看我们的AysncController。我定义了一个getData的方法用于模拟生成我们的数据当然我设置的名称都差不多一共一万条数据通过user_id进行分库通过order_id进行分表userId使用的是for循环的i索引orderId使用的是雪花算法生成的Id序列。
在testAsyncInsert方法中。使用ListUtils的方法进行数据切片每两千条数据切割成一个list然后执行异步添加操作。待所有线程执行完毕之后打印输出语句。
RestController
public class AsyncController {Autowiredprivate AsyncService asyncService;GetMapping(/test)public String testAsyncInsert(){CountDownLatch c;try {ListOrder data getData();ListListOrder partition ListUtil.partition(data, 2000);c new CountDownLatch(partition.size());for (ListOrder list : partition) {asyncService.add(list,c);}c.await();}catch (Exception e){e.printStackTrace();}finally {System.out.println(所有的数据插入完毕);}return 执行完毕;}private ListOrder getData(){ListOrder list new ArrayList();for(int i 0;i10000;i){Order o new Order();o.setOrderName(苹果i);o.setUserId(i1);list.add(o);}return list;}
}
看结果。 大家可以自己去验证一下。