update: code clean

This commit is contained in:
hongqiaowei
2021-01-27 23:00:26 +08:00
parent 6b21646f47
commit 6a27718b1a
7 changed files with 225 additions and 2 deletions

View File

@@ -17,16 +17,33 @@
package we.filter;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.Receiver;
import we.proxy.DiscoveryClientUriSelector;
import we.proxy.FizzWebClient;
import we.proxy.ServiceInstance;
import we.util.Constants;
import we.util.ThreadContext;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
/**
* @author hongqiaowei
@@ -36,15 +53,140 @@ import javax.annotation.Resource;
@Order(20)
public class CallbackFilter extends FizzWebFilter {
private static final Logger log = LoggerFactory.getLogger(CallbackFilter.class);
private static final Logger log = LoggerFactory.getLogger(CallbackFilter.class);
public static final String CALLBACK_FILTER = "callbackFilter";
public static final String CALLBACK_FILTER = "callbackFilter";
private static final String s2im = "$s2im";
private static final DataBuffer emptyBody = new NettyDataBufferFactory(new UnpooledByteBufAllocator(false, true)).wrap(Constants.Symbol.EMPTY.getBytes());
private static final String json = "json";
@Resource
private DiscoveryClientUriSelector discoveryClientSelector;
@Resource
private FizzWebClient fizzWebClient;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
ApiConfig ac = WebUtils.getApiConfig(exchange);
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) { // 由本filter处理并直接响应且不执行后续的filter等逻辑
ServerHttpRequest req = exchange.getRequest();
DataBuffer[] body = {null};
DataBufferUtils.join(req.getBody()).defaultIfEmpty(emptyBody)
.flatMap(
b -> {
body[0] = b;
String bodyStr = body[0].toString(StandardCharsets.UTF_8);
HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac);
pushReq2manager(exchange, bodyStr, service2instMap);
// 如果是异步回调直接按apiconfig的配置响应并return
// 如果是同步调用,取出 receivers
/*
for (r : receivers) {
如果r 是服务发现fizzWebClient直接调用实例取service2instMap中的
如果是聚合接口需预处理再用fizzWebClient调用
多个r可以并行调用然后以第一个r的响应响应客户端
}
*/
return null;
}
)
.doFinally(
s -> {
if (body[0] != emptyBody) {
DataBufferUtils.release(body[0]);
}
}
)
;
}
return chain.filter(exchange);
}
private HashMap<String, ServiceInstance> getService2instMap(ApiConfig ac) {
HashMap<String, ServiceInstance> service2instMap = ThreadContext.getHashMap(s2im, String.class, ServiceInstance.class);
List<Receiver> receivers = ac.callbackConfig.receivers;
for (Receiver r : receivers) {
if (r.type == ApiConfig.Type.SERVICE_DISCOVERY) {
ServiceInstance inst = discoveryClientSelector.getNextInstance(r.service);
service2instMap.put(r.service, inst);
}
}
return service2instMap;
}
private static final String _id = "\"id\":";
private static final String _datetime = "\"datetime\":";
private static final String _origin = "\"origin\":";
private static final String _app = "\"app\":";
private static final String _method = "\"method\":";
private static final String _service = "\"service\":";
private static final String _path = "\"path\":";
private static final String _query = "\"query\":";
private static final String _headers = "\"headers\":";
private static final String _body = "\"body\":";
private static final String _receivers = "\"receivers\":";
private void pushReq2manager(ServerWebExchange exchange, String bodyStr, HashMap<String, ServiceInstance> service2instMap) {
// 请求推给manager请求头可能含前面插件执行的结果如midwe kafkathey redis
ServerHttpRequest req = exchange.getRequest();
StringBuilder b = ThreadContext.getStringBuilder();
// Long winStart = w.getStartTime();
// BigDecimal rps = w.getRps();
// double qps;
// if (rps == null) {
// qps = 0.00;
// } else {
// qps = rps.doubleValue();
// }
//
// AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart));
// Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests();
//
b.append(Constants.Symbol.LEFT_BRACE);
b.append(_id); toJsonStringValue(b, req.getId()); b.append(Constants.Symbol.COMMA);
b.append(_datetime); b.append(System.currentTimeMillis()); b.append(Constants.Symbol.COMMA);
b.append(_origin); toJsonStringValue(b, WebUtils.getOriginIp(exchange)); b.append(Constants.Symbol.COMMA);
b.append(_app); toJsonStringValue(b, WebUtils.getAppId(exchange)); b.append(Constants.Symbol.COMMA);
b.append(_method); toJsonStringValue(b, req.getMethod().name()); b.append(Constants.Symbol.COMMA);
b.append(_service); toJsonStringValue(b, WebUtils.getClientService(exchange)); b.append(Constants.Symbol.COMMA);
b.append(_path); toJsonStringValue(b, WebUtils.getClientReqPath(exchange)); b.append(Constants.Symbol.COMMA);
b.append(_query); toJsonStringValue(b, WebUtils.getClientReqQuery(exchange)); b.append(Constants.Symbol.COMMA);
// b.append(_headers); toJsonStringValue(b, WebUtils.getClientReqQuery(exchange)); b.append(Constants.Symbol.COMMA);
if (!service2instMap.isEmpty()) {
String s = JSON.toJSONString(JSON.toJSONString(service2instMap));
b.append(_receivers); b.append(s); b.append(Constants.Symbol.COMMA);
}
MediaType contentType = req.getHeaders().getContentType();
if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) {
b.append(_body); b.append(JSON.toJSONString(bodyStr));
} else {
b.append(_body); toJsonStringValue(b, bodyStr);
}
b.append(Constants.Symbol.RIGHT_BRACE);
// String msg = b.toString();
// if ("kafka".equals(dest)) { // for internal use
// log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
// } else {
// rt.convertAndSend(queue, msg).subscribe();
// }
// if (log.isDebugEnabled()) {
// log.debug("report " + toDP19(winStart) + " win10: " + msg);
// }
}
private static void toJsonStringValue(StringBuilder b, String value) {
b.append(Constants.Symbol.DOUBLE_QUOTE).append(value).append(Constants.Symbol.DOUBLE_QUOTE);
}
}

View File

@@ -2,6 +2,7 @@ package we.proxy;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;
import reactor.util.function.Tuple2;
/**
* The disable implementation of {@code DiscoveryClientUriSelector}, used when Nacos and Eureka discovery are not enabled.
@@ -16,4 +17,9 @@ public class DisableDiscoveryUriSelector implements DiscoveryClientUriSelector {
public String getNextUri(String service, String relativeUri) {
throw new RuntimeException("No " + service + " because discovery disabled", null, false, false) {};
}
@Override
public ServiceInstance getNextInstance(String service) {
throw new RuntimeException("No " + service + " because discovery disabled", null, false, false) {};
}
}

View File

@@ -13,4 +13,6 @@ public interface DiscoveryClientUriSelector {
* @return the uri for the next request
*/
String getNextUri(String service, String relativeUri);
ServiceInstance getNextInstance(String service);
}

View File

@@ -5,6 +5,8 @@ import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.shared.Applications;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import javax.annotation.Resource;
import java.util.ArrayList;
@@ -29,6 +31,12 @@ public class EurekaUriSelector extends AbstractDiscoveryClientUriSelector {
return buildUri(inst.getIPAddr(), inst.getPort(), relativeUri);
}
@Override
public ServiceInstance getNextInstance(String service) {
InstanceInfo inst = roundRobinChoose1instFrom(service);
return new ServiceInstance(inst.getIPAddr(), inst.getPort());
}
// private static List<InstanceInfo> aggrMemberInsts = new ArrayList<>();
// static {

View File

@@ -6,11 +6,14 @@ import com.alibaba.nacos.api.annotation.NacosInjected;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.netflix.appinfo.InstanceInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;
import javax.annotation.PostConstruct;
import java.util.Collections;
@@ -61,6 +64,12 @@ public class NacosUriSelector extends AbstractDiscoveryClientUriSelector {
return super.buildUri(instance.getIp(), instance.getPort(), relativeUri);
}
@Override
public ServiceInstance getNextInstance(String service) {
Instance inst = this.selectOneHealthyInstance(service);
return new ServiceInstance(inst.getIp(), inst.getPort());
}
private Instance selectOneHealthyInstance(String service) {
Instance instance = null;
try {

View File

@@ -0,0 +1,41 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.proxy;
import we.util.JacksonUtils;
/**
* @author hongqiaowei
*/
public class ServiceInstance {
public String ip;
public int port;
public ServiceInstance(String ip, int port) {
this.ip = ip;
this.port = port;
}
@Override
public String toString() {
return JacksonUtils.writeValueAsString(this);
}
}

View File

@@ -127,4 +127,19 @@ public abstract class ThreadContext {
}
return l;
}
public static <K, V> HashMap<K, V> getHashMap(String key, Class<K> kType, Class<V> vType) {
return getHashMap(key, kType, vType, true);
}
public static <K, V> HashMap<K, V> getHashMap(String key, Class<K> kType, Class<V> vType, boolean clear) {
HashMap<K, V> m = (HashMap<K, V>) get(key);
if (m == null) {
m = new HashMap<>();
set(key ,m);
} else if (clear) {
m.clear();
}
return m;
}
}