update: push callback req to manager

This commit is contained in:
hongqiaowei
2021-01-28 19:49:28 +08:00
parent 23755143be
commit 0a619d3824

View File

@@ -20,6 +20,7 @@ package we.filter;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.annotation.NacosValue; import com.alibaba.nacos.api.config.annotation.NacosValue;
import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@@ -34,8 +35,11 @@ import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.BodyExtractors;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain; import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig; import we.config.AggregateRedisConfig;
import we.flume.clients.log4j2appender.LogService; import we.flume.clients.log4j2appender.LogService;
@@ -51,6 +55,7 @@ import we.util.WebUtils;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@@ -94,7 +99,7 @@ public class CallbackFilter extends FizzWebFilter {
ApiConfig ac = WebUtils.getApiConfig(exchange); ApiConfig ac = WebUtils.getApiConfig(exchange);
CallbackConfig cc = ac.callbackConfig; CallbackConfig cc = ac.callbackConfig;
if (ac != null && ac.type == ApiConfig.Type.CALLBACK) { // 由本filter处理并直接响应且不执行后续的filter等逻辑 if (ac != null && ac.type == ApiConfig.Type.CALLBACK) {
ServerHttpRequest req = exchange.getRequest(); ServerHttpRequest req = exchange.getRequest();
DataBuffer[] body = {null}; DataBuffer[] body = {null};
return return
@@ -105,22 +110,12 @@ public class CallbackFilter extends FizzWebFilter {
String bodyStr = body[0].toString(StandardCharsets.UTF_8); String bodyStr = body[0].toString(StandardCharsets.UTF_8);
HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac); HashMap<String, ServiceInstance> service2instMap = getService2instMap(ac);
HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange); HttpHeaders headers = WebUtils.mergeAppendHeaders(exchange);
pushReq2manager(exchange, headers, bodyStr, service2instMap); pushReq2manager(exchange, headers, bodyStr, service2instMap);
if (cc.type == CallbackConfig.Type.ASYNC || StringUtils.isNotBlank(cc.respBody)) {
if (cc.type == CallbackConfig.Type.ASYNC) { return directResponse(exchange, cc);
return asyncResponse(exchange, cc); } else {
return requestBackends(exchange, headers, body[0], cc, service2instMap);
} }
// 如果是同步调用,取出 receivers
/*
for (r : receivers) {
如果r 是服务发现fizzWebClient直接调用实例取service2instMap中的
如果是聚合接口需预处理再用fizzWebClient调用
多个r可以并行调用然后以第一个r的响应响应客户端
}
*/
return null;
} }
) )
.doFinally( .doFinally(
@@ -135,7 +130,95 @@ public class CallbackFilter extends FizzWebFilter {
return chain.filter(exchange); return chain.filter(exchange);
} }
private Mono<Void> asyncResponse(ServerWebExchange exchange, CallbackConfig cc) { private Mono<? extends Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc, HashMap<String, ServiceInstance> service2instMap) {
ServerHttpRequest req = exchange.getRequest();
int rs = cc.receivers.size();
Mono<Object>[] monos = new Mono[rs]; // cant Mono[] ?
for (int i = 0; i < rs; i++) {
Receiver r = cc.receivers.get(i);
Mono send;
if (r.type == ApiConfig.Type.SERVICE_DISCOVERY) {
ServiceInstance si = service2instMap.get(r.service);
if (si == null) {
send = fizzWebClient.proxySend2service(req.getId(), req.getMethod(), r.service, r.path, headers, body);
} else {
String uri = buildUri(req, si, r.path);
send = fizzWebClient.send(req.getId(), req.getMethod(), uri, headers, body);
}
} else {
// 如果是聚合接口需预处理再用fizzWebClient调用
send = fizzWebClient.send(req.getId(), req.getMethod(), "xxx", headers, body);
}
monos[i] = send;
}
return Flux.mergeSequential(monos)
.reduce(
new ArrayList<Object>(),
(respCollector, resp) -> {
respCollector.add(resp);
return respCollector;
}
)
.flatMap(
resps -> {
for (int i = 1; i < resps.size(); i++) {
// complete and release resp
}
Object o = resps.get(0);
if (o instanceof ClientResponse) {
ClientResponse remoteResp = (ClientResponse) o;
return genServerResponse(exchange, remoteResp);
} else {
return Mono.empty();
} // else AggregateResult
}
)
;
}
private Mono<? extends Void> genServerResponse(ServerWebExchange exchange, ClientResponse remoteResp) {
ServerHttpResponse clientResp = exchange.getResponse();
clientResp.setStatusCode(remoteResp.statusCode());
HttpHeaders clientRespHeaders = clientResp.getHeaders();
HttpHeaders remoteRespHeaders = remoteResp.headers().asHttpHeaders();
remoteRespHeaders.entrySet().forEach(
h -> {
String k = h.getKey();
if (clientRespHeaders.containsKey(k)) {
if (k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_ORIGIN) || k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_CREDENTIALS)
|| k.equals(HttpHeaders.ACCESS_CONTROL_ALLOW_HEADERS) || k.equals(HttpHeaders.ACCESS_CONTROL_MAX_AGE)) {
} else {
clientRespHeaders.put(k, h.getValue());
}
} else {
clientRespHeaders.put(k, h.getValue());
}
}
);
if (log.isDebugEnabled()) {
StringBuilder b = ThreadContext.getStringBuilder();
String rid = exchange.getRequest().getId();
WebUtils.response2stringBuilder(rid, remoteResp, b);
log.debug(b.toString(), LogService.BIZ_ID, rid);
}
return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers()))
.doOnError(throwable -> cleanup(remoteResp)).doOnCancel(() -> cleanup(remoteResp));
}
private void cleanup(ClientResponse clientResponse) {
if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
private String buildUri(ServerHttpRequest req, ServiceInstance si, String path) {
StringBuilder b = ThreadContext.getStringBuilder();
b.append(req.getURI().getScheme()) .append(Constants.Symbol.COLON) .append(Constants.Symbol.FORWARD_SLASH) .append(Constants.Symbol.FORWARD_SLASH);
b.append(si.ip) .append(Constants.Symbol.COLON) .append(si.port) .append(path);
return b.toString();
}
private Mono<Void> directResponse(ServerWebExchange exchange, CallbackConfig cc) {
HttpHeaders httpHeaders = new HttpHeaders(); HttpHeaders httpHeaders = new HttpHeaders();
cc.respHeaders.forEach( cc.respHeaders.forEach(
(h, v) -> { (h, v) -> {