update: add aggregate service

hongqiaowei
2021-01-29 13:43:23 +08:00
parent 6fba85ea92
commit 95dd47ca49
3 changed files with 153 additions and 61 deletions

View File

@@ -0,0 +1,31 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author hongqiaowei
*/
@RestController
@RequestMapping(value = "/admin/callback")
public class CallbackController {
}
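
The controller above is added as an empty scaffold under /admin/callback; no handler methods ship in this commit. As a purely hypothetical illustration (the endpoint, return type, and imports below are assumptions, not part of this commit), a method-level mapping would compose with the class-level prefix like this:

// Hypothetical sketch only -- not part of this commit.
// Requires org.springframework.web.bind.annotation.GetMapping and reactor.core.publisher.Mono.
// The class-level @RequestMapping("/admin/callback") prefixes method-level paths,
// so this method would answer GET /admin/callback/ping.
@GetMapping("/ping")
public Mono<String> ping() {
    return Mono.just("ok");
}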

View File

@@ -41,14 +41,10 @@ import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import we.config.AggregateRedisConfig;
import we.constants.CommonConstants;
import we.fizz.AggregateResource;
import we.fizz.AggregateResult;
import we.fizz.AggregateService;
import we.fizz.ConfigLoader;
import we.fizz.Pipeline;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
import we.plugin.auth.ApiConfig;
import we.plugin.auth.CallbackConfig;
@@ -57,7 +53,6 @@ import we.proxy.DiscoveryClientUriSelector;
import we.proxy.FizzWebClient;
import we.proxy.ServiceInstance;
import we.util.Constants;
import we.util.MapUtil;
import we.util.ThreadContext;
import we.util.WebUtils;
@@ -66,7 +61,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author hongqiaowei
@@ -106,6 +100,9 @@ public class CallbackFilter extends FizzWebFilter {
@Resource
private ConfigLoader aggregateResourceLoader;
@Resource
private AggregateService aggregateService;
@Override
public Mono<Void> doFilter(ServerWebExchange exchange, WebFilterChain chain) {
@@ -160,7 +157,7 @@ public class CallbackFilter extends FizzWebFilter {
send = fizzWebClient.send(req.getId(), req.getMethod(), uri, headers, body);
}
} else {
send = requestAggregateResource(exchange, headers, body, r);
send = aggregateService.request(exchange, r.service, r.path, headers, body);
}
monos[i] = send;
}
@@ -187,7 +184,7 @@ public class CallbackFilter extends FizzWebFilter {
return genServerResponse(exchange, remoteResp);
} else if (r instanceof AggregateResult) {
AggregateResult ar = (AggregateResult) r;
return genAggregateResponse(exchange, ar);
return aggregateService.genAggregateResponse(exchange, ar);
} else {
return Mono.error(new RuntimeException("cant response client with " + r, null, false, false) {});
}
@@ -196,58 +193,6 @@ public class CallbackFilter extends FizzWebFilter {
;
}
private Mono<AggregateResult> requestAggregateResource(ServerWebExchange exchange, HttpHeaders hdrs, DataBuffer body, Receiver receiver) {
// long start = System.currentTimeMillis();
ServerHttpRequest request = exchange.getRequest();
String path = WebUtils.getClientReqPathPrefix(exchange) + receiver.service + receiver.path;
String method = request.getMethodValue();
AggregateResource aggregateResource = aggregateResourceLoader.matchAggregateResource(method, path);
if (aggregateResource == null) {
return Mono.error(new RuntimeException("no aggregate resource: " + method + ' ' + path, null, false, false) {});
} else {
Pipeline pipeline = aggregateResource.getPipeline();
Input input = aggregateResource.getInput();
Map<String, Object> headers = MapUtil.toHashMap(hdrs);
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
log.debug("matched aggregation api: {}", path);
Map<String, Object> clientInput = new HashMap<>(9);
clientInput.put("path", path);
clientInput.put("method", method);
clientInput.put("headers", headers);
clientInput.put("params", MapUtil.toHashMap(request.getQueryParams()));
if (body != null) {
clientInput.put("body", JSON.parse(body.toString(StandardCharsets.UTF_8)));
}
return pipeline.run(input, clientInput, traceId).subscribeOn(Schedulers.elastic());
}
}
private Mono<? extends Void> genAggregateResponse(ServerWebExchange exchange, AggregateResult ar) {
ServerHttpResponse clientResp = exchange.getResponse();
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
String js = JSON.toJSONString(ar.getBody());
log.debug("aggregate response body: {}", js);
if (ar.getHeaders() != null && !ar.getHeaders().isEmpty()) {
ar.getHeaders().remove("Content-Length");
clientResp.getHeaders().addAll(ar.getHeaders());
}
if (!clientResp.getHeaders().containsKey("Content-Type")) {
// default content-type
clientResp.getHeaders().add("Content-Type", "application/json; charset=UTF-8");
}
List<String> headerTraceIds = clientResp.getHeaders().get(CommonConstants.HEADER_TRACE_ID);
if (headerTraceIds == null || !headerTraceIds.contains(traceId)) {
clientResp.getHeaders().add(CommonConstants.HEADER_TRACE_ID, traceId);
}
// long end = System.currentTimeMillis();
// pipeline.getStepContext().addElapsedTime("总耗时", end - start);
// log.info("ElapsedTimes={}", JSON.toJSONString(pipeline.getStepContext().getElapsedTimes()));
return clientResp
.writeWith(Flux.just(exchange.getResponse().bufferFactory().wrap(js.getBytes())));
}
private Mono<? extends Void> genServerResponse(ServerWebExchange exchange, ClientResponse remoteResp) {
ServerHttpResponse clientResp = exchange.getResponse();
clientResp.setStatusCode(remoteResp.statusCode());
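
Taken together, the hunks above remove CallbackFilter's private aggregate helpers and delegate to the injected AggregateService. A condensed sketch of the resulting call sites, assembled from this diff rather than quoted verbatim:

// Condensed view of CallbackFilter after this commit (assembled from the hunks above, not a verbatim excerpt).
@Resource
private AggregateService aggregateService;

// In doFilter: dispatch to an aggregate receiver via the service instead of the removed requestAggregateResource(...).
send = aggregateService.request(exchange, r.service, r.path, headers, body);

// When a result is an AggregateResult: write it back via the service instead of the removed genAggregateResponse(...).
if (r instanceof AggregateResult) {
    AggregateResult ar = (AggregateResult) r;
    return aggregateService.genAggregateResponse(exchange, ar);
}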

View File

@@ -0,0 +1,116 @@
/*
* Copyright (C) 2020 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.fizz;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Service;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import we.constants.CommonConstants;
import we.fizz.input.Input;
import we.flume.clients.log4j2appender.LogService;
import we.util.MapUtil;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author hongqiaowei
*/
@Service
public class AggregateService {
private static final Logger log = LoggerFactory.getLogger(AggregateService.class);
@Resource
private ConfigLoader aggregateResourceLoader;
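/**
 * Matches an aggregate resource by HTTP method and full path (client request path prefix + service + path),
 * builds the pipeline's client input (path, method, headers, params, optional JSON body) from the exchange,
 * and runs the pipeline on an elastic scheduler. Returns an error Mono when no aggregate resource matches.
 */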
public Mono<AggregateResult> request(ServerWebExchange exchange, String service, String path, HttpHeaders headers, String body) {
// long start = System.currentTimeMillis();
ServerHttpRequest request = exchange.getRequest();
String aggrPath = WebUtils.getClientReqPathPrefix(exchange) + service + path;
String method = request.getMethodValue();
AggregateResource aggregateResource = aggregateResourceLoader.matchAggregateResource(method, aggrPath);
if (aggregateResource == null) {
return Mono.error(new RuntimeException("no aggregate resource: " + method + ' ' + aggrPath, null, false, false) {});
} else {
Pipeline pipeline = aggregateResource.getPipeline();
Input input = aggregateResource.getInput();
Map<String, Object> hs = MapUtil.toHashMap(headers);
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
log.debug("matched aggregation api: {}", aggrPath);
Map<String, Object> clientInput = new HashMap<>();
clientInput.put("path", aggrPath);
clientInput.put("method", method);
clientInput.put("headers", hs);
clientInput.put("params", MapUtil.toHashMap(request.getQueryParams()));
if (body != null) {
clientInput.put("body", JSON.parse(body));
}
return pipeline.run(input, clientInput, traceId).subscribeOn(Schedulers.elastic());
}
}
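/**
 * Convenience overload: decodes the request body DataBuffer as UTF-8 and delegates to the String-body variant.
 */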
public Mono<AggregateResult> request(ServerWebExchange exchange, String service, String path, HttpHeaders headers, DataBuffer body) {
String b = null;
if (body != null) {
b = body.toString(StandardCharsets.UTF_8);
}
return request(exchange, service, path, headers, b);
}
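/**
 * Serializes the aggregate result body to JSON and writes it to the client response, copying the result
 * headers (minus Content-Length), defaulting Content-Type to "application/json; charset=UTF-8", and echoing
 * the trace id header when it is not already present.
 */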
public Mono<? extends Void> genAggregateResponse(ServerWebExchange exchange, AggregateResult ar) {
ServerHttpResponse clientResp = exchange.getResponse();
String traceId = WebUtils.getTraceId(exchange);
LogService.setBizId(traceId);
String js = JSON.toJSONString(ar.getBody());
log.debug("aggregate response body: {}", js);
if (ar.getHeaders() != null && !ar.getHeaders().isEmpty()) {
ar.getHeaders().remove("Content-Length");
clientResp.getHeaders().addAll(ar.getHeaders());
}
if (!clientResp.getHeaders().containsKey("Content-Type")) {
// default content-type
clientResp.getHeaders().add("Content-Type", "application/json; charset=UTF-8");
}
List<String> headerTraceIds = clientResp.getHeaders().get(CommonConstants.HEADER_TRACE_ID);
if (headerTraceIds == null || !headerTraceIds.contains(traceId)) {
clientResp.getHeaders().add(CommonConstants.HEADER_TRACE_ID, traceId);
}
// long end = System.currentTimeMillis();
// pipeline.getStepContext().addElapsedTime("总耗时", end - start);
// log.info("ElapsedTimes={}", JSON.toJSONString(pipeline.getStepContext().getElapsedTimes()));
return clientResp
.writeWith(Flux.just(exchange.getResponse().bufferFactory().wrap(js.getBytes(StandardCharsets.UTF_8))));
}
}
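
A minimal usage sketch of the new service, mirroring how CallbackFilter consumes it in this commit; the class name and the service/path literals below are illustrative assumptions, not part of the source:

// Hypothetical consumer of AggregateService -- illustrative only, not part of this commit.
import javax.annotation.Resource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import we.fizz.AggregateService;

@Component
public class AggregateCaller {

    @Resource
    private AggregateService aggregateService;

    // Runs the matched aggregate pipeline and writes its result back to the client.
    public Mono<Void> handle(ServerWebExchange exchange, HttpHeaders headers, DataBuffer body) {
        return aggregateService.request(exchange, "example-service", "/callback", headers, body) // literals are illustrative
                               .flatMap(ar -> aggregateService.genAggregateResponse(exchange, ar));
    }
}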