当前位置: 首页 > news >正文

空间租用 网站开发重庆手机网站推广资料

空间租用 网站开发,重庆手机网站推广资料,电脑网页打不开是什么原因,广东网站建设微信官网开发大纲 1.Redisson可重入锁RedissonLock概述 2.可重入锁源码之创建RedissonClient实例 3.可重入锁源码之lua脚本加锁逻辑 4.可重入锁源码之WatchDog维持加锁逻辑 5.可重入锁源码之可重入加锁逻辑 6.可重入锁源码之锁的互斥阻塞逻辑 7.可重入锁源码之释放锁逻辑 8.可重入锁…大纲 1.Redisson可重入锁RedissonLock概述 2.可重入锁源码之创建RedissonClient实例 3.可重入锁源码之lua脚本加锁逻辑 4.可重入锁源码之WatchDog维持加锁逻辑 5.可重入锁源码之可重入加锁逻辑 6.可重入锁源码之锁的互斥阻塞逻辑 7.可重入锁源码之释放锁逻辑 8.可重入锁源码之获取锁超时与锁超时自动释放逻辑 9.可重入锁源码总结 1.Redisson可重入锁RedissonLock概述 (1)在pom.xml里引入依赖 (2)构建RedissonClient并使用Redisson (3)Redisson可重入锁RedissonLock简单使用 (1)在pom.xml里引入依赖 dependenciesdependencygroupIdorg.redisson/groupIdartifactIdredisson/artifactIdversion3.16.8/version/dependency /dependencies (2)构建RedissonClient并使用Redisson 参考官网中文文档连接上3主3从的Redis Cluster。 //https://github.com/redisson/redisson/wiki/目录 public class Application {public static void main(String[] args) throws Exception {//连接3主3从的Redis CLusterConfig config new Config();config.useClusterServers().addNodeAddress(redis://192.168.1.110:7001).addNodeAddress(redis://192.168.1.110:7002).addNodeAddress(redis://192.168.1.110:7003).addNodeAddress(redis://192.168.1.111:7001).addNodeAddress(redis://192.168.1.111:7002).addNodeAddress(redis://192.168.1.111:7003);//创建RedissonClient实例RedissonClient redisson Redisson.create(config);//获取可重入锁RLock lock redisson.getLock(myLock);lock.lock();lock.unlock();RMapString, Object map redisson.getMap(myMap);map.put(foo, bar); map redisson.getMap(myMap);System.out.println(map.get(foo)); } } (3)Redisson可重入锁RedissonLock简单使用 Redisson可重入锁RLock实现了java.util.concurrent.locks.Lock接口同时还提供了异步(Async)、响应式(Reactive)和RxJava2标准的接口。 RLock lock redisson.getLock(myLock); //最常见的使用方法 lock.lock(); 如果设置锁的超时时间不合理导致超时时间已到时锁还没能主动释放但实际上锁却被Redis节点通过过期时间释放了这会有问题。 为了避免这种情况Redisson内部提供了一个用来监控锁的WatchDog。WatchDog的作用是在Redisson实例被关闭前不断地延长锁的有效期。 WatchDog检查锁的默认超时时间是30秒可通过Config.lockWatchdogTimeout来指定。 RLock的tryLock方法提供了leaseTime参数来指定加锁的超时时间超过这个时间后锁便自动被释放。 //如果没有主动释放锁的话10秒后将会自动释放锁 lock.lock(10, TimeUnit.SECONDS);//加锁等待最多是100秒加锁成功后如果没有主动释放锁的话锁会在10秒后自动释放 boolean res lock.tryLock(100, 10, TimeUnit.SECONDS); if (res) {try {...} finally {lock.unlock();} } RLock完全符合Java的Lock规范即只有拥有锁的进程才能解锁其他进程解锁则会抛出IllegalMonitorStateException错误。如果需要其他进程也能解锁那么可以使用分布式信号量Semaphore。 2.可重入锁源码之创建RedissonClient实例 (1)初始化与Redis的连接管理器ConnectionManager (2)初始化Redis的命令执行器CommandExecutor 使用Redisson.create()方法可以根据配置创建一个RedissonClient实例因为Redisson类会实现RedissonClient接口而创建RedissonClient实例的主要工作其实就是 一.初始化与Redis的连接管理器ConnectionManager 二.初始化Redis的命令执行器CommandExecutor (1)初始化与Redis的连接管理器ConnectionManager Redis的配置类Config会被封装在连接管理器ConnectionManager中后续可以通过连接管理器ConnectionManager获取Redis的配置类Config。 public class Application {public static void main(String[] args) throws Exception {Config config new Config();config.useClusterServers().addNodeAddress(redis://192.168.1.110:7001);//创建RedissonClient实例RedissonClient redisson Redisson.create(config);...} }//创建RedissonClient实例的源码 public class Redisson implements RedissonClient {protected final Config config;//Redis配置类protected final ConnectionManager connectionManager;//Redis的连接管理器protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器...public static RedissonClient create(Config config) {return new Redisson(config);}protected Redisson(Config config) {this.config config;Config configCopy new Config(config);//根据Redis配置类Config实例创建和Redis的连接管理器connectionManager ConfigSupport.createConnectionManager(configCopy);RedissonObjectBuilder objectBuilder null;if (config.isReferenceEnabled()) {objectBuilder new RedissonObjectBuilder(this);}//创建Redis的命令执行器commandExecutor new CommandSyncService(connectionManager, objectBuilder);evictionScheduler new EvictionScheduler(commandExecutor);writeBehindService new WriteBehindService(commandExecutor);}... }public class ConfigSupport {...//创建Redis的连接管理器public static ConnectionManager createConnectionManager(Config configCopy) {//生成UUIDUUID id UUID.randomUUID();...if (configCopy.getClusterServersConfig() ! null) {validate(configCopy.getClusterServersConfig());//返回ClusterConnectionManager实例return new ClusterConnectionManager(configCopy.getClusterServersConfig(), configCopy, id);}...}... }public class ClusterConnectionManager extends MasterSlaveConnectionManager {public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {super(config, id);...this.natMapper cfg.getNatMapper();//将Redis的配置类Config封装在ConnectionManager中this.config create(cfg);initTimer(this.config);Throwable lastException null;ListString failedMasters new ArrayListString();for (String address : cfg.getNodeAddresses()) {RedisURI addr new RedisURI(address);//异步连接Redis节点CompletionStageRedisConnection connectionFuture connectToNode(cfg, addr, addr.getHost());...//通过connectionFuture阻塞获取建立好的连接RedisConnection connection connectionFuture.toCompletableFuture().join();...ListClusterNodeInfo nodes connection.sync(clusterNodesCommand);...CompletableFutureCollectionClusterPartition partitionsFuture parsePartitions(nodes);CollectionClusterPartition partitions partitionsFuture.join();ListCompletableFutureVoid masterFutures new ArrayList();for (ClusterPartition partition : partitions) {if (partition.isMasterFail()) {failedMasters.add(partition.getMasterAddress().toString());continue;}if (partition.getMasterAddress() null) {throw new IllegalStateException(Master node: partition.getNodeId() doesnt have address.);}CompletableFutureVoid masterFuture addMasterEntry(partition, cfg);masterFutures.add(masterFuture);}CompletableFutureVoid masterFuture CompletableFuture.allOf(masterFutures.toArray(new CompletableFuture[0]));masterFuture.join();...}...}... }public class MasterSlaveConnectionManager implements ConnectionManager {protected final String id;//初始化时为UUIDprivate final MapRedisURI, RedisConnection nodeConnections new ConcurrentHashMap();...protected MasterSlaveConnectionManager(Config cfg, UUID id) {this.id id.toString();//传入的是UUIDthis.cfg cfg;...}protected final CompletionStageRedisConnection connectToNode(NodeType type, BaseConfig? cfg, RedisURI addr, String sslHostname) {RedisConnection conn nodeConnections.get(addr);if (conn ! null) {if (!conn.isActive()) {closeNodeConnection(conn);} else {return CompletableFuture.completedFuture(conn);}}//创建Redis客户端连接实例RedisClient client createClient(type, addr, cfg.getConnectTimeout(), cfg.getTimeout(), sslHostname);//向Redis服务端发起异步连接请求这个future会层层往外返回CompletionStageRedisConnection future client.connectAsync();return future.thenCompose(connection - {if (connection.isActive()) {if (!addr.isIP()) {RedisURI address new RedisURI(addr.getScheme() :// connection.getRedisClient().getAddr().getAddress().getHostAddress() : connection.getRedisClient().getAddr().getPort());nodeConnections.put(address, connection);}nodeConnections.put(addr, connection);return CompletableFuture.completedFuture(connection);} else {connection.closeAsync();CompletableFutureRedisConnection f new CompletableFuture();f.completeExceptionally(new RedisException(Connection to connection.getRedisClient().getAddr() is not active!));return f;}});}//创建Redis客户端连接实例Overridepublic RedisClient createClient(NodeType type, RedisURI address, int timeout, int commandTimeout, String sslHostname) {RedisClientConfig redisConfig createRedisConfig(type, address, timeout, commandTimeout, sslHostname);return RedisClient.create(redisConfig);}... }//Redisson主要使用Netty去和Redis服务端建立连接 public final class RedisClient {private final Bootstrap bootstrap;private final Bootstrap pubSubBootstrap;...public static RedisClient create(RedisClientConfig config) {return new RedisClient(config);}private RedisClient(RedisClientConfig config) {...bootstrap createBootstrap(copy, Type.PLAIN);pubSubBootstrap createBootstrap(copy, Type.PUBSUB);this.commandTimeout copy.getCommandTimeout();}private Bootstrap createBootstrap(RedisClientConfig config, Type type) {Bootstrap bootstrap new Bootstrap().resolver(config.getResolverGroup()).channel(config.getSocketChannelClass()).group(config.getGroup());bootstrap.handler(new RedisChannelInitializer(bootstrap, config, this, channels, type));bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout());bootstrap.option(ChannelOption.SO_KEEPALIVE, config.isKeepAlive());bootstrap.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay());config.getNettyHook().afterBoostrapInitialization(bootstrap);return bootstrap;}//向Redis服务端发起异步连接请求public RFutureRedisConnection connectAsync() {CompletableFutureInetSocketAddress addrFuture resolveAddr();CompletableFutureRedisConnection f addrFuture.thenCompose(res - {CompletableFutureRedisConnection r new CompletableFuture();//Netty的Bootstrap发起连接ChannelFuture channelFuture bootstrap.connect(res);channelFuture.addListener(new ChannelFutureListener() {Overridepublic void operationComplete(final ChannelFuture future) throws Exception {if (bootstrap.config().group().isShuttingDown()) {IllegalStateException cause new IllegalStateException(RedisClient is shutdown);r.completeExceptionally(cause);return;}if (future.isSuccess()) {RedisConnection c RedisConnection.getFrom(future.channel());c.getConnectionPromise().whenComplete((res, e) - {bootstrap.config().group().execute(new Runnable() {Overridepublic void run() {if (e null) {if (!r.complete(c)) {c.closeAsync();}} else {r.completeExceptionally(e);c.closeAsync();}}});});} else {bootstrap.config().group().execute(new Runnable() {public void run() {r.completeExceptionally(future.cause());}});}}});return r;});return new CompletableFutureWrapper(f);}... } (2)初始化Redis的命令执行器CommandExecutor 首先CommandSyncService继承自CommandAsyncService类。 而CommandAsyncService类实现了CommandExecutor接口。 然后ConnectionManager连接管理器会封装在命令执行器CommandExecutor中。 所以通过CommandExecutor命令执行器可以获取连接管理器ConnectionManager。 //Redis命令的同步执行器CommandSyncService public class CommandSyncService extends CommandAsyncService implements CommandExecutor {//初始化CommandExecutorpublic CommandSyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder) {super(connectionManager, objectBuilder, RedissonObjectBuilder.ReferenceType.DEFAULT);}public T, R R read(String key, RedisCommandT command, Object... params) {return read(key, connectionManager.getCodec(), command, params);}public T, R R read(String key, Codec codec, RedisCommandT command, Object... params) {RFutureR res readAsync(key, codec, command, params);return get(res);}public T, R R evalRead(String key, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {return evalRead(key, connectionManager.getCodec(), evalCommandType, script, keys, params);}public T, R R evalRead(String key, Codec codec, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {RFutureR res evalReadAsync(key, codec, evalCommandType, script, keys, params);return get(res);}public T, R R evalWrite(String key, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {return evalWrite(key, connectionManager.getCodec(), evalCommandType, script, keys, params);}public T, R R evalWrite(String key, Codec codec, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {RFutureR res evalWriteAsync(key, codec, evalCommandType, script, keys, params);return get(res);} }//Redis命令的异步执行器CommandAsyncService public class CommandAsyncService implements CommandAsyncExecutor {//Redis连接管理器final ConnectionManager connectionManager;final RedissonObjectBuilder objectBuilder;final RedissonObjectBuilder.ReferenceType referenceType;public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {this.connectionManager connectionManager;this.objectBuilder objectBuilder;this.referenceType referenceType;}Overridepublic V V getNow(CompletableFutureV future) {try {return future.getNow(null);} catch (Exception e) {return null;}}Overridepublic T, R R read(String key, Codec codec, RedisCommandT command, Object... params) {RFutureR res readAsync(key, codec, command, params);return get(res);}Overridepublic T, R RFutureR readAsync(String key, Codec codec, RedisCommandT command, Object... params) {NodeSource source getNodeSource(key);return async(true, source, codec, command, params, false, false);}private NodeSource getNodeSource(String key) {int slot connectionManager.calcSlot(key);return new NodeSource(slot);}public V, R RFutureR async(boolean readOnlyMode, NodeSource source, Codec codec, RedisCommandV command, Object[] params, boolean ignoreRedirect, boolean noRetry) {CompletableFutureR mainPromise createPromise();RedisExecutorV, R executor new RedisExecutor(readOnlyMode, source, codec, command, params, mainPromise, ignoreRedirect, connectionManager, objectBuilder, referenceType, noRetry);executor.execute();return new CompletableFutureWrapper(mainPromise);}Overridepublic V V get(RFutureV future) {if (Thread.currentThread().getName().startsWith(redisson-netty)) {throw new IllegalStateException(Sync methods cant be invoked from async/rx/reactive listeners);}try {return future.toCompletableFuture().get();} catch (InterruptedException e) {future.cancel(true);Thread.currentThread().interrupt();throw new RedisException(e);} catch (ExecutionException e) {throw convertException(e);}}... } 3.可重入锁源码之lua脚本加锁逻辑 (1)通过Redisson.getLock()方法获取一个RedissonLock实例 (2)加锁时的执行流程 (3)加锁时执行的lua脚本 (4)执行加锁lua脚本的命令执行器逻辑 (5)如何根据slot值获取对应的节点 (1)通过Redisson.getLock()方法获取一个RedissonLock实例 在Redisson.getLock()方法中会传入命令执行器CommandExecutor来创建一个RedissonLock实例而命令执行器CommandExecutor是在执行Redisson.create()方法时初始化好的所以命令执行器CommandExecutor会被封装在RedissonLock实例中。 因此通过RedissonLock实例可以获取一个命令执行器CommandExecutor通过命令执行器CommandExecutor可获取连接管理器ConnectionManager通过连接管理器ConnectionManager可获取Redis的配置信息类Config通过Redis的配置信息类Config可以获取各种配置信息。 RedissonLock类继承自实现了RLock接口的RedissonBaseLock类。在RedissonLock的构造方法里面有个internalLockLeaseTime变量这个internalLockLeaseTime变量与WatchDog看门狗有关系。interlnalLockLeaseTime的默认值是30000毫秒即30秒 public class Application {public static void main(String[] args) throws Exception {Config config new Config();config.useClusterServers().addNodeAddress(redis://192.168.1.110:7001);//创建RedissonClient实例RedissonClient redisson Redisson.create(config);//获取可重入锁RLock lock redisson.getLock(myLock);lock.lock();...} }//创建Redisson实例 public class Redisson implements RedissonClient {protected final Config config;//Redis配置类protected final ConnectionManager connectionManager;//Redis的连接管理器protected final CommandAsyncExecutor commandExecutor;//Redis的命令执行器...public static RedissonClient create(Config config) {return new Redisson(config);}protected Redisson(Config config) {...//根据Redis配置类Config实例创建和Redis的连接管理器connectionManager ConfigSupport.createConnectionManager(configCopy);//创建Redis的命令执行器commandExecutor new CommandSyncService(connectionManager, objectBuilder);...}...Overridepublic RLock getLock(String name) {return new RedissonLock(commandExecutor, name);}... }//创建RedissonLock实例 //通过RedissonLock实例可以获取一个命令执行器CommandExecutor public class RedissonLock extends RedissonBaseLock {protected long internalLockLeaseTime;protected final LockPubSub pubSub;final CommandAsyncExecutor commandExecutor;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor commandExecutor;//与WatchDog有关的internalLockLeaseTime//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config//通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间this.internalLockLeaseTime commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.pubSub commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}... }//创建Redis的命令执行器 //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager public class CommandAsyncService implements CommandAsyncExecutor {final ConnectionManager connectionManager;...public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {this.connectionManager connectionManager;this.objectBuilder objectBuilder;this.referenceType referenceType;}Overridepublic ConnectionManager getConnectionManager() {return connectionManager;}... }//创建Redis的连接管理器 //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config public class ClusterConnectionManager extends MasterSlaveConnectionManager {...public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {super(config, id);...}... }//创建Redis的连接管理器 //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config public class MasterSlaveConnectionManager implements ConnectionManager {private final Config cfg;protected final String id;//初始化时为UUID...protected MasterSlaveConnectionManager(Config cfg, UUID id) {this.id id.toString();//传入的是UUIDthis.cfg cfg;...}Overridepublic Config getCfg() {return cfg;}... }//配置信息类Config中的lockWatchdogTimeout变量初始化为30秒该变量与WatchDog有关 public class Config {private long lockWatchdogTimeout 30 * 1000;...//This parameter is only used if lock has been acquired without leaseTimeout parameter definition. //Lock expires after lockWatchdogTimeout if watchdog didnt extend it to next lockWatchdogTimeout time interval.//This prevents against infinity locked locks due to Redisson client crush or any other reason when lock cant be released in proper way.//Default is 30000 millisecondspublic Config setLockWatchdogTimeout(long lockWatchdogTimeout) {this.lockWatchdogTimeout lockWatchdogTimeout;return this;}public long getLockWatchdogTimeout() {return lockWatchdogTimeout;} } 默认情况下调用RedissonLock.lock()方法加锁时传入的leaseTime为-1。此时锁的超时时间会设为lockWatchdogTimeout默认的30秒从而避免出现死锁的情况。 public class RedissonLock extends RedissonBaseLock {...//加锁Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {long threadId Thread.currentThread().getId();Long ttl tryAcquire(-1, leaseTime, unit, threadId);...}//解锁Overridepublic void unlock() {try {get(unlockAsync(Thread.currentThread().getId()));} catch (RedisException e) {if (e.getCause() instanceof IllegalMonitorStateException) {throw (IllegalMonitorStateException) e.getCause();} else {throw e;}}}... } (2)加锁时的执行流程 首先会调用RedissonLock的tryAcquire()方法处理异步RFuture相关然后调用RedissonLock的tryAcquireAsync()方法对执行脚本的结果进行处理接着调用RedissonLock.tryLockInnerAsync方法执行加锁的lua脚本。 public class RedissonLock extends RedissonBaseLock {protected long internalLockLeaseTime;protected final LockPubSub pubSub;final CommandAsyncExecutor commandExecutor;public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor commandExecutor;//与WatchDog有关的internalLockLeaseTime//通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager//通过连接管理器ConnectionManager可以获取Redis的配置信息类Config//通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间this.internalLockLeaseTime commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.pubSub commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();}...//加锁Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {//线程ID用来生成设置Hash的值long threadId Thread.currentThread().getId();//尝试加锁此时执行RedissonLock.lock()方法默认传入的leaseTime-1Long ttl tryAcquire(-1, leaseTime, unit, threadId);//ttl为null说明加锁成功if (ttl null) {return;}//加锁失败时的处理CompletableFutureRedissonLockEntry future subscribe(threadId);if (interruptibly) {commandExecutor.syncSubscriptionInterrupted(future);} else {commandExecutor.syncSubscription(future);}try {while (true) {ttl tryAcquire(-1, leaseTime, unit, threadId);// lock acquiredif (ttl null) {break;}// waiting for messageif (ttl 0) {try {commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {if (interruptibly) {commandExecutor.getNow(future).getLatch().acquire();} else {commandExecutor.getNow(future).getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(commandExecutor.getNow(future), threadId);}}...private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {//默认下waitTime和leaseTime都是-1下面调用的get方法是来自于RedissonObject的get()方法//可以理解为异步转同步将异步的tryAcquireAsync通过get转同步return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));}private T RFutureLong tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFutureLong ttlRemainingFuture;if (leaseTime ! -1) {ttlRemainingFuture tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {//默认情况下由于leaseTime-1所以会使用初始化RedissonLock实例时的internalLockLeaseTime//internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值30秒ttlRemainingFuture tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}CompletionStageLong f ttlRemainingFuture.thenApply(ttlRemaining - {//加锁返回的ttlRemaining为null表示加锁成功if (ttlRemaining null) {if (leaseTime ! -1) {internalLockLeaseTime unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper(f);}//默认情况下外部传入的leaseTime-1时会取lockWatchdogTimeout的默认值30秒T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);,Collections.singletonList(getRawName()),//锁的名字KEYS[1]unit.toMillis(leaseTime),//过期时间ARGV[1]默认时为30秒getLockName(threadId)//ARGV[2]值为UUID 线程ID);}... }public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {final String id;final String entryName;final CommandAsyncExecutor commandExecutor;public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor commandExecutor;this.id commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName id : name;}protected String getLockName(long threadId) {return id : threadId;}... }abstract class RedissonExpirable extends RedissonObject implements RExpirable {...RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {super(connectionManager, name);}... }public abstract class RedissonObject implements RObject {protected final CommandAsyncExecutor commandExecutor;protected String name;protected final Codec codec;public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {this.codec codec;this.commandExecutor commandExecutor;if (name null) {throw new NullPointerException(name cant be null);}setName(name);}...protected final V V get(RFutureV future) {//下面会调用CommandAsyncService.get()方法return commandExecutor.get(future);}... }public class CommandAsyncService implements CommandAsyncExecutor {...Overridepublic V V get(RFutureV future) {if (Thread.currentThread().getName().startsWith(redisson-netty)) {throw new IllegalStateException(Sync methods cant be invoked from async/rx/reactive listeners);}try {return future.toCompletableFuture().get();} catch (InterruptedException e) {future.cancel(true);Thread.currentThread().interrupt();throw new RedisException(e);} catch (ExecutionException e) {throw convertException(e);}}... } (3)加锁时执行的lua脚本 public class RedissonLock extends RedissonBaseLock {//默认情况下外部传入的leaseTime-1时会取lockWatchdogTimeout的默认值30秒T RFutureT tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommandT command) {return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,if (redis.call(exists, KEYS[1]) 0) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) 1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1]); return nil; end; return redis.call(pttl, KEYS[1]);,Collections.singletonList(getRawName()),//锁的名字KEYS[1]比如myLockunit.toMillis(leaseTime),//过期时间ARGV[1]默认时为30秒getLockName(threadId)//ARGV[2]值为UUID 线程ID);}... } 首先执行Redis的exists命令判断key为锁名的Hash值是否不存在也就是判断key为锁名myLock的Hash值是否存在。 一.如果key为锁名的Hash值不存在那么就进行如下加锁处理 首先通过Redis的hset命令设置一个key为锁名的Hash值。该Hash值的key为锁名value是一个映射。也就是在value值中会有一个field为UUID 线程IDvalue为1的映射。比如hset myLock UUID:ThreadID 1lua脚本中的ARGV[2]就是由UUID 线程ID组成的唯一值。 然后通过Redis的pexpire命令设置key为锁名的Hash值的过期时间也就是设置key为锁名的Hash值的过期时间为30秒。比如pexpire myLock 30000。所以默认情况下myLock这个锁在30秒后就会自动过期。 二.如果key为锁名的Hash值存在那么就执行如下判断处理 首先通过Redis的hexists命令判断在key为锁名的Hash值里field为UUID 线程ID的映射是否已经存在。 如果在key为锁名的Hash值里field为UUID 线程ID的映射存在那么就通过Redis的hincrby命令对field为UUID 线程ID的value值进行递增1。比如hincrby myLock UUID:ThreadID 1。也就是在key为myLock的Hash值里把field为UUID:ThreadID的value值从1累加到2发生这种情况的时候往往就是当前线程对锁进行了重入。接着执行pexpire myLock 30000再次将myLock的有效期设置为30秒。 如果在key为锁名的Hash值里field为UUID 线程ID的映射不存在发生这种情况的时候往往就是其他线程获取不到这把锁而产生互斥。那么就通过Redis的pttl命令返回key为锁名的Hash值的剩余存活时间因为不同线程的ARGV[2]是不一样的ARGV[2] UUID 线程ID。 (4)执行加锁lua脚本的命令执行器逻辑 在RedissonLock的tryLockInnerAsync()方法中会通过RedissonBaseLock的evalWriteAsync()方法执行lua脚本即通过CommandAsyncService的evalWriteAsync()方法执行lua脚本。 在CommandAsyncService的evalWriteAsync()方法中首先会执行CommandAsyncService的getNodeSource()方法获取对应的节点。然后执行CommandAsyncService的evalAsync()方法来执行lua脚本。 在CommandAsyncService的getNodeSource()方法中会根据key进行CRC16运算然后再对16384取模计算出key的slot值。然后根据这个slot值创建一个NodeSource实例进行返回。 在CommandAsyncService的evalAsync()方法中会将获得的NodeSource实例封装到Redis执行器RedisExecutor里。然后执行RedisExecutor实现将脚本请求发送给对应的Redis节点处理。 public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {//从外部传入的在创建实现了RedissonClient的Redisson实例时初始化的命令执行器CommandExecutorfinal CommandAsyncExecutor commandExecutor;public RedissonBaseLock(CommandAsyncExecutor commandExecutor, String name) {super(commandExecutor, name);this.commandExecutor commandExecutor;this.id commandExecutor.getConnectionManager().getId();this.internalLockLeaseTime commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();this.entryName id : name;}...protected T RFutureT evalWriteAsync(String key, Codec codec, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {//获取可用的节点并继续封装一个命令执行器CommandBatchServiceMasterSlaveEntry entry commandExecutor.getConnectionManager().getEntry(getRawName());int availableSlaves entry.getAvailableSlaves();CommandBatchService executorService createCommandBatchService(availableSlaves);//通过CommandAsyncService.evalWriteAsync方法执行lua脚本RFutureT result executorService.evalWriteAsync(key, codec, evalCommandType, script, keys, params);if (commandExecutor instanceof CommandBatchService) {return result;}//异步执行然后获取结果RFutureBatchResult? future executorService.executeAsync();CompletionStageT f future.handle((res, ex) - {if (ex null res.getSyncedSlaves() ! availableSlaves) {throw new CompletionException(new IllegalStateException(Only res.getSyncedSlaves() of availableSlaves slaves were synced));}return result.getNow();});return new CompletableFutureWrapper(f);}private CommandBatchService createCommandBatchService(int availableSlaves) {if (commandExecutor instanceof CommandBatchService) {return (CommandBatchService) commandExecutor;}BatchOptions options BatchOptions.defaults().syncSlaves(availableSlaves, 1, TimeUnit.SECONDS);return new CommandBatchService(commandExecutor, options);}... }public class CommandBatchService extends CommandAsyncService {...public CommandBatchService(CommandAsyncExecutor executor, BatchOptions options) {this(executor.getConnectionManager(), options, executor.getObjectBuilder(), RedissonObjectBuilder.ReferenceType.DEFAULT);}private CommandBatchService(ConnectionManager connectionManager, BatchOptions options, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {super(connectionManager, objectBuilder, referenceType);this.options options;}... }public class CommandAsyncService implements CommandAsyncExecutor {final ConnectionManager connectionManager;final RedissonObjectBuilder objectBuilder;final RedissonObjectBuilder.ReferenceType referenceType;public CommandAsyncService(ConnectionManager connectionManager, RedissonObjectBuilder objectBuilder, RedissonObjectBuilder.ReferenceType referenceType) {this.connectionManager connectionManager;this.objectBuilder objectBuilder;this.referenceType referenceType;}...Overridepublic T, R RFutureR evalWriteAsync(String key, Codec codec, RedisCommandT evalCommandType, String script, ListObject keys, Object... params) {//获取key对应的节点NodeSource source getNodeSource(key);//让对应的节点执行lua脚本请求return evalAsync(source, false, codec, evalCommandType, script, keys, false, params);}//获取key对应的Redis Cluster节点private NodeSource getNodeSource(String key) {//先计算key对应的slot值int slot connectionManager.calcSlot(key);//返回节点实例return new NodeSource(slot);}//执行lua脚本private T, R RFutureR evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommandT evalCommandType, String script, ListObject keys, boolean noRetry, Object... params) {if (isEvalCacheActive() evalCommandType.getName().equals(EVAL)) {CompletableFutureR mainPromise new CompletableFuture();Object[] pps copy(params);CompletableFutureR promise new CompletableFuture();String sha1 calcSHA(script);RedisCommand cmd new RedisCommand(evalCommandType, EVALSHA);ListObject args new ArrayListObject(2 keys.size() params.length);args.add(sha1);args.add(keys.size());args.addAll(keys);args.addAll(Arrays.asList(params));//将根据key进行CRC16运算然后对16384取模获取到的NodeSource实例封装到Redis执行器RedisExecutor中RedisExecutorT, R executor new RedisExecutor(readOnlyMode, nodeSource, codec, cmd, args.toArray(), promise, false, connectionManager, objectBuilder, referenceType, noRetry);//通过执行Redis执行器RedisExecutor来实现将lua脚本请求发送给对应的Redis节点进行处理executor.execute();...}...}... }public class ClusterConnectionManager extends MasterSlaveConnectionManager {public static final int MAX_SLOT 16384;//Redis Cluster默认有16384个slot...//对key进行CRC16运算然后再对16384取模Overridepublic int calcSlot(String key) {if (key null) {return 0;}int start key.indexOf({);if (start ! -1) {int end key.indexOf(});if (end ! -1 start 1 end) {key key.substring(start 1, end);}}int result CRC16.crc16(key.getBytes()) % MAX_SLOT;return result;}... } (5)如何根据slot值获取对应的节点 因为最后会执行封装了NodeSource实例的RedisExecutor的excute()方法而NodeSource实例中又会封装了锁名key对应的slot值所以RedisExecutor的excute()方法可以通过getConnection()方法获取对应节点的连接。 其中RedisExecutor的getConnection()方法会调用到MasterSlaveConnectionManager的connectionWriteOp()方法该方法又会通过调用ConnectionManager的getEntry()方法根据slot值获取节点也就是由ClusterConnectionManager的getEntry()方法去获取Redis的主节点。 其实在初始化连接管理器ClusterConnectionManager时就已经根据配置初始化好哪些slot映射到那个Redis主节点了。 public class RedisExecutorV, R {NodeSource source;...public void execute() {...//异步获取建立好的Redis连接CompletableFutureRedisConnection connectionFuture getConnection().toCompletableFuture();...}protected CompletableFutureRedisConnection getConnection() {...connectionFuture connectionManager.connectionWriteOp(source, command);return connectionFuture;}... }public class MasterSlaveConnectionManager implements ConnectionManager {...Overridepublic CompletableFutureRedisConnection connectionWriteOp(NodeSource source, RedisCommand? command) {MasterSlaveEntry entry getEntry(source);...}private MasterSlaveEntry getEntry(NodeSource source) {if (source.getRedirect() ! null) {return getEntry(source.getAddr());}MasterSlaveEntry entry source.getEntry();if (source.getRedisClient() ! null) {entry getEntry(source.getRedisClient());}if (entry null source.getSlot() ! null) {//根据slot获取Redis的主节点entry getEntry(source.getSlot());}return entry;}... }public class ClusterConnectionManager extends MasterSlaveConnectionManager {//slot和Redis主节点的原子映射数组private final AtomicReferenceArrayMasterSlaveEntry slot2entry new AtomicReferenceArray(MAX_SLOT);//Redis客户端连接和Redis主节点的映射关系private final MapRedisClient, MasterSlaveEntry client2entry new ConcurrentHashMap();...Overridepublic MasterSlaveEntry getEntry(int slot) {//根据slot获取Redis的主节点return slot2entry.get(slot);}...//初始化连接管理器ClusterConnectionManager时//就已经根据配置初始化好那些slot映射到那个Redis主节点了public ClusterConnectionManager(ClusterServersConfig cfg, Config config, UUID id) {...for (String address : cfg.getNodeAddresses()) {...CompletableFutureCollectionClusterPartition partitionsFuture parsePartitions(nodes);CollectionClusterPartition partitions partitionsFuture.join();ListCompletableFutureVoid masterFutures new ArrayList();for (ClusterPartition partition : partitions) {...CompletableFutureVoid masterFuture addMasterEntry(partition, cfg);masterFutures.add(masterFuture);}...}...}private CompletableFutureVoid addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {...CompletionStageRedisConnection connectionFuture connectToNode(cfg, partition.getMasterAddress(), configEndpointHostName);connectionFuture.whenComplete((connection, ex1) - {//成功连接时的处理if (ex1 ! null) {log.error(Cant connect to master: {} with slot ranges: {}, partition.getMasterAddress(), partition.getSlotRanges());result.completeExceptionally(ex1);return;}MasterSlaveServersConfig config create(cfg);config.setMasterAddress(partition.getMasterAddress().toString());//创建Redis的主节点MasterSlaveEntry entry;if (config.checkSkipSlavesInit()) {entry new SingleEntry(ClusterConnectionManager.this, config);} else {SetString slaveAddresses partition.getSlaveAddresses().stream().map(r - r.toString()).collect(Collectors.toSet());config.setSlaveAddresses(slaveAddresses);entry new MasterSlaveEntry(ClusterConnectionManager.this, config);}CompletableFutureRedisClient f entry.setupMasterEntry(new RedisURI(config.getMasterAddress()), configEndpointHostName);f.whenComplete((masterClient, ex3) - {if (ex3 ! null) {log.error(Cant add master: partition.getMasterAddress() for slot ranges: partition.getSlotRanges(), ex3);result.completeExceptionally(ex3);return;}//为创建的Redis的主节点添加slot值for (Integer slot : partition.getSlots()) {addEntry(slot, entry);lastPartitions.put(slot, partition);}...});});...}//添加slot到对应节点的映射关系private void addEntry(Integer slot, MasterSlaveEntry entry) {MasterSlaveEntry oldEntry slot2entry.getAndSet(slot, entry);if (oldEntry ! entry) {entry.incReference();shutdownEntry(oldEntry);}client2entry.put(entry.getClient(), entry);}... }
http://www.pierceye.com/news/100322/

