AbstractQueueThread 添加程序关闭时的处理方法.

This commit is contained in:
b2baccline
2021-03-09 11:11:41 +08:00
parent e8e45eadd1
commit e662c4a957
6 changed files with 117 additions and 16 deletions

View File

@@ -1,18 +1,23 @@
package com.hccake.ballcat.common.core.thread; package com.hccake.ballcat.common.core.thread;
import org.springframework.beans.factory.InitializingBean; import com.hccake.ballcat.common.util.JsonUtils;
import org.springframework.lang.Nullable;
import javax.validation.constraints.NotNull;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.lang.Nullable;
/** /**
* 顶级队列线程类 * 顶级队列线程类
* *
* @author lingting 2021/3/2 15:07 * @author lingting 2021/3/2 15:07
*/ */
public abstract class AbstractQueueThread<E> extends Thread implements InitializingBean { @Slf4j
public abstract class AbstractQueueThread<E> extends Thread
implements InitializingBean, ApplicationListener<ContextClosedEvent> {
/** /**
* 默认缓存数据数量 * 默认缓存数据数量
@@ -133,7 +138,7 @@ public abstract class AbstractQueueThread<E> extends Thread implements Initializ
e = poll(getPollTimeoutMs()); e = poll(getPollTimeoutMs());
} }
catch (InterruptedException interruptedException) { catch (InterruptedException interruptedException) {
interruptedException.printStackTrace(); log.error("{} 类的线程被中断!id: {}", getClass().getSimpleName(), getId());
} }
if (e != null) { if (e != null) {
@@ -150,7 +155,13 @@ public abstract class AbstractQueueThread<E> extends Thread implements Initializ
break; break;
} }
} }
process(list);
if (!isRun()) {
shutdownHandler(list);
}
else {
process(list);
}
} }
catch (Throwable e) { catch (Throwable e) {
error(e, list); error(e, list);
@@ -173,4 +184,35 @@ public abstract class AbstractQueueThread<E> extends Thread implements Initializ
start(); start();
} }
@Override
public void onApplicationEvent(ContextClosedEvent event) {
log.warn("{} 类的线程开始关闭! id: {} ", getClass().getSimpleName(), getId());
// 执行关闭方法
shutdown();
}
/**
* 线程关闭时执行
* @author lingting 2021-03-08 22:25
*/
public void shutdown() {
// 通过中断线程唤醒当前线程. 让线程进入 shutdownHandler 方法处理数据
this.interrupt();
}
/**
* 线程被中断后的处理. 如果有缓存手段可以让数据进入缓存.
* @param list 当前数据
* @author lingting 2021-03-08 22:40
*/
public void shutdownHandler(List<E> list) {
try {
log.error("{} 类 线程: {} 被关闭. 数据:{}", this.getClass().getSimpleName(), getId(), JsonUtils.toJson(list));
}
catch (Throwable e) {
log.error("{} 类 线程: {} 被关闭. 数据:{}", this.getClass().getSimpleName(), getId(), list);
}
}
} }

View File

