🎨 虚拟货币处理使用redis队列线程

This commit is contained in:
b2baccline
2021-03-03 10:45:33 +08:00
parent d8c6d7ed8b
commit f3abd66137
8 changed files with 55 additions and 54 deletions

View File

@@ -30,5 +30,9 @@
<groupId>com.hccake</groupId>
<artifactId>ballcat-common-core</artifactId>
</dependency>
<dependency>
<groupId>com.hccake</groupId>
<artifactId>ballcat-spring-boot-starter-redis</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -1,6 +1,6 @@
package com.hccake.starter.pay.viratual;
import com.hccake.ballcat.common.core.thread.AbstractBlockingQueueThread;
import com.hccake.ballcat.common.redis.thread.AbstractRedisThread;
import java.util.List;
import java.util.Optional;
import javax.validation.constraints.NotNull;
@@ -13,7 +13,7 @@ import live.lingting.virtual.currency.Transaction;
* @author lingting 2021/1/5 11:08
*/
@Slf4j
public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends AbstractBlockingQueueThread<T> {
public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends AbstractRedisThread<T> {
@Override
public int getBatchSize() {
@@ -55,20 +55,6 @@ public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends Abstr
*/
public abstract void success(T obj, @NotNull Optional<Transaction> optional, R r);
/**
* 缓存校验对象
* @param obj 校验对象
* @author lingting 2021-01-05 11:17
*/
public abstract void cache(T obj);
/**
* 读取所有缓存的校验对象
* @return java.util.List<T>
* @author lingting 2021-01-05 11:17
*/
public abstract List<T> readCache();
/**
* 异常处理
* @param obj 校验对象
@@ -78,23 +64,18 @@ public abstract class AbstractVerifyThread<T extends VerifyObj, R> extends Abstr
public abstract void error(T obj, Throwable e);
@Override
public void preProcess() {
for (T obj : readCache()) {
// 把缓存中的所有数据插入线程
putObject(obj);
public void receiveProcess(List<T> list, T t) {
try {
// 收到就处理. 不再汇总处理
handler(t, getTransaction(t));
}
catch (Exception e) {
error(t, e);
}
}
@Override
public void process(List<T> list) throws Exception {
for (T obj : list) {
try {
handler(obj, getTransaction(obj));
}
catch (Throwable e) {
error(obj, e);
}
}
}
}

View File

@@ -21,6 +21,10 @@
<groupId>com.hccake</groupId>
<artifactId>ballcat-common-conf</artifactId>
</dependency>
<dependency>
<groupId>com.hccake</groupId>
<artifactId>ballcat-spring-boot-starter-redis</artifactId>
</dependency>
<dependency>
<groupId>com.hccake</groupId>
<artifactId>ballcat-spring-boot-starter-pay</artifactId>

View File

@@ -3,10 +3,11 @@ package com.hccake.sample.pay.virtual.thread;
import com.hccake.ballcat.common.util.JsonUtils;
import com.hccake.sample.pay.virtual.domain.Result;
import com.hccake.sample.pay.virtual.entity.Order;
import com.hccake.sample.pay.virtual.enums.Contract;
import com.hccake.starter.pay.viratual.AbstractVerifyThread;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
@@ -21,13 +22,22 @@ import live.lingting.virtual.currency.enums.TransactionStatus;
@Slf4j
public abstract class AbstractThread extends AbstractVerifyThread<Order, Result> {
public final List<Order> CACHE = new ArrayList<>();
@Override
public int getBatchSize() {
return 1;
}
/**
* 如果默认程序无法正确解析你的数据. 可自定义类型. 或者重写 {@link this#convertToObj(String)} 方法 自定义转换
*
* 可重写 {@link this#convertToString(Object)} 方法. 自定义缓存数据
* @author lingting 2021-03-03 10:29
*/
@Override
protected Type getObjType() {
return super.getObjType();
}
@Override
public void error(Throwable e, List<Order> list) {
// 读取缓存 和 接收数据时出现异常执行此方法
@@ -50,7 +60,7 @@ public abstract class AbstractThread extends AbstractVerifyThread<Order, Result>
}
// 没有超过限时, 缓存
else {
cache(obj);
put(obj);
}
return;
}
@@ -59,7 +69,7 @@ public abstract class AbstractThread extends AbstractVerifyThread<Order, Result>
if (transaction.getStatus() == TransactionStatus.WAIT) {
// 交易需要等待继续查询
cache(obj);
put(obj);
}
// 交易失败
else if (transaction.getStatus() == TransactionStatus.FAIL) {
@@ -74,7 +84,7 @@ public abstract class AbstractThread extends AbstractVerifyThread<Order, Result>
failed(obj, optional, new Result("收款地址异常"));
}
// 收款货币类型验证
else if (obj.getContract() != transaction.getContract()) {
else if (obj.getContract() != Contract.USDT) {
failed(obj, optional, new Result("收款货币类型异常"));
}
@@ -98,7 +108,7 @@ public abstract class AbstractThread extends AbstractVerifyThread<Order, Result>
/*
* 例如 放入缓存等待下次处理.
*/
cache(obj);
put(obj);
/*
* 例如 直接按照充值失败进行结算
*/
@@ -117,19 +127,4 @@ public abstract class AbstractThread extends AbstractVerifyThread<Order, Result>
!optional.isPresent() ? "null" : JsonUtils.toJson(optional.get()), JsonUtils.toJson(verifyResult));
}
@Override
public void cache(Order obj) {
CACHE.add(obj);
}
@Override
public List<Order> readCache() {
/*
* 这里使用 list 缓存是因为只是样例, 生产环境不建议
*/
List<Order> list = new ArrayList<>(CACHE);
CACHE.clear();
return list;
}
}

View File

@@ -36,4 +36,9 @@ public class EtherscanThread extends AbstractThread {
}
}
@Override
public String getKey() {
return "virtual:currency:etherscan";
}
}

View File

@@ -2,13 +2,12 @@ package com.hccake.sample.pay.virtual.thread;
import com.hccake.ballcat.common.util.JsonUtils;
import com.hccake.sample.pay.virtual.entity.Order;
import live.lingting.virtual.currency.Transaction;
import live.lingting.virtual.currency.service.impl.BtcOmniServiceImpl;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Optional;
import live.lingting.virtual.currency.Transaction;
import live.lingting.virtual.currency.service.impl.BtcOmniServiceImpl;
/**
* @author lingting 2021/1/5 15:22
@@ -37,4 +36,9 @@ public class OmniThread extends AbstractThread {
}
}
@Override
public String getKey() {
return "virtual:currency:omni";
}
}

View File

@@ -36,4 +36,9 @@ public class TronscanThread extends AbstractThread {
}
}
@Override
public String getKey() {
return "virtual:currency:tronscan";
}
}

View File

@@ -1,6 +1,9 @@
spring:
application:
name: 支付演示
redis:
host: 192.168.1.3
port: 50300
ballcat:
pay: