🎨 抽象 AbstractThread 类. 让下级自定义 poll和put方法.

This commit is contained in:
b2baccline
2021-03-02 21:08:33 +08:00
parent b8e2970435
commit ffc9034ccf
8 changed files with 185 additions and 118 deletions

View File

@@ -2,7 +2,7 @@ package com.hccake.ballcat.admin.modules.log.thread;
import com.hccake.ballcat.admin.modules.log.model.entity.AdminAccessLog;
import com.hccake.ballcat.admin.modules.log.service.AdminAccessLogService;
import com.hccake.ballcat.common.core.thread.AbstractQueueThread;
import com.hccake.ballcat.common.core.thread.AbstractBlockingQueueThread;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@@ -17,7 +17,7 @@ import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class AccessLogAdminSaveThread extends AbstractQueueThread<AdminAccessLog> {
public class AccessLogAdminSaveThread extends AbstractBlockingQueueThread<AdminAccessLog> {
private final AdminAccessLogService adminAccessLogService;
@@ -25,7 +25,7 @@ public class AccessLogAdminSaveThread extends AbstractQueueThread<AdminAccessLog
* 线程启动时的日志打印
*/
@Override
public void startLog() {
public void init() {
log.info("后台访问日志存储线程已启动===");
}
@@ -35,7 +35,7 @@ public class AccessLogAdminSaveThread extends AbstractQueueThread<AdminAccessLog
* @param list 后台访问日志列表
*/
@Override
public void errorLog(Throwable e, List<AdminAccessLog> list) {
public void error(Throwable e, List<AdminAccessLog> list) {
log.error("后台访问日志记录异常, [msg]:{}, [data]:{}", e.getMessage(), list);
}
@@ -44,7 +44,7 @@ public class AccessLogAdminSaveThread extends AbstractQueueThread<AdminAccessLog
* @param list 后台访问日志列表
*/
@Override
public void save(List<AdminAccessLog> list) throws Exception {
public void process(List<AdminAccessLog> list) throws Exception {
adminAccessLogService.saveBatchSomeColumn(list);
}

View File

@@ -0,0 +1,40 @@
package com.hccake.ballcat.common.core.thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
/**
* 抽象的线程类,主要用于汇聚详情数据 做一些基础的处理后 进行批量插入
*
* @author Hccake
*/
@Slf4j
public abstract class AbstractBlockingQueueThread<T> extends AbstractQueueThread<T> {
private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
public void putObject(T t) {
try {
if (t != null) {
queue.put(t);
}
}
catch (Exception e) {
log.error("{} putObject error, param: {}", this.getClass().toString(), t, e);
}
}
@Override
public void put(@NotNull T t) throws InterruptedException {
queue.put(t);
}
@Override
public T poll(long time) throws InterruptedException {
return queue.poll(time, TimeUnit.MILLISECONDS);
}
}

View File

@@ -1,48 +1,43 @@
package com.hccake.ballcat.common.core.thread;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.validation.constraints.NotNull;
import org.springframework.beans.factory.InitializingBean;
/**
* 抽象的线程类,主要用于汇聚详情数据 做一些基础的处理后 进行批量插入
* 顶级队列线程类
*
* @author Hccake
* @author lingting 2021/3/2 15:07
*/
@Slf4j
public abstract class AbstractQueueThread<T> extends Thread implements InitializingBean {
private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
private final static long DEFAULT_BATCH_SIZE = 500;
public abstract class AbstractQueueThread<E> extends Thread implements InitializingBean {
/**
* 默认时长 30秒单位 毫秒
* 默认缓存数据数量
*/
private final static int DEFAULT_BATCH_SIZE = 500;
/**
* 默认等待时长 30秒单位 毫秒
*/
private final static long DEFAULT_BATCH_TIMEOUT_MS = 30 * 1000L;
public void putObject(T t) {
try {
if (t != null) {
queue.put(t);
}
}
catch (Exception e) {
log.error("{} putObject error, param: {}", this.getClass().toString(), t, e);
}
}
/**
* 默认获取数据时的超时时间
*/
private final static long POLL_TIMEOUT_MS = 5 * 1000;
public long getBatchSize() {
/**
* 用于子类自定义缓存数据数量
* @return long
* @author lingting 2021-03-02 15:12
*/
public int getBatchSize() {
return DEFAULT_BATCH_SIZE;
}
/**
* 用于子类自定义时长
* 用于子类自定义等待时长
* @return 返回时长,单位毫秒
* @author lingting 2020-08-05 11:23:33
*/
@@ -50,91 +45,131 @@ public abstract class AbstractQueueThread<T> extends Thread implements Initializ
return DEFAULT_BATCH_TIMEOUT_MS;
}
/**
* 用于子类自定义 获取数据的超时时间
* @return 返回时长,单位毫秒
* @author lingting 2021-03-02 20:52
*/
public static long getPollTimeoutMs() {
return POLL_TIMEOUT_MS;
}
/**
* 往队列插入数据
* @param e 数据
* @exception InterruptedException 线程中断
* @author lingting 2021-03-02 15:09
*/
public abstract void put(@NotNull E e) throws InterruptedException;
/**
* 运行前执行初始化
* @author lingting 2021-03-02 15:14
*/
public void init() {
}
/**
* 是否可以继续运行
* @return boolean true 表示可以继续运行
* @author lingting 2021-03-02 15:17
*/
public boolean isRun() {
// 未被中断表示可以继续运行
return !isInterrupted();
}
/**
* 数据处理前执行
* @author lingting 2021-03-02 15:15
*/
public void preProcess() {
}
/**
* 从队列中取值
* @param time 等待时长, 单位 毫秒
* @return E
* @throws InterruptedException 线程中断
* @author lingting 2021-03-02 15:20
*/
public abstract E poll(long time) throws InterruptedException;
/**
* 处理接收的数据
* @param list 当前所有数据
* @param e 接收的数据
* @author lingting 2021-03-02 20:49
*/
public void receiveProcess(List<E> list, E e) {
list.add(e);
}
/**
* 处理所有已接收的数据
* @param list 所有已接收的数据
* @exception Exception 异常
* @author lingting 2021-03-02 20:53
*/
public abstract void process(List<E> list) throws Exception;
@Override
public void run() {
startLog();
while (!isInterrupted()) {
List<T> list = new ArrayList<>();
init();
List<E> list;
while (isRun()) {
list = new ArrayList<>(getBatchSize());
try {
preProcessor();
preProcess();
long timestamp = 0;
int count = 0;
long ts = 0;
int i = 0;
while (i < getBatchSize()) {
T tmp = null;
while (count < getBatchSize()) {
E e = null;
try {
tmp = queue.poll(5000, TimeUnit.MILLISECONDS);
e = poll(getPollTimeoutMs());
}
catch (InterruptedException e) {
e.printStackTrace();
catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
if (tmp != null) {
// 记录第一次添加数据后的时间
if (i++ == 0) {
ts = System.currentTimeMillis();
if (e != null) {
// 第一次插入数据
if (count++ == 0) {
// 记录时间
timestamp = System.currentTimeMillis();
}
// 处理
processor(list, tmp);
receiveProcess(list, e);
}
// 有数据 且从第一次插入数据已经过了 设定时间 则 执行插入
if (list.size() > 0 && System.currentTimeMillis() - ts >= getBatchTimeout()) {
// 已有数据 已超过设定的等待时间
if (list.size() > 0 && System.currentTimeMillis() - timestamp >= getBatchTimeout()) {
break;
}
}
save(list);
process(list);
}
catch (Throwable e) {
errorLog(e, list);
error(e, list);
}
}
}
/**
* 预处理方法 主要用于在每段数据处理前的一些成员变量初始化
* 发生异常时处理异常
* @param e 异常
* @param list 当时的数据
* @author lingting 2021-03-02 20:46
*/
public void preProcessor() {
public abstract void error(Throwable e, List<E> list);
}
/**
* 线程启动时的日志打印
*/
public abstract void startLog();
/**
* 错误日志打印
* @param e exception
* @param list error data
*/
public abstract void errorLog(Throwable e, List<T> list);
/**
* 数据处理
* @param list data list
* @param elem data
*/
public void processor(List<T> list, T elem) {
list.add(elem);
}
/**
* 数据保存
* @param list list
* @throws Exception 抛出可能的异常
*/
public abstract void save(List<T> list) throws Exception;
/**
* 初始化后启动
*/
@Override
public void afterPropertiesSet() {
this.start();
public void afterPropertiesSet() throws Exception {
// 默认配置线程名. 用来方便查询
setName(this.getClass().getSimpleName());
start();
}
}

View File

@@ -1,12 +1,11 @@
package com.hccake.starter.pay.viratual;
import com.hccake.ballcat.common.core.thread.AbstractQueueThread;
import live.lingting.virtual.currency.Transaction;
import lombok.extern.slf4j.Slf4j;
import javax.validation.constraints.NotNull;
import com.hccake.ballcat.common.core.thread.AbstractBlockingQueueThread;
import java.util.List;
import java.util.Optional;
import javax.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import live.lingting.virtual.currency.Transaction;
/**
* 用于校验交易的线程
@@ -14,10 +13,10 @@ import java.util.Optional;
* @author lingting 2021/1/5 11:08
*/
@Slf4j
public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends AbstractQueueThread<T> {
public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends AbstractBlockingQueueThread<T> {
@Override
public long getBatchSize() {
public int getBatchSize() {
// 校验对象不需要堆积太多, 10条直接处理
return 10;
}
@@ -79,7 +78,7 @@ public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends Abstr
public abstract void error(T obj, Throwable e);
@Override
public void preProcessor() {
public void preProcess() {
for (T obj : readCache()) {
// 把缓存中的所有数据插入线程
putObject(obj);
@@ -87,7 +86,7 @@ public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends Abstr
}
@Override
public void save(List<T> list) throws Exception {
public void process(List<T> list) throws Exception {
for (T obj : list) {
try {
handler(obj, getTransaction(obj));
@@ -98,10 +97,4 @@ public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends Abstr
}
}
@Override
public void afterPropertiesSet() {
setName(this.getClass().getSimpleName());
super.afterPropertiesSet();
}
}

View File

@@ -4,15 +4,14 @@ import com.hccake.ballcat.common.util.JsonUtils;
import com.hccake.sample.pay.virtual.domain.Result;
import com.hccake.sample.pay.virtual.entity.Order;
import com.hccake.starter.pay.viratual.AbstractVerifyThread;
import live.lingting.virtual.currency.Transaction;
import live.lingting.virtual.currency.enums.TransactionStatus;
import lombok.extern.slf4j.Slf4j;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import live.lingting.virtual.currency.Transaction;
import live.lingting.virtual.currency.enums.TransactionStatus;
/**
* 配置基本校验
@@ -25,12 +24,12 @@ public abstract class AbstractThread extends AbstractVerifyThread<Order, Result>
public final List<Order> CACHE = new ArrayList<>();
@Override
public long getBatchSize() {
public int getBatchSize() {
return 1;
}
@Override
public void errorLog(Throwable e, List<Order> list) {
public void error(Throwable e, List<Order> list) {
// 读取缓存 和 接收数据时出现异常执行此方法
log.error("读取缓存 和 接收数据时出现异常执行此方法, 数据: " + JsonUtils.toJson(list), e);
}

View File

@@ -20,7 +20,7 @@ public class EtherscanThread extends AbstractThread {
private final InfuraServiceImpl service;
@Override
public void startLog() {
public void init() {
log.debug("Etherscan 订单校验");
}

View File

@@ -21,7 +21,7 @@ public class OmniThread extends AbstractThread {
private final BtcOmniServiceImpl service;
@Override
public void startLog() {
public void init() {
log.debug("Omni 订单校验");
}

View File

@@ -20,7 +20,7 @@ public class TronscanThread extends AbstractThread {
private final TronscanServiceImpl service;
@Override
public void startLog() {
public void init() {
log.debug("Tronscan 订单校验");
}