网站的建设书籍,移动网站建设的基本流程图,网络营销方式有哪几种,网站的目录结构
文章目录 前言 一、双写一致性模式（同步） Redis->MySQL MySQL->Redis 二、数据监听模式（异步） Redis->MySQL MySQL->Redis 总结 前言
Redis和MySQL之间保持数据一致性是个复杂的问题，搜索资料发现大部分也只… 文章目录 前言 一、双写一致性模式（同步） Redis->MySQL MySQL->Redis 二、数据监听模式（异步） Redis->MySQL MySQL->Redis 总结 前言
Redis和MySQL之间保持数据一致性是个复杂的问题，搜索资料发现大部分也只做了理论的说明。主流的方案大概是两种：一种是同步，一种是异步。下面我们来分析这两种模式。
一、双写一致性模式（同步）
双写就是在插入Redis数据的同时再向MySQL写入，或者在写入MySQL的同时再向Redis写入。这种方式的优点是数据高度一致，而且实时同步。但缺点也很明显：侵入性太强，需要时刻编码，同时还需要考虑各自的事务控制。具体实现方案如下：
Redis->MySQL
这种方式需要Redis来显式控制事务，当然数据库事务也必须要有：
package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
SpringBootApplication
RestController
EnableTransactionManagement
public class TestRedisToMysqlApplication {Autowiredprivate StringRedisTemplate stringRedisTemplate;Autowiredprivate JdbcTemplate jdbcTemplate;RequestMapping(/test)Transactionalpublic String test1(){stringRedisTemplate.execute(new RedisCallbackBoolean() {Overridepublic Boolean doInRedis(RedisConnection connection) throws DataAccessException {connection.multi();connection.commands().set(k1.getBytes(),1.getBytes());connection.commands().set(k2.getBytes(),2.getBytes());jdbcTemplate.update(insert into t_user (k1,k2) values (?,?),1,2);connection.exec();return true;}});return success;}public static void main(String[] args) {SpringApplication.run(TestRedisToMysqlApplication.class, args);}}MySQL-Redis
这种方式只需要控制JDBC事务即可：
package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;SpringBootApplication
RestController
EnableTransactionManagement
public class TestMysqlToRedisApplication {Autowiredprivate StringRedisTemplate stringRedisTemplate;Autowiredprivate JdbcTemplate jdbcTemplate;RequestMapping(/test)Transactionalpublic String test1(){jdbcTemplate.update(insert into t_user (k1,k2) values (?,?),1,2);stringRedisTemplate.opsForValue().set(k1,1);stringRedisTemplate.opsForValue().set(k2,2);return success;}public static void main(String[] args) {SpringApplication.run(TestMysqlToRedisApplication.class, args);}}
二、数据监听模式（异步）
异步模式是通过对Redis的监听或者对MySQL的监听来实现。这种方式具有一定延迟，但是对原有代码无侵入性，可以单独开发程序来独立执行，并且无需关心各自的事务操作。在不需要绝对实时性的情况下，是不错的选择。
Redis->MySQL
这种模式需要在Redis的配置文件redis.conf中修改：
notify-keyspace-events KEA
package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.jdbc.core.JdbcTemplate;import java.nio.charset.StandardCharsets;
import java.util.Objects;SpringBootApplication
public class TestRedisApplication {Autowiredprivate StringRedisTemplate stringRedisTemplate;Autowiredprivate JdbcTemplate jdbcTemplate;Beanpublic MessageListener redisMessageListener() {return (Message message, byte[] pattern)-{String key new String(message.getBody(), StandardCharsets.UTF_8);String valuestringRedisTemplate.opsForValue().get(key);System.out.println(key: key 发生变化。变化的值value);//下面进行数据库操作,具体的逻辑需要根据你的设计来编写jdbcTemplate.update(insert into t_user (key) values (?),key,value);};}Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {final RedisMessageListenerContainer container new RedisMessageListenerContainer();container.setConnectionFactory(Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()));return container;}EventListenervoid listener(ApplicationReadyEvent event) {Topic topic new PatternTopic(__keyevent*);// 监听 整个redis数据库 的所有事件;RedisMessageListenerContainer redisMessageListenerContainer event.getApplicationContext().getBean(RedisMessageListenerContainer.class);MessageListener redisMessageListener event.getApplicationContext().getBean(MessageListener.class);redisMessageListenerContainer.addMessageListener(redisMessageListener, topic);}public static void main(String[] args) {SpringApplication.run(TestRedisApplication.class, args);}}
MySQL -> Redis
监听MySQL最方便的方式是监听MySQL的二进制文件，这种方式对原有数据无侵入。关于二进制文件的监听方案有很多，比如Canal，但是Canal在和Java集成上稍显复杂。这里给大家介绍另外一款工具：Debezium，在集成上很方便，具体操作如下。
加入maven依赖：
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>1.6.0.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>1.6.0.Final</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>1.6.0.Final</version>
</dependency>
编写DebeziumServerBootstrap，用作启动Debezium：
package com.test.spring;import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;Data
Slf4j
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {private final Thread.UncaughtExceptionHandler handler new Thread.UncaughtExceptionHandler() {public void uncaughtException(Thread t, Throwable e) {log.error(解析事件有一个错误 , e);}};private Thread thread null;private boolean running false;private DebeziumEngine? debeziumEngine;Overridepublic void start() {threadnew Thread(debeziumEngine);thread.setName(debezium-server-thread);thread.setUncaughtExceptionHandler(handler);thread.start();running true;}SneakyThrowsOverridepublic void stop() {debeziumEngine.close();this.runningfalse;thread.join();log.info(DebeziumServerBootstrap stop );}Overridepublic boolean isRunning() {return running;}Overridepublic void afterPropertiesSet() throws Exception {Assert.notNull(debeziumEngine, debeziumEngine must not be null);}
}编写DebeziumConfiguration配置
package com.test.spring;import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Field;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.apache.commons.lang3.tuple.Pair;import java.util.List;
import java.util.Map;import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.stream.Collectors.toMap;Slf4j
public class DebeziumConfiguration {private static final String serverNamedebecontrol;/*** Debezium 配置.** return configuration*/Beanpublic io.debezium.config.Configuration debeziumConfig(Environment environment) {String usernameenvironment.getProperty(spring.datasource.username);String passwordenvironment.getProperty(spring.datasource.password);String direnvironment.getProperty(canal.conf.dir);String defaultDatabaseNameenvironment.getProperty(canal.defaultDatabaseName);String slaveIdenvironment.getProperty(canal.slaveId);String urlenvironment.getProperty(canal.address);String[] urlsurl.split([:]);return io.debezium.config.Configuration.create()
// 连接器的Java类名称.with(connector.class, MySqlConnector.class.getName())
// 偏移量持久化用来容错 默认值.with(offset.storage, org.apache.kafka.connect.storage.FileOffsetBackingStore)
// 捕获偏移量的周期.with(offset.flush.interval.ms, 6000)
// 连接器的唯一名称.with(name, mysql-connector)
// 数据库的hostname.with(database.hostname, urls[0])
// 端口.with(database.port, urls[1])
// 用户名.with(database.user, username)
// 密码.with(database.password, password)
// 包含的数据库列表.with(database.include.list, defaultDatabaseName)
// 是否包含数据库表结构层面的变更建议使用默认值true.with(include.schema.changes, false)
// mysql.cnf 配置的 server-id.with(database.server.id, slaveId)
// MySQL 服务器或集群的逻辑名称.with(database.server.name, serverName)
// 历史变更记录.with(database.history, io.debezium.relational.history.FileDatabaseHistory).build();}/*** Debezium server bootstrap debezium server bootstrap.** param configuration the configuration* return the debezium server bootstrap*/Beanpublic DebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {DebeziumServerBootstrap debeziumServerBootstrap new DebeziumServerBootstrap();DebeziumEngineRecordChangeEventSourceRecord debeziumEngine DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(configuration.asProperties()).notifying(this::handlePayload).build();debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);return debeziumServerBootstrap;}private void handlePayload(ListRecordChangeEventSourceRecord recordChangeEvents, DebeziumEngine.RecordCommitterRecordChangeEventSourceRecord recordCommitter) {recordChangeEvents.forEach(r - {SourceRecord sourceRecord r.record();Struct sourceRecordChangeValue (Struct) sourceRecord.value();if(sourceRecordChangeValuenull) return;this.handlePayload1(sourceRecordChangeValue);});}private void handlePayload1(Struct sourceRecordChangeValue){try{// 判断操作的类型 过滤掉读 只处理增删改 这个其实可以在配置中设置Envelope.Operation operation Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));if(operationEnvelope.Operation.READ) return;//customer_mysql_db_server.control.t_dic.Envelope//debecontrol.control.t_dic.EnvelopeString name sourceRecordChangeValue.schema().name();String[] namesname.split([.]);String talbenames[2];// 获取增删改对应的结构体数据Struct before_struct (Struct) sourceRecordChangeValue.get(BEFORE);// 将变更的行封装为MapMapString, Object before_payload null;if(before_struct!null){before_payload before_struct.schema().fields().stream().map(Field::name).filter(fieldName - before_struct.get(fieldName) ! 
null).map(fieldName - Pair.of(fieldName, before_struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));}// 获取增删改对应的结构体数据Struct after_struct (Struct) sourceRecordChangeValue.get(AFTER);MapString, Object after_payload null;if(after_struct!null){// 将变更的行封装为Mapafter_payload after_struct.schema().fields().stream().map(Field::name).filter(fieldName - after_struct.get(fieldName) ! null).map(fieldName - Pair.of(fieldName, after_struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));}//在这里进行Redis操作if(operationEnvelope.Operation.CREATE){//数据库插入}else if(operationEnvelope.Operation.UPDATE){//数据库更新}else if(operationEnvelope.Operation.DELETE){//数据库删除}}catch (Exception e){log.warn(解析数据错误e.getMessage());}}}入口类
package com.test.spring;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
SpringBootApplication
Import(DebeziumConfiguration.class)
public class TestMysqlApplication {public static void main(String[] args) {SpringApplication.run(TestMysqlApplication.class, args);}}这里我们需要开启MySQL的二进制日志需要修改my.cnf文件增加如下配置
log-bin=mysql-bin
binlog_format=row
server-id=1
log_bin_trust_function_creators=1
总结
关于Redis与MySQL数据一致性，我觉得还需要考虑各自的数据结构如何设计，因为这两种存储方式完全不一样。