@@ -66,8 +66,14 @@ public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends Abstr
@Override @Override
public void receiveProcess(List<T> list, T t) { public void receiveProcess(List<T> list, T t) {
try { try {
// 收到就处理. 不再汇总处理 if (isRun()) {
handler(t, getTransaction(t)); // 收到就处理. 不再汇总处理. 减少数据丢失的问题
handler(t, getTransaction(t));
}
else {
// 结束运行插入redis
put(t);
}
} }
catch (Exception e) { catch (Exception e) {
error(t, e); error(t, e);

View File

@@ -47,4 +47,13 @@
<artifactId>spring-boot-starter-actuator</artifactId> <artifactId>spring-boot-starter-actuator</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
</plugin>
</plugins>
</build>
</project> </project>

View File

@@ -24,7 +24,7 @@ public abstract class AbstractThread extends AbstractVerifyThread<Order, Result>
@Override @Override
public int getBatchSize() { public int getBatchSize() {
return 1; return 10;
} }
/** /**

View File

@@ -8,14 +8,13 @@ spring:
ballcat: ballcat:
pay: pay:
bitcoin: bitcoin:
omni:
endpoints: mainnet
enabled: true enabled: true
endpoints: mainnet
ethereum: ethereum:
infura: infura:
endpoints: mainnet
# 正式运营请使用自己的 project id, 这个随时可能失效 # 正式运营请使用自己的 project id, 这个随时可能失效
project-id: b6066b4cfce54e7384ea38d52f9260ac project-id: b6066b4cfce54e7384ea38d52f9260ac
endpoints: mainnet
tronscan: tronscan:
endpoints: mainnet endpoints: mainnet
enabled: true enabled: true

View File

@@ -6,6 +6,7 @@ import com.hccake.ballcat.common.redis.RedisHelper;
import com.hccake.ballcat.common.util.JsonUtils; import com.hccake.ballcat.common.util.JsonUtils;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@@ -24,15 +25,20 @@ public abstract class AbstractRedisThread<E> extends AbstractQueueThread<E> {
@Autowired @Autowired
protected RedisHelper redisHelper; protected RedisHelper redisHelper;
/**
* 是否正在运行
*/
protected boolean run = true;
/** /**
* 锁 * 锁
*/ */
private final ReentrantLock lock = new ReentrantLock(); protected final ReentrantLock lock = new ReentrantLock();
/** /**
* 激活与休眠线程 * 激活与休眠线程
*/ */
private final Condition condition = lock.newCondition(); protected final Condition condition = lock.newCondition();
/** /**
* 获取数据存储的key * 获取数据存储的key
@@ -83,6 +89,7 @@ public abstract class AbstractRedisThread<E> extends AbstractQueueThread<E> {
try { try {
lock.lockInterruptibly(); lock.lockInterruptibly();
try { try {
// 线程被中断后无法执行Redis命令
RedisHelper.listRightPush(getKey(), convertToString(e)); RedisHelper.listRightPush(getKey(), convertToString(e));
// 激活线程 // 激活线程
condition.signal(); condition.signal();
@@ -92,7 +99,7 @@ public abstract class AbstractRedisThread<E> extends AbstractQueueThread<E> {
} }
} }
catch (Exception ex) { catch (Exception ex) {
log.error("{} put Object error, param: {}", this.getClass().toString(), e, ex); log.error("{} put error, param: {}", this.getClass().toString(), e, ex);
} }
} }
} }
@@ -109,6 +116,10 @@ public abstract class AbstractRedisThread<E> extends AbstractQueueThread<E> {
@Override @Override
@Nullable @Nullable
public E poll(long time) throws InterruptedException { public E poll(long time) throws InterruptedException {
if (!isRun()) {
// 停止运行时返回null, 让数据待在redis里面
return null;
}
// 上锁 // 上锁
lock.lockInterruptibly(); lock.lockInterruptibly();
try { try {
@@ -125,6 +136,11 @@ public abstract class AbstractRedisThread<E> extends AbstractQueueThread<E> {
// 被唤醒. 或者超时 // 被唤醒. 或者超时
pop = get(); pop = get();
// 如果不运行了
if (!isRun()) {
break;
}
// 获取到值 // 获取到值
if (StrUtil.isNotBlank(pop)) { if (StrUtil.isNotBlank(pop)) {
// 结束循环. 返回值 // 结束循环. 返回值
@@ -140,4 +156,33 @@ public abstract class AbstractRedisThread<E> extends AbstractQueueThread<E> {
} }
@Override
public void shutdown() {
// 修改运行标志
run = false;
lock.lock();
try {
condition.signalAll();
}
finally {
lock.unlock();
}
}
@Override
public void shutdownHandler(List<E> list) {
log.warn("{} 线程被关闭! id: {}", getClass().getSimpleName(), getId());
for (E e : list) {
// 所有数据插入redis
put(e);
log.error("{}", e);
}
}
@Override
public boolean isRun() {
// 运行中 且 未被中断
return run && !isInterrupted();
}
} }