修改批处理大小和超时时间获取方式

This commit is contained in:
b2baccline
2020-05-22 19:49:55 +08:00
parent f006d5cb72
commit 600a67cc73

View File

@@ -13,27 +13,32 @@ import java.util.concurrent.TimeUnit;
* 抽象的线程类,主要用于汇聚详情数据
* 做一些基础的处理后
* 进行批量插入
*
* @author Hccake
*/
@Slf4j
public abstract class AbstractQueueThread<T> extends Thread implements InitializingBean {
private BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
private static final int BATCH_SIZE = 100;
private static final int BATCH_TIMEOUT = 30000;
private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
private final long defaultBatchSize = 100;
private final long defaultBatchTimeout = 30000;
public void putObject(T t) {
try {
if(t != null){
queue.put(t);
}
if (t != null) {
queue.put(t);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public long getBatchSize() {
return defaultBatchSize;
}
public long getBatchTimeout() {
return defaultBatchTimeout;
}
@Override
public void run() {
@@ -49,7 +54,7 @@ public abstract class AbstractQueueThread<T> extends Thread implements Initializ
long ts = 0;
int i = 0;
while (i < BATCH_SIZE) {
while (i < getBatchSize()) {
T tmp = null;
try {
tmp = queue.poll(5000, TimeUnit.MILLISECONDS);
@@ -65,7 +70,7 @@ public abstract class AbstractQueueThread<T> extends Thread implements Initializ
processor(list, tmp);
}
//有数据 且从第一次插入数据已经过了 设定时间 则 执行插入
if (list.size() > 0 && System.currentTimeMillis() - ts >= BATCH_TIMEOUT) {
if (list.size() > 0 && System.currentTimeMillis() - ts >= getBatchTimeout()) {
break;
}
}
@@ -77,7 +82,7 @@ public abstract class AbstractQueueThread<T> extends Thread implements Initializ
}
}
private void preProcessor() {
private void preProcessor() {
}
@@ -88,12 +93,13 @@ public abstract class AbstractQueueThread<T> extends Thread implements Initializ
public abstract void startLog();
/**
* 错误日志打印
/**
* 错误日志打印
*
* @param e
* @param list
*/
public abstract void errorLog(Throwable e, List<T> list);
public abstract void errorLog(Throwable e, List<T> list);
/**
@@ -102,7 +108,7 @@ public abstract class AbstractQueueThread<T> extends Thread implements Initializ
* @param list
* @param elem
*/
public void processor(List<T> list, T elem) {
public void processor(List<T> list, T elem) {
list.add(elem);
}
@@ -112,7 +118,7 @@ public abstract class AbstractQueueThread<T> extends Thread implements Initializ
*
* @param list
*/
public abstract void save(List<T> list) throws Exception;
public abstract void save(List<T> list) throws Exception;
/**