自定义springboot组件--基于redisson实现分布式锁
一 引入 1.1 分布式锁的概念
在单进程(启动一个jvm)的系统中,当存在多个线程可以改变某个变量(可变共享变量)时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。这个标记可以理解为锁;
很多时候我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,通过 Java 提供的并发 API 我们可以解决,在分布式环境下,就没有那么简单啦。
- 分布式与单机情况下最大的不同在于其不是多线程而是多进程。
- 多线程由于可以共享堆内存,可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,需要将标记存储在一个所有进程都能看到的地方。
如果是在集群或分布式环境中要保证多进程中的多线程的线程安全就要使用分布式锁,分布式锁的目的就是在分布式/集群环境中使用加锁手段保证多个服务节点对同一个数据进行顺序操作,保证数据的安全性
分布式锁:需要得有一把唯一且共享的锁,多个服务去获取锁,只有一个服务才能获取到锁,其他没有获取到锁的服务需要等待或者自旋,等获取到锁的服务业务执行完成释放锁,其他的服务就可以尝试获取锁。
二 整体代码实现Redisson是一个实现的Java操作Redis的工具包,它不仅提供了一系列常用的操作Redis的API,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDonLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法,Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。
核心思想:将整个分布式锁的底层实现逻辑抽离出来,通过aop实现分布锁,这种方式实现的分布式锁粒度比较粗会对整个方法进行加锁,如果要在要在更加细粒度的代码块中实现分布式锁可以直接代码中增加基于redission实现的分布式锁,抽离出来是为了更好的解耦.
定义分布式锁注解
@Documented @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface XlcpLock { String name() default ""; LockType lockType() default LockType.REENTRANT_LOCK; long aitTime() default Long.MIN_VALUE; long leaseTime() default Long.MIN_VALUE; LockTimeOutStrategy lockTimeOutStrategy() default LockTimeOutStrategy.NO_OPERATION; String[] keys() default ""; LockReleaseFailStrategy lockReleaseFailStrategy() default LockReleaseFailStrategy.NO_OPERATION; }
定义prop系统自定义核心参数
@ConfigurationProperties(prefix = XlcpLockProperties.PRE_FIX) @Data public class XlcpLockProperties { public static final String PRE_FIX = "config.lock"; private Long aitTime = 60L; private Long leaseTime = 60L; private Integer dataBase = 1; private String codec = ".redisson.codec.JsonJacksonCodec"; }
申明一个接口 用于申明锁的获取和释放
public interface Lock { Boolean acquire(LockInfo lockInfo); Boolean release(LockInfo lockInfo); }
通过工厂模式+策略模式提供对外可访问的锁类型(这里只申明了一种可重入的锁)
public enum LockType { // 可重入锁 REENTRANT_LOCK; } @Component @RequiredArgsConstructor public class ReentrantLock implements Lock, InitializingBean { private final RedissonClient redissonClient; private RLock rLock; @Override public Boolean acquire(LockInfo lockInfo) { rLock = redissonClient.getLock(lockInfo.getLockName()); try { return rLock.tryLock(lockInfo.getWaitTime(),lockInfo.getLeaseTime(), TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } return false; } @Override public Boolean release(LockInfo lockInfo) { if (rLock.isHeldByCurrentThread()){ try { return rLock.forceUnlockAsync().get(); } catch (InterruptedException e) { return false; } catch (ExecutionException e) { return false; } } return false; } @Override public void afterPropertiesSet() thros Exception { LockFactory.register(LockType.REENTRANT_LOCK,this); } } public class LockFactory { public static MapLOCKS = ne ConcurrentHashMap (); public static void register(LockType lockType,Lock lock){ LOCKS.put(lockType,lock); } public static Lock getLock(LockType lockType){ return LOCKS.get(lockType); } }
申明获取锁失败处理策略
public interface LockTimeOutHandler { void handler(Lock lock, LockInfo lockInfo); } public enum LockTimeOutStrategy implements LockTimeOutHandler { NO_OPERATION(){ @Override public void handler(Lock lock, LockInfo lockInfo) { // do something } }, FAST_FAIL(){ @Override public void handler(Lock lock, LockInfo lockInfo) { String message = String.format("获取锁失败,当前锁名称为:%s,获取锁的等待超时时间为:%ds", lockInfo.getLockName(), lockInfo.getWaitTime()); thro ne LockException(message); } }, KEEP_ACQUIRE(){ private static final long DEFAULT_INTERVAL = 100L; private static final long DEFAULT_MAX_INTERVAL = 3 60 1000L; @Override public void handler(Lock lock, LockInfo lockInfo) { long interval = DEFAULT_INTERVAL; // 内部自旋获取锁资源 hile (!lock.acquire(lockInfo)){ if (interval>DEFAULT_MAX_INTERVAL){ String message = String.format("获取锁失败,当前锁名称为:%s,获取锁的等待超时时间为:%ds", lockInfo.getLockName(), lockInfo.getWaitTime()); thro ne LockException(message); } try { TimeUnit.MILLISECONDS.sleep(interval); interval<<=1; } catch (InterruptedException e) { e.printStackTrace(); thro ne LockException("获取锁失败!"); } } } }; }
申明释放锁失败的处理策略
public interface LockReleaseHandel { void handler(LockInfo lockInfo); } public enum LockReleaseFailStrategy implements LockReleaseHandel { NO_OPERATION(){ @Override public void handler(LockInfo lockInfo) { } }, FAST_FAIL(){ @Override public void handler(LockInfo lockInfo) { String errorMessge = String.format("当前锁释放失败,当前锁名称为:{}", lockInfo.getLockName()); thro ne LockException(errorMessge); } }; }
aop切面实现分布式锁
@Aspect @RequiredArgsConstructor @Slf4j public class XlcpLockAspect { private final LockInfoProvider lockInfoProvider; @Around("@annotation(xlcpLock)") @SneakyThros public Object around(ProceedingJoinPoint point, XlcpLock xlcpLock){ // 构建分布式锁的基础信息 LockInfo lockInfo = lockInfoProvider.getLockInfo(point, xlcpLock); Lock lock = LockFactory.getLock(xlcpLock.lockType()); LockResource lockResource = ne LockResource(lockInfo, lock); LockContextHolder.set(lockResource); try { Boolean acquire = lock.acquire(lockInfo); log.info("锁获取结果:{}",acquire); // 判断当前锁是否获取成功 if (!acquire){ // 获取锁的后置处理 xlcpLock.lockTimeOutStrategy().handler(lock,lockInfo); } // 设置当前线程持有锁 LockContextHolder.setRes(); } catch (Exception exception) { exception.printStackTrace(); LockContextHolder.remove(); } return point.proceed(); } @AfterReturning("@annotation(xlcpLock)") @SneakyThros public void after(JoinPoint point,XlcpLock xlcpLock){ releaseLock(xlcpLock); LockContextHolder.remove(); } @AfterThroing(value = "@annotation(xlcpLock)", throing = "ex") public void afterThroing (JoinPoint joinPoint, XlcpLock xlcpLock, Throable ex) thros Throable { releaseLock(xlcpLock); LockContextHolder.remove(); thro ex; } private void releaseLock(XlcpLock xlcpLock) { LockResource lockResource = LockContextHolder.get(); if (Objects.isNull(lockResource)){ thro ne LockException("上下文中不存在当前线程的锁资源"); } if (lockResource.getRes()){ Lock lock = lockResource.getLock(); Boolean releaseResult = lock.release(lockResource.getLockInfo()); lockResource.setRes(Boolean.FALSE); if (!releaseResult){ // 释放锁后置处理 xlcpLock.lockReleaseFailStrategy().handler(lockResource.getLockInfo()); } } } }
一些基础类
@UtilityClass public class LockContextHolder { private final ThreadLocalTHREAD_LOCAL_LOCK = ne TransmittableThreadLocal<>(); public LockResource get(){ return THREAD_LOCAL_LOCK.get(); } public void set(LockResource lockResource){ THREAD_LOCAL_LOCK.set(lockResource); } public void remove(){ THREAD_LOCAL_LOCK.remove(); } public void setRes(){ LockResource lockResource = get(); lockResource.setRes(Boolean.TRUE); set(lockResource); } } @Data public class LockInfo { private String lockName; private Long aitTime; private Long leaseTime; private LockType lockType; } @Data public class LockResource { private LockInfo lockInfo; private Lock lock; private Boolean res = false; public LockResource(LockInfo lockInfo,Lock lock){ this.lockInfo=lockInfo; this.lock=lock; } } @RequiredArgsConstructor @Slf4j public class LockInfoProvider { private final BusinessKeyProvider businessKeyProvider; private final LockNameProvider lockNameProvider; private final XlcpLockProperties xlcpLockProperties; public LockInfo getLockInfo(ProceedingJoinPoint point, XlcpLock xlcpLock){ String businessName = businessKeyProvider.get(point, xlcpLock); String lockName = lockNameProvider.getLockName(businessName, point, xlcpLock); LockInfo lockInfo = ne LockInfo(); lockInfo.setLockName(lockName); lockInfo.setLockType(xlcpLock.lockType()); lockInfo.setWaitTime(getWaitTime(xlcpLock)); Long leaseTime = getLeaseTime(xlcpLock); lockInfo.setLeaseTime(leaseTime); // 如果不设置锁的过期时间 在极端情况下容易形成死锁 if (leaseTime<0L){ log.error("锁过期时间设置错误,当前过期时间为:{}",leaseTime); thro ne LockException("锁过期时间设置错误"); } return lockInfo; } private Long getWaitTime(XlcpLock xlcpLock){ Long aitTime = xlcpLock.aitTime(); return aitTime.equals(Long.MIN_VALUE)?xlcpLockProperties.getWaitTime():aitTime; } private Long getLeaseTime(XlcpLock xlcpLock){ Long leaseTime = xlcpLock.leaseTime(); return leaseTime.equals(Long.MIN_VALUE)?xlcpLockProperties.getLeaseTime():leaseTime; } } public class LockNameProvider { public static final String LOCK_NAME_PREFIX = "xlcplock"; public String getLockName(String businessName, ProceedingJoinPoint point, XlcpLock xlcpLock){ String name = getName(point, xlcpLock); String lockName = String.format("%s%s%s%s",LOCK_NAME_PREFIX, StrPool.COLON,name,businessName); return lockName; } private String getName(ProceedingJoinPoint point, XlcpLock xlcpLock){ String name = xlcpLock.name(); if (StrUtil.isEmpty(name)){ MethodSignature signature = (MethodSignature) point.getSignature(); return String.format("%s%s",signature.getDeclaringTypeName(),signature.getMethod().getName()); } return name; } } public class BusinessKeyProvider { private ParameterNameDiscoverer nameDiscoverer = ne DefaultParameterNameDiscoverer(); public String get(ProceedingJoinPoint point, XlcpLock xlcpLock){ Method method = getMethod(point); // spel表达式解析 List spelDefinitionKeys = getSpelDefinitionKey(point, xlcpLock, method); return CollUtil.join(spelDefinitionKeys,"", StrPool.DASHED,""); } private Method getMethod(ProceedingJoinPoint point){ MethodSignature methodSignature = (MethodSignature) point.getSignature(); Method method = methodSignature.getMethod(); if (method.getDeclaringClass().isAnnotation()){ try { return point.getTarget().getClass().getDeclaredMethod(methodSignature.getName(),method.getParameterTypes()); } catch (NoSuchMethodException noSuchMethodException) { noSuchMethodException.printStackTrace(); } } return method; } private List getSpelDefinitionKey(ProceedingJoinPoint point, XlcpLock xlcpLock,Method method){ ArrayList spelDefinitionKeyList = ne ArrayList<>(); SpelExpressionParser parser = ne SpelExpressionParser(); MethodBasedEvaluationContext context = ne MethodBasedEvaluationContext(null, method, point.getArgs(), nameDiscoverer); Arrays.stream(xlcpLock.keys()).forEach(key->{ if (StrUtil.isNotEmpty(key)){ Expression expression = parser.parseExpression(key); spelDefinitionKeyList.add(ObjectUtils.nullSafeToString(expression.getValue(context))); } }); return spelDefinitionKeyList; } }
定义自动配置类
@Configuration @EnableConfigurationProperties({XlcpLockProperties.class, RedisProperties.class}) @AutoConfigureAfter(RedisAutoConfiguration.class) @ComponentScan(".zkjc.xlcp.mon.lock.lock") public class XlcpLockAutoConfiguration { @Autoired private XlcpLockProperties xlcpLockProperties; @SneakyThros @Bean(destroyMethod = "shutdon") @ConditionalOnMissingBean public RedissonClient redissonClient(RedisProperties redisProperties,XlcpLockProperties xlcpLockProperties){ RedisProperties.Cluster cluster = redisProperties.getCluster(); Config config = ne Config(); if (Objects.nonNull(cluster)){ // 集群模式 List三 客户端调用nodes = cluster.getNodes(); config.useClusterServers() .addNodeAddress(ArrayUtil.toArray(nodes, String.class)) .setPassord(redisProperties.getPassord()); }else { String host = StringUtil.endAt(redisProperties.getHost(), StrPool.COLON); String address = String.format("%s%s",host,redisProperties.getPort()); config.useSingleServer() .setAddress(address) .setPassord(redisProperties.getPassord()) .setDatabase(xlcpLockProperties.getDataBase()); } Codec codec=(Codec) ClassUtils.forName(xlcpLockProperties.getCodec(),ClassUtils.getDefaultClassLoader()).neInstance(); config.setCodec(codec); config.setEventLoopGroup(ne NioEventLoopGroup()); return Redisson.create(config); } @Bean public BusinessKeyProvider businessKeyProvider(){ return ne BusinessKeyProvider(); } @Bean public LockNameProvider lockNameProvider(){ return ne LockNameProvider(); } @Bean public LockInfoProvider lockInfoProvider(){ return ne LockInfoProvider(businessKeyProvider(),lockNameProvider(),xlcpLockProperties); } @Bean public XlcpLockAspect xlcpLockAspect(){ return ne XlcpLockAspect(lockInfoProvider()); } }