新增DistributeLock (#128)

*  新增DistributeLock
Co-authored-by: huyuanzhi <hyz_sound@163.com>
This commit is contained in:
b2baccline
2021-11-20 00:59:17 +08:00
parent 6629129876
commit 7b5e0f3902
7 changed files with 294 additions and 19 deletions

View File

@@ -4,6 +4,7 @@ import com.hccake.ballcat.common.redis.config.CachePropertiesHolder;
import com.hccake.ballcat.common.redis.core.annotation.CacheDel;
import com.hccake.ballcat.common.redis.core.annotation.CachePut;
import com.hccake.ballcat.common.redis.core.annotation.Cached;
import com.hccake.ballcat.common.redis.lock.DistributedLock;
import com.hccake.ballcat.common.redis.operation.CacheDelOps;
import com.hccake.ballcat.common.redis.operation.CachedOps;
import com.hccake.ballcat.common.redis.operation.CachePutOps;
@@ -155,29 +156,18 @@ public class CacheStringAspect {
}
// 2.==========如果缓存为空 则需查询数据库并更新===============
Object dbData = null;
// 尝试获取锁,只允许一个线程更新缓存
String reqId = UUID.randomUUID().toString();
if (CacheLock.lock(ops.lockKey(), reqId)) {
// 有可能其他线程已经更新缓存,这里再次判断缓存是否为空
cacheData = cacheQuery.get();
if (cacheData == null) {
cacheData = DistributedLock.<String>builder().action(ops.lockKey(), () -> {
String cacheValue = cacheQuery.get();
if (cacheValue == null) {
// 从数据库查询数据
dbData = ops.joinPoint().proceed();
Object dbValue = ops.joinPoint().proceed();
// 如果数据库中没数据填充一个String防止缓存击穿
cacheData = dbData == null ? CachePropertiesHolder.nullValue() : cacheSerializer.serialize(dbData);
cacheValue = dbValue == null ? CachePropertiesHolder.nullValue() : cacheSerializer.serialize(dbValue);
// 设置缓存
ops.cachePut().accept(cacheData);
ops.cachePut().accept(cacheValue);
}
// 解锁
CacheLock.releaseLock(ops.lockKey(), reqId);
// 返回数据
return dbData;
}
else {
cacheData = cacheQuery.get();
}
return cacheValue;
}).fail(cacheQuery).lock();
// 自旋时间内未获取到锁或者数据库中数据为空返回null
if (cacheData == null || ops.nullValue(cacheData)) {
return null;

View File

@@ -0,0 +1,18 @@
package com.hccake.ballcat.common.redis.lock;
import com.hccake.ballcat.common.redis.lock.function.ThrowingSupplier;
/**
* @author huyuanzhi 锁住的方法
* @param <T> 返回类型
*/
public interface Action<T> {
/**
* 执行方法
* @param supplier 执行方法
* @return 状态处理器
*/
StateHandler<T> action(String lockKey, ThrowingSupplier<? extends T> supplier);
}

View File

@@ -0,0 +1,110 @@
package com.hccake.ballcat.common.redis.lock;
import cn.hutool.core.lang.Assert;
import com.hccake.ballcat.common.redis.core.CacheLock;
import com.hccake.ballcat.common.redis.lock.function.ThrowingSupplier;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author huyuanzhi
* @version 1.0
* @date 2021/11/16 分布式锁操作类
*/
public final class DistributedLock<T> implements Action<T>, StateHandler<T> {
Object result;
String key;
ThrowingSupplier<? extends T> executeAction;
Function<? super T, ? extends T> successAction;
Supplier<? extends T> lockFailAction;
Consumer<? super Throwable> exceptionAction;
private DistributedLock() {
this.exceptionAction = this::throwException;
}
public static <T> Action<T> builder() {
return new DistributedLock<>();
}
public StateHandler<T> action(String lockKey, ThrowingSupplier<? extends T> action) {
Assert.isTrue(this.executeAction == null, "execute action has been already set");
Assert.notNull(action, "execute action cant be null");
Assert.notBlank(lockKey, "lock key cant be blank");
this.executeAction = action;
this.key = lockKey;
return this;
}
public StateHandler<T> success(Function<? super T, ? extends T> action) {
Assert.isTrue(this.successAction == null, "success action has been already set");
Assert.notNull(action, "success action cant be null");
this.successAction = action;
return this;
}
public StateHandler<T> fail(Supplier<? extends T> action) {
Assert.isTrue(this.lockFailAction == null, "lock fail action has been already set");
Assert.notNull(action, "lock fail action cant be null");
this.lockFailAction = action;
return this;
}
public StateHandler<T> exception(Consumer<? super Throwable> action) {
Assert.notNull(action, "exception action cant be null");
this.exceptionAction = action;
return this;
}
@SuppressWarnings("unchecked")
public T lock() {
String requestId = UUID.randomUUID().toString();
if (Boolean.TRUE.equals(CacheLock.lock(this.key, requestId))) {
T ret = null;
boolean exResolved = false;
try {
ret = executeAction.get();
this.setValue(ret);
}
catch (Throwable e) {
handleException(e);
exResolved = true;
}
finally {
CacheLock.releaseLock(this.key, requestId);
}
if (!exResolved && this.successAction != null) {
this.setValue(this.successAction.apply(ret));
}
}
else {
if (lockFailAction != null) {
this.setValue(lockFailAction.get());
}
}
return (T) this.result;
}
private void handleException(Throwable e) {
this.exceptionAction.accept(e);
}
private void setValue(Object result) {
this.result = result;
}
@SuppressWarnings("unchecked")
private <E extends Throwable> void throwException(Throwable t) throws E {
throw (E) t;
}
}

View File

@@ -0,0 +1,39 @@
package com.hccake.ballcat.common.redis.lock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
* @author huyuanzhi 状态处理器
* @param <T> 返回类型
*/
public interface StateHandler<T> {
/**
* 获取锁成功,业务方法执行成功回调
* @param action 回调方法引用
* @return 状态处理器
*/
StateHandler<T> success(Function<? super T, ? extends T> action);
/**
* 获取锁失败回调
* @param action 回调方法引用
* @return 状态处理器
*/
StateHandler<T> fail(Supplier<? extends T> action);
/**
* 获取锁成功,执行业务方法异常回调
* @param action 回调方法引用
* @return 状态处理器
*/
StateHandler<T> exception(Consumer<? super Throwable> action);
/**
* 终态,获取锁
*/
T lock();
}

View File

@@ -0,0 +1,12 @@
package com.hccake.ballcat.common.redis.lock.function;
public interface ThrowingSupplier<T> {
/**
* 可抛异常的supplier
* @return T
* @throws Throwable 异常
*/
T get() throws Throwable;
}

View File

@@ -0,0 +1,36 @@
package com.hccake.ballcat.common.redis.test;
import com.hccake.ballcat.common.redis.lock.DistributedLock;
import org.junit.jupiter.api.Test;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import java.io.IOException;
/**
* @author huyuanzhi
* @version 1.0
* @date 2021/11/16
*/
@SpringJUnitConfig(RedisConfiguration.class)
class DistributeLockTest {
String lockKey = "ballcat";
@Test
void testSuccess() {
String lock = DistributedLock.<String>builder().action(lockKey, () -> lockKey).success(ret -> ret + ret).lock();
System.out.println(lock);
}
@Test
void testException() {
String lock = DistributedLock.<String>builder().action(lockKey, this::get).success(ret -> ret + ret)
.exception(e -> System.out.println("发生异常了")).lock();
System.out.println(lock);
}
String get() throws IOException {
throw new IOException();
}
}

View File

@@ -0,0 +1,70 @@
package com.hccake.ballcat.common.redis.test;
import com.hccake.ballcat.common.redis.config.CacheProperties;
import com.hccake.ballcat.common.redis.config.CachePropertiesHolder;
import com.hccake.ballcat.common.redis.core.CacheLock;
import com.hccake.ballcat.common.redis.prefix.IRedisPrefixConverter;
import com.hccake.ballcat.common.redis.prefix.impl.DefaultRedisPrefixConverter;
import com.hccake.ballcat.common.redis.serialize.PrefixStringRedisSerializer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* @author huyuanzhi
* @version 1.0
* @date 2021/11/16
*/
@Configuration
@EnableConfigurationProperties(CacheProperties.class)
public class RedisConfiguration {
private static final String host = "localhost";
private static final int port = 6379;
@Bean
public CachePropertiesHolder cachePropertiesHolder(CacheProperties cacheProperties) {
CachePropertiesHolder cachePropertiesHolder = new CachePropertiesHolder();
cachePropertiesHolder.setCacheProperties(cacheProperties);
return cachePropertiesHolder;
}
@Bean
public RedisConnectionFactory redisConnectionFactory(){
LettuceConnectionFactory factory = new LettuceConnectionFactory(host,port);
factory.afterPropertiesSet();
return factory;
}
@Bean
@ConditionalOnClass(IRedisPrefixConverter.class)
@ConditionalOnMissingBean
public StringRedisTemplate stringRedisTemplate(IRedisPrefixConverter redisPrefixConverter,
RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
template.setKeySerializer(new PrefixStringRedisSerializer(redisPrefixConverter));
return template;
}
@Bean
public CacheLock cacheLock(StringRedisTemplate stringRedisTemplate) {
CacheLock cacheLock = new CacheLock();
cacheLock.setStringRedisTemplate(stringRedisTemplate);
return cacheLock;
}
@Bean
@ConditionalOnMissingBean(IRedisPrefixConverter.class)
public IRedisPrefixConverter redisPrefixConverter() {
return new DefaultRedisPrefixConverter("ballcat");
}
}