update: add call back service
This commit is contained in:
@@ -17,15 +17,65 @@
|
||||
|
||||
package we.controller;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.web.bind.annotation.PostMapping;
|
||||
import org.springframework.web.bind.annotation.RequestBody;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.proxy.CallbackReplayReq;
|
||||
import we.proxy.CallbackService;
|
||||
import we.util.Constants;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ReactiveResult;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
@RestController
|
||||
@RequestMapping(value = "/admin/callback")
|
||||
@RequestMapping(value = "/admin")
|
||||
public class CallbackController {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CallbackController.class);
|
||||
|
||||
@Resource
|
||||
private CallbackService callbackService;
|
||||
|
||||
@PostMapping("/callback")
|
||||
public Mono<String> callback(ServerWebExchange exchange, @RequestBody CallbackReplayReq req) {
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(JacksonUtils.writeValueAsString(req), LogService.BIZ_ID, req.id);
|
||||
}
|
||||
|
||||
return
|
||||
callbackService.replay(req)
|
||||
.onErrorResume(
|
||||
t -> {
|
||||
return Mono.just(ReactiveResult.fail(t));
|
||||
}
|
||||
)
|
||||
.map(
|
||||
r -> {
|
||||
ServerHttpResponse resp = exchange.getResponse();
|
||||
if (r.code == ReactiveResult.SUCC) {
|
||||
resp.setStatusCode(HttpStatus.OK);
|
||||
return Constants.Symbol.EMPTY;
|
||||
} else {
|
||||
resp.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
|
||||
return r.t.getMessage();
|
||||
}
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -41,6 +41,7 @@ import we.config.AggregateRedisConfig;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.plugin.auth.ApiConfig;
|
||||
import we.plugin.auth.CallbackConfig;
|
||||
import we.plugin.auth.GatewayGroupService;
|
||||
import we.plugin.auth.Receiver;
|
||||
import we.proxy.CallbackService;
|
||||
import we.proxy.DiscoveryClientUriSelector;
|
||||
@@ -93,6 +94,9 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
@Resource
|
||||
private CallbackService callbackService;
|
||||
|
||||
@Resource
|
||||
private GatewayGroupService gatewayGroupService;
|
||||
|
||||
@Override
|
||||
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
|
||||
|
||||
@@ -164,6 +168,7 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
private static final String _headers = "\"headers\":";
|
||||
private static final String _body = "\"body\":";
|
||||
private static final String _receivers = "\"receivers\":";
|
||||
private static final String _gatewayGroup = "\"gatewayGroup\":";
|
||||
|
||||
private void pushReq2manager(ServerWebExchange exchange, HttpHeaders headers, String bodyStr, HashMap<String, ServiceInstance> service2instMap) {
|
||||
|
||||
@@ -180,7 +185,6 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
b.append(_path); toJsonStringValue(b, WebUtils.getClientReqPath(exchange)); b.append(Constants.Symbol.COMMA);
|
||||
b.append(_query); toJsonStringValue(b, WebUtils.getClientReqQuery(exchange)); b.append(Constants.Symbol.COMMA);
|
||||
|
||||
// String headersJsonStr = JSON.toJSONString(JSON.toJSONString(headers));
|
||||
String headersJson = JSON.toJSONString(headers);
|
||||
b.append(_headers); b.append(headersJson); b.append(Constants.Symbol.COMMA);
|
||||
|
||||
@@ -189,6 +193,9 @@ public class CallbackFilter extends FizzWebFilter {
|
||||
b.append(_receivers); b.append(bodyJsonStr); b.append(Constants.Symbol.COMMA);
|
||||
}
|
||||
|
||||
String gg = gatewayGroupService.currentGatewayGroupSet.iterator().next();
|
||||
b.append(_gatewayGroup); toJsonStringValue(b, gg); b.append(Constants.Symbol.COMMA);
|
||||
|
||||
MediaType contentType = req.getHeaders().getContentType();
|
||||
if (contentType != null && contentType.getSubtype().equalsIgnoreCase(json)) {
|
||||
b.append(_body); b.append(JSON.toJSONString(bodyStr));
|
||||
|
||||
@@ -22,9 +22,9 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
@@ -53,11 +53,13 @@ public class AggregateService {
|
||||
@Resource
|
||||
private ConfigLoader aggregateResourceLoader;
|
||||
|
||||
public Mono<AggregateResult> request(ServerWebExchange exchange, String service, String path, HttpHeaders headers, String body) {
|
||||
public Mono<AggregateResult> request(String traceId, String clientReqPathPrefix, String method, String service, String path, MultiValueMap<String, String> queryParams,
|
||||
HttpHeaders headers, String body) {
|
||||
|
||||
// long start = System.currentTimeMillis();
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
String pash = WebUtils.getClientReqPathPrefix(exchange) + service + path;
|
||||
String method = request.getMethodValue();
|
||||
// ServerHttpRequest request = exchange.getRequest();
|
||||
String pash = clientReqPathPrefix + service + path;
|
||||
// String method = request.getMethodValue();
|
||||
AggregateResource aggregateResource = aggregateResourceLoader.matchAggregateResource(method, pash);
|
||||
if (aggregateResource == null) {
|
||||
return Mono.error(new RuntimeException("no aggregate resource: " + method + ' ' + pash, null, false, false) {});
|
||||
@@ -65,14 +67,17 @@ public class AggregateService {
|
||||
Pipeline pipeline = aggregateResource.getPipeline();
|
||||
Input input = aggregateResource.getInput();
|
||||
Map<String, Object> hs = MapUtil.toHashMap(headers);
|
||||
String traceId = WebUtils.getTraceId(exchange);
|
||||
// String traceId = WebUtils.getTraceId(exchange);
|
||||
LogService.setBizId(traceId);
|
||||
log.debug("matched aggregation api: {}", pash);
|
||||
Map<String, Object> clientInput = new HashMap<>();
|
||||
clientInput.put("path", pash);
|
||||
clientInput.put("method", method);
|
||||
clientInput.put("headers", hs);
|
||||
clientInput.put("params", MapUtil.toHashMap(request.getQueryParams()));
|
||||
// MultiValueMap<String, String> queryParams = request.getQueryParams();
|
||||
if (queryParams != null) {
|
||||
clientInput.put("params", MapUtil.toHashMap(queryParams));
|
||||
}
|
||||
if (body != null) {
|
||||
clientInput.put("body", JSON.parse(body));
|
||||
}
|
||||
@@ -80,12 +85,13 @@ public class AggregateService {
|
||||
}
|
||||
}
|
||||
|
||||
public Mono<AggregateResult> request(ServerWebExchange exchange, String service, String path, HttpHeaders headers, DataBuffer body) {
|
||||
public Mono<AggregateResult> request(String traceId, String clientReqPathPrefix, String method, String service, String path, MultiValueMap<String, String> queryParams,
|
||||
HttpHeaders headers, DataBuffer body) {
|
||||
String b = null;
|
||||
if (body != null) {
|
||||
b = body.toString(StandardCharsets.UTF_8);
|
||||
}
|
||||
return request(exchange, service, path, headers, b);
|
||||
return request(traceId, clientReqPathPrefix, method, service, path, queryParams, headers, b);
|
||||
}
|
||||
|
||||
public Mono<? extends Void> genAggregateResponse(ServerWebExchange exchange, AggregateResult ar) {
|
||||
|
||||
81
src/main/java/we/proxy/CallbackReplayReq.java
Normal file
81
src/main/java/we/proxy/CallbackReplayReq.java
Normal file
@@ -0,0 +1,81 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.proxy;
|
||||
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import we.util.JacksonUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public class CallbackReplayReq {
|
||||
|
||||
public static interface Type {
|
||||
static final int ORIGINAL_PATH = 1;
|
||||
static final int ASSIGN_SERVICES = 2;
|
||||
}
|
||||
|
||||
public String id;
|
||||
|
||||
public String app;
|
||||
|
||||
public String gatewayGroup;
|
||||
|
||||
public HttpMethod method;
|
||||
|
||||
public String service;
|
||||
|
||||
public String path;
|
||||
|
||||
public String query;
|
||||
|
||||
public HttpHeaders headers;
|
||||
|
||||
public String body;
|
||||
|
||||
public int replayType;
|
||||
|
||||
public Map<String, ServiceInstance> receivers;
|
||||
|
||||
public List<ServiceTypePath> assignServices;
|
||||
|
||||
public void setMethod(String m) {
|
||||
method = HttpMethod.resolve(m);
|
||||
}
|
||||
|
||||
public void setHeaders(Map<String, List<String>> hs) {
|
||||
if (hs != null && !hs.isEmpty()) {
|
||||
headers = new HttpHeaders();
|
||||
hs.forEach(
|
||||
(h, vs) -> {
|
||||
headers.addAll(h, vs);
|
||||
}
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JacksonUtils.writeValueAsString(this);
|
||||
}
|
||||
}
|
||||
@@ -17,10 +17,12 @@
|
||||
|
||||
package we.proxy;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpMethod;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.stereotype.Service;
|
||||
@@ -29,19 +31,21 @@ import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.constants.CommonConstants;
|
||||
import we.fizz.AggregateResult;
|
||||
import we.fizz.AggregateService;
|
||||
import we.flume.clients.log4j2appender.LogService;
|
||||
import we.plugin.auth.ApiConfig;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.CallbackConfig;
|
||||
import we.plugin.auth.Receiver;
|
||||
import we.util.Constants;
|
||||
import we.util.ThreadContext;
|
||||
import we.util.WebUtils;
|
||||
import we.util.*;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
@@ -50,7 +54,13 @@ import java.util.HashMap;
|
||||
@Service
|
||||
public class CallbackService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(CallbackService.class);
|
||||
private static final Logger log = LoggerFactory.getLogger(CallbackService.class);
|
||||
|
||||
private static final ClientResponse fcr = new FizzFakeClientResponse();
|
||||
|
||||
private static final AggregateResult far = new AggregateResult();
|
||||
|
||||
private static final String callback = "callback";
|
||||
|
||||
@Resource
|
||||
private FizzWebClient fizzWebClient;
|
||||
@@ -58,33 +68,39 @@ public class CallbackService {
|
||||
@Resource
|
||||
private AggregateService aggregateService;
|
||||
|
||||
public Mono<? extends Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, Object body, CallbackConfig cc, HashMap<String, ServiceInstance> service2instMap) {
|
||||
@Resource
|
||||
private ApiConfigService apiConfigService;
|
||||
|
||||
public Mono<? extends Void> requestBackends(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body, CallbackConfig cc,
|
||||
HashMap<String, ServiceInstance> service2instMap) {
|
||||
ServerHttpRequest req = exchange.getRequest();
|
||||
String reqId = req.getId();
|
||||
HttpMethod method = req.getMethod();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("service2instMap: " + JacksonUtils.writeValueAsString(service2instMap), LogService.BIZ_ID, reqId);
|
||||
}
|
||||
int rs = cc.receivers.size();
|
||||
Mono<Object>[] monos = new Mono[rs];
|
||||
Mono<Object>[] sends = new Mono[rs];
|
||||
for (int i = 0; i < rs; i++) {
|
||||
Receiver r = cc.receivers.get(i);
|
||||
Mono send;
|
||||
if (r.type == ApiConfig.Type.SERVICE_DISCOVERY) {
|
||||
ServiceInstance si = service2instMap.get(r.service);
|
||||
if (si == null) {
|
||||
send = fizzWebClient.proxySend2service(req.getId(), req.getMethod(), r.service, r.path, headers, body);
|
||||
send = fizzWebClient.proxySend2service(reqId, method, r.service, r.path, headers, body)
|
||||
.onErrorResume( crError(exchange, r, method, headers, body) );
|
||||
} else {
|
||||
String uri = buildUri(req, si, r.path);
|
||||
send = fizzWebClient.send(req.getId(), req.getMethod(), uri, headers, body);
|
||||
send = fizzWebClient.send(reqId, method, uri, headers, body)
|
||||
.onErrorResume( crError(exchange, r, method, headers, body) );
|
||||
}
|
||||
} else {
|
||||
if (body instanceof DataBuffer) {
|
||||
send = aggregateService.request(exchange, r.service, r.path, headers, (DataBuffer) body);
|
||||
} else if (body instanceof String) {
|
||||
send = aggregateService.request(exchange, r.service, r.path, headers, (String) body);
|
||||
} else {
|
||||
return Mono.error(new RuntimeException("cant handle " + body, null, false, false) {});
|
||||
}
|
||||
send = aggregateService.request(WebUtils.getTraceId(exchange), WebUtils.getClientReqPathPrefix(exchange), method.name(), r.service, r.path, req.getQueryParams(), headers, body)
|
||||
.onErrorResume( arError(exchange, r, method, headers, body) );
|
||||
}
|
||||
monos[i] = send;
|
||||
sends[i] = send;
|
||||
}
|
||||
return Flux.mergeSequential(monos)
|
||||
return Flux.mergeSequential(sends)
|
||||
.reduce(
|
||||
new ArrayList<Object>(rs),
|
||||
(respCollector, resp) -> {
|
||||
@@ -97,25 +113,48 @@ public class CallbackService {
|
||||
Object r = null;
|
||||
for (int i = 1; i < resps.size(); i++) {
|
||||
r = resps.get(i);
|
||||
if (r instanceof ClientResponse) {
|
||||
cleanup((ClientResponse) r);
|
||||
if (r instanceof ClientResponse && r != fcr) {
|
||||
clean((ClientResponse) r);
|
||||
}
|
||||
}
|
||||
r = resps.get(0);
|
||||
if (r instanceof ClientResponse) {
|
||||
ClientResponse remoteResp = (ClientResponse) r;
|
||||
return genServerResponse(exchange, remoteResp);
|
||||
} else if (r instanceof AggregateResult) {
|
||||
if (r == fcr || r == far) {
|
||||
return Mono.error(new RuntimeException("cant response client with " + r, null, false, false) {});
|
||||
} else if (r instanceof ClientResponse) {
|
||||
ClientResponse cr = (ClientResponse) r;
|
||||
return genServerResponse(exchange, cr);
|
||||
} else {
|
||||
AggregateResult ar = (AggregateResult) r;
|
||||
return aggregateService.genAggregateResponse(exchange, ar);
|
||||
} else {
|
||||
return Mono.error(new RuntimeException("cant response client with " + r, null, false, false) {});
|
||||
}
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
|
||||
private Function<Throwable, Mono<? extends ClientResponse>> crError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) {
|
||||
return t -> {
|
||||
log(exchange, r, method, headers, body, t);
|
||||
return Mono.just(fcr);
|
||||
};
|
||||
}
|
||||
|
||||
private Function<Throwable, Mono<AggregateResult>> arError(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body) {
|
||||
return t -> {
|
||||
log(exchange, r, method, headers, body, t);
|
||||
return Mono.just(far);
|
||||
};
|
||||
}
|
||||
|
||||
private void log(ServerWebExchange exchange, Receiver r, HttpMethod method, HttpHeaders headers, DataBuffer body, Throwable t) {
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
WebUtils.request2stringBuilder(exchange, b);
|
||||
b.append(Constants.Symbol.LINE_SEPARATOR).append(callback).append(Constants.Symbol.LINE_SEPARATOR);
|
||||
String id = exchange.getRequest().getId();
|
||||
WebUtils.request2stringBuilder(id, method, r.service + Constants.Symbol.FORWARD_SLASH + r.path, headers, body, b);
|
||||
log.error(b.toString(), LogService.BIZ_ID, id, t);
|
||||
}
|
||||
|
||||
private String buildUri(ServerHttpRequest req, ServiceInstance si, String path) {
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
b.append(req.getURI().getScheme()) .append(Constants.Symbol.COLON) .append(Constants.Symbol.FORWARD_SLASH) .append(Constants.Symbol.FORWARD_SLASH);
|
||||
@@ -123,6 +162,13 @@ public class CallbackService {
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
private String buildUri(String scheme, ServiceInstance si, String path) {
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
b.append(scheme) .append(Constants.Symbol.COLON) .append(Constants.Symbol.FORWARD_SLASH) .append(Constants.Symbol.FORWARD_SLASH);
|
||||
b.append(si.ip) .append(Constants.Symbol.COLON) .append(si.port) .append(path);
|
||||
return b.toString();
|
||||
}
|
||||
|
||||
private Mono<? extends Void> genServerResponse(ServerWebExchange exchange, ClientResponse remoteResp) {
|
||||
ServerHttpResponse clientResp = exchange.getResponse();
|
||||
clientResp.setStatusCode(remoteResp.statusCode());
|
||||
@@ -149,13 +195,106 @@ public class CallbackService {
|
||||
log.debug(b.toString(), LogService.BIZ_ID, rid);
|
||||
}
|
||||
return clientResp.writeWith(remoteResp.body(BodyExtractors.toDataBuffers()))
|
||||
.doOnError(throwable -> cleanup(remoteResp)).doOnCancel(() -> cleanup(remoteResp));
|
||||
.doOnError(throwable -> clean(remoteResp)).doOnCancel(() -> clean(remoteResp));
|
||||
}
|
||||
|
||||
private void cleanup(ClientResponse clientResponse) {
|
||||
if (clientResponse != null) {
|
||||
clientResponse.bodyToMono(Void.class).subscribe();
|
||||
public Mono<ReactiveResult> replay(CallbackReplayReq req) {
|
||||
|
||||
CallbackConfig cc = apiConfigService.getApiConfig(req.service, req.method, req.path, req.gatewayGroup, req.app).callbackConfig;
|
||||
if (req.headers.getContentType().getSubtype().equalsIgnoreCase("json")) {
|
||||
req.body = (String) JSON.parse(req.body);
|
||||
}
|
||||
|
||||
List<Mono<Object>> sends = new ArrayList<>(); Mono send;
|
||||
|
||||
if (req.replayType == CallbackReplayReq.Type.ORIGINAL_PATH) {
|
||||
|
||||
int rs = cc.receivers.size();
|
||||
for (int i = 0; i < rs; i++) {
|
||||
Receiver r = cc.receivers.get(i);
|
||||
if (r.type == ApiConfig.Type.SERVICE_DISCOVERY) {
|
||||
ServiceInstance si = req.receivers.get(r.service);
|
||||
if (si != null) {
|
||||
String uri = buildUri("http", si, r.path);
|
||||
send = fizzWebClient.send(req.id, req.method, uri, req.headers, req.body)
|
||||
.onErrorResume( crError(req, r.service, r.path) );
|
||||
sends.add(send);
|
||||
}
|
||||
} else {
|
||||
String traceId = CommonConstants.TRACE_ID_PREFIX + req.id;
|
||||
send = aggregateService.request(traceId, "/proxy/", req.method.name(), r.service, r.path, null, req.headers, req.body)
|
||||
.onErrorResume( arError(req, r.service, r.path) );
|
||||
sends.add(send);
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
for (ServiceTypePath stp : req.assignServices) {
|
||||
if (stp.type == ApiConfig.Type.SERVICE_DISCOVERY) {
|
||||
send = fizzWebClient.proxySend2service(req.id, req.method, stp.service, stp.path, req.headers, req.body)
|
||||
.onErrorResume( crError(req, stp.service, stp.path) );
|
||||
sends.add(send);
|
||||
} else {
|
||||
String traceId = CommonConstants.TRACE_ID_PREFIX + req.id;
|
||||
send = aggregateService.request(traceId, "/proxy/", req.method.name(), stp.service, stp.path, null, req.headers, req.body)
|
||||
.onErrorResume( arError(req, stp.service, stp.path) );
|
||||
sends.add(send);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ss = sends.size();
|
||||
Mono<Object>[] sendArr = sends.toArray(new Mono[ss]);
|
||||
return Flux.mergeSequential(sendArr)
|
||||
.reduce(
|
||||
new ArrayList<Object>(ss),
|
||||
(respCollector, resp) -> {
|
||||
respCollector.add(resp);
|
||||
return respCollector;
|
||||
}
|
||||
)
|
||||
.map(
|
||||
resps -> {
|
||||
int c = ReactiveResult.SUCC;
|
||||
for (int i = 0; i < resps.size(); i++) {
|
||||
Object r = resps.get(i);
|
||||
if (r == fcr || r == far) {
|
||||
c = ReactiveResult.FAIL;
|
||||
} else if (r instanceof ClientResponse) {
|
||||
clean((ClientResponse) r);
|
||||
}
|
||||
}
|
||||
return ReactiveResult.with(c);
|
||||
}
|
||||
)
|
||||
;
|
||||
}
|
||||
|
||||
private Function<Throwable, Mono<? extends AggregateResult>> arError(CallbackReplayReq req, String service, String path) {
|
||||
return t -> {
|
||||
log(req, service, path, t);
|
||||
return Mono.just(far);
|
||||
};
|
||||
}
|
||||
|
||||
private Function<Throwable, Mono<? extends ClientResponse>> crError(CallbackReplayReq req, String service, String path) {
|
||||
return t -> {
|
||||
log(req, service, path, t);
|
||||
return Mono.just(fcr);
|
||||
};
|
||||
}
|
||||
|
||||
private void log(CallbackReplayReq req, String service, String path, Throwable t) {
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
b.append(req.service).append(Constants.Symbol.FORWARD_SLASH).append(req.path);
|
||||
b.append(Constants.Symbol.LINE_SEPARATOR).append(callback).append(Constants.Symbol.LINE_SEPARATOR);
|
||||
WebUtils.request2stringBuilder(req.id, req.method, service + Constants.Symbol.FORWARD_SLASH + path, req.headers, req.body, b);
|
||||
log.error(b.toString(), LogService.BIZ_ID, req.id, t);
|
||||
}
|
||||
|
||||
private void clean(ClientResponse cr) {
|
||||
cr.bodyToMono(Void.class).subscribe();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
131
src/main/java/we/proxy/FizzFakeClientResponse.java
Normal file
131
src/main/java/we/proxy/FizzFakeClientResponse.java
Normal file
@@ -0,0 +1,131 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.proxy;
|
||||
|
||||
import org.springframework.core.ParameterizedTypeReference;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.ResponseCookie;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.http.client.reactive.ClientHttpResponse;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.web.reactive.function.BodyExtractor;
|
||||
import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
import org.springframework.web.reactive.function.client.ExchangeStrategies;
|
||||
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
class FizzFakeClientResponse implements ClientResponse {
|
||||
|
||||
@Override
|
||||
public ExchangeStrategies strategies() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpStatus statusCode() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int rawStatusCode() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Headers headers() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public MultiValueMap<String, ResponseCookie> cookies() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T body(BodyExtractor<T, ? super ClientHttpResponse> extractor) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> bodyToMono(Class<? extends T> elementClass) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> elementTypeRef) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> bodyToFlux(Class<? extends T> elementClass) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementTypeRef) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> releaseBody() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<ResponseEntity<Void>> toBodilessEntity() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<T>> toEntity(Class<T> bodyType) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<T>> toEntity(ParameterizedTypeReference<T> bodyTypeReference) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<List<T>>> toEntityList(Class<T> elementClass) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> Mono<ResponseEntity<List<T>>> toEntityList(ParameterizedTypeReference<T> elementTypeRef) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<WebClientResponseException> createException() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String logPrefix() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
}
|
||||
38
src/main/java/we/proxy/ServiceTypePath.java
Normal file
38
src/main/java/we/proxy/ServiceTypePath.java
Normal file
@@ -0,0 +1,38 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.proxy;
|
||||
|
||||
import we.util.JacksonUtils;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public class ServiceTypePath {
|
||||
|
||||
public String service;
|
||||
|
||||
public String path;
|
||||
|
||||
public int type;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JacksonUtils.writeValueAsString(this);
|
||||
}
|
||||
}
|
||||
80
src/main/java/we/util/ReactiveResult.java
Normal file
80
src/main/java/we/util/ReactiveResult.java
Normal file
@@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.util;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public class ReactiveResult<D> {
|
||||
|
||||
public static final int SUCC = 1;
|
||||
public static final int FAIL = 0;
|
||||
|
||||
public int code = -1;
|
||||
|
||||
public String msg;
|
||||
|
||||
public D data;
|
||||
|
||||
public Throwable t;
|
||||
|
||||
public ReactiveResult(int code, String msg, D data, Throwable t) {
|
||||
this.code = code;
|
||||
this.msg = msg;
|
||||
this.data = data;
|
||||
this.t = t;
|
||||
}
|
||||
|
||||
public static ReactiveResult succ() {
|
||||
return new ReactiveResult(SUCC, null, null, null);
|
||||
}
|
||||
|
||||
public static <D> ReactiveResult<D> succ(D data) {
|
||||
return new ReactiveResult<D>(SUCC, null, data, null);
|
||||
}
|
||||
|
||||
public static ReactiveResult fail() {
|
||||
return new ReactiveResult(FAIL, null, null, null);
|
||||
}
|
||||
|
||||
public static ReactiveResult fail(String msg) {
|
||||
return new ReactiveResult(FAIL, msg, null, null);
|
||||
}
|
||||
|
||||
public static ReactiveResult fail(Throwable t) {
|
||||
return new ReactiveResult(FAIL, null, null, t);
|
||||
}
|
||||
|
||||
public static ReactiveResult with(int code) {
|
||||
return new ReactiveResult(code, null, null, null);
|
||||
}
|
||||
|
||||
public static ReactiveResult with(int code, String msg) {
|
||||
return new ReactiveResult(code, msg, null, null);
|
||||
}
|
||||
|
||||
public static <D> ReactiveResult<D> with(int code, D data) {
|
||||
return new ReactiveResult<D>(code, null, data, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return JacksonUtils.writeValueAsString(this);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user