添加队列线程的redis实现

This commit is contained in:
b2baccline
2021-03-03 10:45:08 +08:00
parent aff93875ac
commit d8c6d7ed8b

View File

@@ -0,0 +1,143 @@
package com.hccake.ballcat.common.redis.thread;
import cn.hutool.core.util.StrUtil;
import com.hccake.ballcat.common.core.thread.AbstractQueueThread;
import com.hccake.ballcat.common.redis.RedisHelper;
import com.hccake.ballcat.common.util.JsonUtils;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
/**
* @see java.util.concurrent.LinkedBlockingDeque
* @author lingting 2021/3/2 21:09
*/
@Slf4j
public abstract class AbstractRedisThread<E> extends AbstractQueueThread<E> {
@Autowired
protected RedisHelper redisHelper;
/**
* 锁
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 激活与休眠线程
*/
private final Condition condition = lock.newCondition();
/**
* 获取数据存储的key
* @return java.lang.String
* @author lingting 2021-03-02 21:11
*/
public abstract String getKey();
/**
* 对象 转换成 string. 把String 存入redis
* @param e 对象
* @return java.lang.String
* @author lingting 2021-03-02 21:38
*/
protected String convertToString(@NotNull E e) {
return JsonUtils.toJson(e);
}
/**
* 获取目标对象的type , 即 E 的实际类型.
* @return java.lang.reflect.Type
* @author lingting 2021-03-02 21:41
*/
protected Type getObjType() {
Type superClass = getClass().getGenericSuperclass();
superClass = ((Class<?>) superClass).getGenericSuperclass();
return ((ParameterizedType) superClass).getActualTypeArguments()[0];
}
/**
* string 转换成 对象
* @param str string
* @return java.lang.String
* @author lingting 2021-03-02 21:38
*/
@Nullable
protected E convertToObj(String str) {
if (StrUtil.isBlank(str)) {
return null;
}
return JsonUtils.toObj(str, getObjType());
}
@Override
public void put(@NotNull E e) {
// 不插入空值
if (e != null) {
try {
lock.lockInterruptibly();
try {
RedisHelper.listRightPush(getKey(), convertToString(e));
// 激活线程
condition.signal();
}
finally {
lock.unlock();
}
}
catch (Exception ex) {
log.error("{} put Object error, param: {}", this.getClass().toString(), e, ex);
}
}
}
/**
* 从redis中获取数据
* @return java.lang.String
* @author lingting 2021-03-02 22:04
*/
protected String get() {
return RedisHelper.listLeftPop(getKey());
}
@Override
@Nullable
public E poll(long time) throws InterruptedException {
// 上锁
lock.lockInterruptibly();
try {
String pop = get();
if (StrUtil.isBlank(pop)) {
// 设置等待时长
long nanos = TimeUnit.MILLISECONDS.toNanos(time);
while (nanos > 0) {
// 休眠. 返回剩余的休眠时间
nanos = condition.awaitNanos(nanos);
// 被唤醒. 或者超时
pop = get();
// 获取到值
if (StrUtil.isNotBlank(pop)) {
// 结束循环. 返回值
break;
}
}
}
return convertToObj(pop);
}
finally {
lock.unlock();
}
}
}