From 600a67cc730619f5f71c765629d2df5f28dbb80b Mon Sep 17 00:00:00 2001 From: b2baccline <23131013+b2baccline@users.noreply.github.com> Date: Fri, 22 May 2020 19:49:55 +0800 Subject: [PATCH] =?UTF-8?q?:zap:=20=E4=BF=AE=E6=94=B9=E6=89=B9=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=A4=A7=E5=B0=8F=E5=92=8C=E8=B6=85=E6=97=B6=E6=97=B6?= =?UTF-8?q?=E9=97=B4=E8=8E=B7=E5=8F=96=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/thread/AbstractQueueThread.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/ballcat-common/ballcat-common-core/src/main/java/com/hccake/ballcat/common/core/thread/AbstractQueueThread.java b/ballcat-common/ballcat-common-core/src/main/java/com/hccake/ballcat/common/core/thread/AbstractQueueThread.java index caf0df0b..c1b8907e 100644 --- a/ballcat-common/ballcat-common-core/src/main/java/com/hccake/ballcat/common/core/thread/AbstractQueueThread.java +++ b/ballcat-common/ballcat-common-core/src/main/java/com/hccake/ballcat/common/core/thread/AbstractQueueThread.java @@ -13,27 +13,32 @@ import java.util.concurrent.TimeUnit; * 抽象的线程类,主要用于汇聚详情数据 * 做一些基础的处理后 * 进行批量插入 + * * @author Hccake */ @Slf4j public abstract class AbstractQueueThread extends Thread implements InitializingBean { - - private BlockingQueue queue = new LinkedBlockingQueue(); - - private static final int BATCH_SIZE = 100; - - private static final int BATCH_TIMEOUT = 30000; + private final BlockingQueue queue = new LinkedBlockingQueue(); + private final long defaultBatchSize = 100; + private final long defaultBatchTimeout = 30000; public void putObject(T t) { try { - if(t != null){ - queue.put(t); - } + if (t != null) { + queue.put(t); + } } catch (InterruptedException e) { e.printStackTrace(); } } + public long getBatchSize() { + return defaultBatchSize; + } + + public long getBatchTimeout() { + return defaultBatchTimeout; + } @Override public void run() { @@ -49,7 +54,7 @@ public abstract class AbstractQueueThread extends Thread implements Initializ long ts = 0; int i = 0; - while (i < BATCH_SIZE) { + while (i < getBatchSize()) { T tmp = null; try { tmp = queue.poll(5000, TimeUnit.MILLISECONDS); @@ -65,7 +70,7 @@ public abstract class AbstractQueueThread extends Thread implements Initializ processor(list, tmp); } //有数据 且从第一次插入数据已经过了 设定时间 则 执行插入 - if (list.size() > 0 && System.currentTimeMillis() - ts >= BATCH_TIMEOUT) { + if (list.size() > 0 && System.currentTimeMillis() - ts >= getBatchTimeout()) { break; } } @@ -77,7 +82,7 @@ public abstract class AbstractQueueThread extends Thread implements Initializ } } - private void preProcessor() { + private void preProcessor() { } @@ -88,12 +93,13 @@ public abstract class AbstractQueueThread extends Thread implements Initializ public abstract void startLog(); - /** - * 错误日志打印 + /** + * 错误日志打印 + * * @param e * @param list */ - public abstract void errorLog(Throwable e, List list); + public abstract void errorLog(Throwable e, List list); /** @@ -102,7 +108,7 @@ public abstract class AbstractQueueThread extends Thread implements Initializ * @param list * @param elem */ - public void processor(List list, T elem) { + public void processor(List list, T elem) { list.add(elem); } @@ -112,7 +118,7 @@ public abstract class AbstractQueueThread extends Thread implements Initializ * * @param list */ - public abstract void save(List list) throws Exception; + public abstract void save(List list) throws Exception; /**