update: push callback req to manager

This commit is contained in:
hongqiaowei
2021-01-28 11:27:06 +08:00
parent a291e6f8e5
commit 2cf0bd768f

View File

@@ -18,13 +18,16 @@
package we.filter; package we.filter;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.http.HttpHeaders; import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
@@ -32,6 +35,8 @@ import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain; import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig; import we.plugin.auth.ApiConfig;
import we.plugin.auth.Receiver; import we.plugin.auth.Receiver;
import we.proxy.DiscoveryClientUriSelector; import we.proxy.DiscoveryClientUriSelector;
@@ -45,7 +50,6 @@ import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* @author hongqiaowei * @author hongqiaowei
@@ -71,6 +75,17 @@ public class CallbackFilter extends FizzWebFilter {
@Resource @Resource
private FizzWebClient fizzWebClient; private FizzWebClient fizzWebClient;
@NacosValue(value = "${callback.push.dest:redis}", autoRefreshed = true)
@Value("${callback.push.dest:redis}")
private String dest;
@NacosValue(value = "${callback.push.queue:fizz_callback_channel}", autoRefreshed = true)
@Value("${callback.push.queue:fizz_callback_channel}")
private String queue;
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
private ReactiveStringRedisTemplate rt;
@Override @Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) { public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -139,20 +154,7 @@ public class CallbackFilter extends FizzWebFilter {
private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, String bodyStr, HashMap<String, ServiceInstance> service2instMap) { private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, String bodyStr, HashMap<String, ServiceInstance> service2instMap) {
ServerHttpRequest req = exchange.getRequest(); ServerHttpRequest req = exchange.getRequest();
StringBuilder b = ThreadContext.getStringBuilder(); StringBuilder b = ThreadContext.getStringBuilder();
// Long winStart = w.getStartTime();
// BigDecimal rps = w.getRps();
// double qps;
// if (rps == null) {
// qps = 0.00;
// } else {
// qps = rps.doubleValue();
// }
//
// AtomicLong totalBlock = key2totalBlockMap.get(String.format("%s%s", resource, winStart));
// Long totalBlockReqs = totalBlock != null ? totalBlock.get() : w.getBlockRequests();
//
b.append(Constants.Symbol.LEFT_BRACE); b.append(Constants.Symbol.LEFT_BRACE);
b.append(_id); toJsonStringValue(b, req.getId()); b.append(Constants.Symbol.COMMA); b.append(_id); toJsonStringValue(b, req.getId()); b.append(Constants.Symbol.COMMA);
@@ -180,15 +182,15 @@ public class CallbackFilter extends FizzWebFilter {
} }
b.append(Constants.Symbol.RIGHT_BRACE); b.append(Constants.Symbol.RIGHT_BRACE);
// String msg = b.toString(); String msg = b.toString();
// if ("kafka".equals(dest)) { // for internal use if ("kafka".equals(dest)) { // for internal use
// log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue)); log.warn(msg, LogService.HANDLE_STGY, LogService.toKF(queue));
// } else { } else {
// rt.convertAndSend(queue, msg).subscribe(); rt.convertAndSend(queue, msg).subscribe();
// } }
// if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
// log.debug("report " + toDP19(winStart) + " win10: " + msg); log.debug("push callback req: " + msg);
// } }
} }
private static void toJsonStringValue(StringBuilder b, String value) { private static void toJsonStringValue(StringBuilder b, String value) {