相关文章:

  • 西安网站快速优化重庆明建网络科技有限公司干啥的
  • 广州市天河区门户网站软件制作公司
  • 做网站前期创建文件夹博罗高端网站建设价格
  • 襄阳网站建设价格淄博网站推广价格
  • 网站推广的软件六安网站制作哪里有
  • 大型门户网站模板wordpress有哪些小工具
  • 有flash的网站新闻资讯app制作公司
  • 网站和平台有什么区别黄页88怎么发信息质量高
  • 阿里建站价格小户型室内装修设计公司网站
  • 建设银行网站安全性分析网络推广服务平台
  • 大型购物网站建设福建微网站建设公司
  • 做网站软件j程序员找工作网站
  • 济南网站建设系统画册设计公司宣传册
  • 上海网站设计方案家纺网站建设
  • 衡水精品网站建设游戏广告推广平台
  • 响应式企业网站建设营销战略
  • wordpress离线浏览搜索引擎优化包括
  • 门户网站建设需要多少呼伦贝尔市住房和城乡建设局网站
  • 静海集团网站建设住房城乡建设网站
  • 个人备案挂企业网站网站开发公司照片
  • 网站建设课程体会国内最新新闻简短
  • 网站开发大概价格最常用的网页制作软件
  • 商务网站模块设计时前台基础设施建设免费网站建设空间
  • 青海省公路工程建设总公司网站饮料公司网站模板
  • 建设部网站刘赵云网页版邮箱
  • 免费扑克网站企业网站怎么搜索优化
  • 做网站导航的厦门网站建设制作多少钱
  • 怎样免费注册网站域名鹤城建设集团网站
  • 3合1网站建设价格网站建设论坛快速建站
  • 怎样做钓鱼网站上海网站关键词排名优化报价