From 43f01602303c363663742f6bcaf6f17f882e8bac Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Fri, 4 Jun 2021 16:56:23 +0800 Subject: [PATCH] feat: Support configuring dubbo API in route #210 --- .../main/java/we/config/WebClientConfig.java | 22 +-- .../src/main/java/we/config/SystemConfig.java | 4 + .../main/java/we/filter/PreprocessFilter.java | 12 +- .../src/main/java/we/filter/RouteFilter.java | 94 ++++++++++-- .../input/extension/request/RequestInput.java | 6 +- .../extension/request/RequestInputConfig.java | 2 +- .../main/java/we/plugin/auth/ApiConfig.java | 15 +- .../main/java/we/proxy/CallbackService.java | 4 +- .../src/main/java/we/proxy/FizzWebClient.java | 136 ++++-------------- .../dubbo/ApacheDubboGenericService.java | 2 +- .../java/we/proxy/CallbackServiceTests.java | 4 +- 11 files changed, 160 insertions(+), 141 deletions(-) diff --git a/fizz-common/src/main/java/we/config/WebClientConfig.java b/fizz-common/src/main/java/we/config/WebClientConfig.java index 2440e6f..1c473e2 100644 --- a/fizz-common/src/main/java/we/config/WebClientConfig.java +++ b/fizz-common/src/main/java/we/config/WebClientConfig.java @@ -46,7 +46,7 @@ public abstract class WebClientConfig { private Integer chConnTimeout = null; // 20_000; - private Long responseTimeout = null; // 20_000 +// private Long responseTimeout = null; // 20_000 private Boolean chTcpNodelay = null; // true @@ -78,13 +78,13 @@ public abstract class WebClientConfig { this.chConnTimeout = chConnTimeout; } - public Long getResponseTimeout() { - return responseTimeout; - } - - public void setResponseTimeout(Long responseTimeout) { - this.responseTimeout = responseTimeout; - } +// public Long getResponseTimeout() { +// return responseTimeout; +// } +// +// public void setResponseTimeout(Long responseTimeout) { +// this.responseTimeout = responseTimeout; +// } public Boolean isChTcpNodelay() { return chTcpNodelay; @@ -146,9 +146,9 @@ public abstract class WebClientConfig { if (compress != null) { httpClient = httpClient.compress(compress); } - if (responseTimeout != null) { - httpClient = httpClient.responseTimeout(Duration.ofMillis(responseTimeout)); - } + // if (responseTimeout != null) { + // httpClient = httpClient.responseTimeout(Duration.ofMillis(responseTimeout)); + // } return webClientBuilder.exchangeStrategies( ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) diff --git a/fizz-core/src/main/java/we/config/SystemConfig.java b/fizz-core/src/main/java/we/config/SystemConfig.java index 231229e..52d772d 100644 --- a/fizz-core/src/main/java/we/config/SystemConfig.java +++ b/fizz-core/src/main/java/we/config/SystemConfig.java @@ -61,6 +61,10 @@ public class SystemConfig { public List proxySetHeaders = new ArrayList<>(); public boolean aggregateTestAuth = false; + + @NacosValue(value = "${route-timeout:0}", autoRefreshed = true) + @Value ( "${route-timeout:0}") + public long routeTimeout = 0; @NacosValue(value = "${gateway.aggr.proxy_set_headers:}", autoRefreshed = true) diff --git a/fizz-core/src/main/java/we/filter/PreprocessFilter.java b/fizz-core/src/main/java/we/filter/PreprocessFilter.java index 18aeef8..c6e2e46 100644 --- a/fizz-core/src/main/java/we/filter/PreprocessFilter.java +++ b/fizz-core/src/main/java/we/filter/PreprocessFilter.java @@ -117,11 +117,15 @@ public class PreprocessFilter extends FizzWebFilter { if (ac == null) { bs = WebUtils.getClientService(exchange); bp = WebUtils.getClientReqPath(exchange); - } else if (ac.type != ApiConfig.Type.CALLBACK) { - if (ac.type != ApiConfig.Type.REVERSE_PROXY) { - bs = ac.backendService; + } else { + if (ac.type != ApiConfig.Type.CALLBACK) { + if (ac.type != ApiConfig.Type.REVERSE_PROXY) { + bs = ac.backendService; + } + if (ac.type != ApiConfig.Type.DUBBO) { + bp = ac.transform(WebUtils.getClientReqPath(exchange)); + } } - bp = ac.transform(WebUtils.getClientReqPath(exchange)); } if (bs != null) { WebUtils.setBackendService(exchange, bs); diff --git a/fizz-core/src/main/java/we/filter/RouteFilter.java b/fizz-core/src/main/java/we/filter/RouteFilter.java index 45732ea..dc2e302 100644 --- a/fizz-core/src/main/java/we/filter/RouteFilter.java +++ b/fizz-core/src/main/java/we/filter/RouteFilter.java @@ -21,6 +21,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.server.reactive.ServerHttpRequest; @@ -28,21 +30,24 @@ import org.springframework.http.server.reactive.ServerHttpResponse; import org.springframework.stereotype.Component; import org.springframework.web.reactive.function.BodyExtractors; import org.springframework.web.reactive.function.client.ClientResponse; -import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; +import we.constants.CommonConstants; import we.flume.clients.log4j2appender.LogService; import we.legacy.RespEntity; import we.plugin.auth.ApiConfig; import we.proxy.FizzWebClient; +import we.proxy.dubbo.ApacheDubboGenericService; +import we.proxy.dubbo.DubboInterfaceDeclaration; import we.util.Constants; +import we.util.JacksonUtils; import we.util.ThreadContext; import we.util.WebUtils; import javax.annotation.Resource; -import java.util.List; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.util.*; import java.util.function.Function; /** @@ -58,6 +63,9 @@ public class RouteFilter extends FizzWebFilter { @Resource private FizzWebClient fizzWebClient; + @Resource + private ApacheDubboGenericService dubboGenericService; + @Override public Mono doFilter(ServerWebExchange exchange, WebFilterChain chain) { @@ -84,11 +92,14 @@ public class RouteFilter extends FizzWebFilter { private Mono doFilter0(ServerWebExchange exchange, WebFilterChain chain) { - ServerHttpRequest clientReq = exchange.getRequest(); - HttpHeaders hdrs = WebUtils.mergeAppendHeaders(exchange); - - String rid = clientReq.getId(); + ServerHttpRequest req = exchange.getRequest(); + String rid = req.getId(); ApiConfig ac = WebUtils.getApiConfig(exchange); + HttpHeaders hdrs = null; + if (ac.type != ApiConfig.Type.DUBBO) { + hdrs = WebUtils.mergeAppendHeaders(exchange); + } + if (ac == null) { String pathQuery = WebUtils.getClientReqPathQuery(exchange); return send(exchange, WebUtils.getClientService(exchange), pathQuery, hdrs); @@ -101,7 +112,10 @@ public class RouteFilter extends FizzWebFilter { String uri = ThreadContext.getStringBuilder().append(ac.getNextHttpHostPort()) .append(WebUtils.appendQuery(WebUtils.getBackendPath(exchange), exchange)) .toString(); - return fizzWebClient.send(rid, clientReq.getMethod(), uri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange)); + return fizzWebClient.send(rid, req.getMethod(), uri, hdrs, req.getBody()).flatMap(genServerResponse(exchange)); + + } else if (ac.type == ApiConfig.Type.DUBBO) { + return dubboRpc(exchange, ac); } else { String err = "cant handle api config type " + ac.type; @@ -114,7 +128,7 @@ public class RouteFilter extends FizzWebFilter { private Mono send(ServerWebExchange exchange, String service, String relativeUri, HttpHeaders hdrs) { ServerHttpRequest clientReq = exchange.getRequest(); - return fizzWebClient.proxySend2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange)); + return fizzWebClient.send2service(clientReq.getId(), clientReq.getMethod(), service, relativeUri, hdrs, clientReq.getBody()).flatMap(genServerResponse(exchange)); } private Function> genServerResponse(ServerWebExchange exchange) { @@ -155,4 +169,66 @@ public class RouteFilter extends FizzWebFilter { clientResponse.bodyToMono(Void.class).subscribe(); } } + + private Mono dubboRpc(ServerWebExchange exchange, ApiConfig ac) { + DataBuffer[] body = {null}; + return DataBufferUtils.join(exchange.getRequest().getBody()).defaultIfEmpty(WebUtils.EMPTY_BODY) + .flatMap( + b -> { + HashMap parameters = null; + if (b != WebUtils.EMPTY_BODY) { + body[0] = b; + String json = body[0].toString(StandardCharsets.UTF_8).trim(); + if (json.charAt(0) == '[') { + ArrayList lst = JacksonUtils.readValue(json, ArrayList.class); + parameters = new HashMap<>(); + for (int i = 0; i < lst.size(); i++) { + parameters.put("p" + (i + 1), lst.get(i)); + } + } else { + parameters = JacksonUtils.readValue(json, HashMap.class); + } + } + + DubboInterfaceDeclaration declaration = new DubboInterfaceDeclaration(); + declaration.setServiceName(ac.backendService); + declaration.setVersion(ac.rpcVersion); + declaration.setGroup(ac.rpcGroup); + declaration.setMethod(ac.rpcMethod); + declaration.setParameterTypes(ac.rpcParamTypes); + int t = 20_000; + if (ac.timeout != 0) { + t = (int) ac.timeout; + } + declaration.setTimeout(t); + + Map attachments = Collections.singletonMap(CommonConstants.HEADER_TRACE_ID, WebUtils.getTraceId(exchange)); + return dubboGenericService.send(parameters, declaration, attachments); + } + ) + .flatMap( + dubboRpcResponseBody -> { + Mono m = WebUtils.buildJsonDirectResponse(exchange, HttpStatus.OK, null, JacksonUtils.writeValueAsString(dubboRpcResponseBody)); + return m; + } + ) + .doOnError( + t -> { + StringBuilder b = ThreadContext.getStringBuilder(); + WebUtils.request2stringBuilder(exchange, b); + if (body[0] != null) { + b.append('\n').append(body[0].toString(StandardCharsets.UTF_8)); + } + log.error(b.toString(), LogService.BIZ_ID, exchange.getRequest().getId(), t); + } + ) + .doFinally( + s -> { + if (body[0] != null) { + DataBufferUtils.release(body[0]); + } + } + ) + ; + } } diff --git a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java index 4c71e26..bb2816c 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java @@ -387,8 +387,10 @@ public class RequestInput extends RPCInput implements IInput{ String aggrService = aggrPath.split("\\/")[2]; FizzWebClient client = this.getCurrentApplicationContext().getBean(FizzWebClient.class); - Mono clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url, - headers, body, (long)timeout); + // Mono clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url, + // headers, body, (long)timeout); + + Mono clientResponse = client.send(inputContext.getStepContext().getTraceId(), method, url, headers, body, (long)timeout); return clientResponse.flatMap(cr->{ RequestRPCResponse response = new RequestRPCResponse(); response.setHeaders(cr.headers().asHttpHeaders()); diff --git a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java index e524c48..c99b15d 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java @@ -42,7 +42,7 @@ import we.fizz.input.PathMapping; public class RequestInputConfig extends InputConfig { private URL url ; private String method ; - private int timeout = 3; + private int timeout; private String protocol; /** * Service Type, 1 service discovery, 2 HTTP service diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java b/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java index b5bcd86..4d58891 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java @@ -25,9 +25,7 @@ import we.plugin.PluginConfig; import we.util.JacksonUtils; import we.util.UrlTransformUtils; -import java.util.Arrays; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -44,6 +42,7 @@ public class ApiConfig { static final byte SERVICE_DISCOVERY = 2; static final byte REVERSE_PROXY = 3; static final byte CALLBACK = 4; + static final byte DUBBO = 5; } public static final int DELETED = 1; @@ -96,6 +95,16 @@ public class ApiConfig { public CallbackConfig callbackConfig; + public String rpcMethod; + + public String rpcParamTypes; + + public String rpcVersion; + + public String rpcGroup; + + public long timeout = 0; + public static boolean isAntPathPattern(String path) { boolean uriVar = false; for (int i = 0; i < path.length(); i++) { diff --git a/fizz-core/src/main/java/we/proxy/CallbackService.java b/fizz-core/src/main/java/we/proxy/CallbackService.java index 76c9174..8d27ca7 100644 --- a/fizz-core/src/main/java/we/proxy/CallbackService.java +++ b/fizz-core/src/main/java/we/proxy/CallbackService.java @@ -93,7 +93,7 @@ public class CallbackService { if (r.type == ApiConfig.Type.SERVICE_DISCOVERY) { ServiceInstance si = service2instMap.get(r.service); if (si == null) { - send = fizzWebClient.proxySend2service(reqId, method, r.service, r.path, headers, body) + send = fizzWebClient.send2service(reqId, method, r.service, r.path, headers, body) .onErrorResume( crError(exchange, r, method, headers, body) ); } else { String uri = buildUri(req, si, r.path); @@ -238,7 +238,7 @@ public class CallbackService { for (ServiceTypePath stp : req.assignServices) { if (stp.type == ApiConfig.Type.SERVICE_DISCOVERY) { - send = fizzWebClient.proxySend2service(req.id, req.method, stp.service, stp.path, req.headers, req.body) + send = fizzWebClient.send2service(req.id, req.method, stp.service, stp.path, req.headers, req.body) .onErrorResume( crError(req, stp.service, stp.path) ); } else { String traceId = CommonConstants.TRACE_ID_PREFIX + req.id; diff --git a/fizz-core/src/main/java/we/proxy/FizzWebClient.java b/fizz-core/src/main/java/we/proxy/FizzWebClient.java index d6fbb72..b788da5 100644 --- a/fizz-core/src/main/java/we/proxy/FizzWebClient.java +++ b/fizz-core/src/main/java/we/proxy/FizzWebClient.java @@ -17,11 +17,9 @@ package we.proxy; -import com.alibaba.nacos.api.config.annotation.NacosValue; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Value; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -34,13 +32,14 @@ import org.springframework.web.reactive.function.client.WebClient; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import we.config.ProxyWebClientConfig; +import we.config.SystemConfig; import we.flume.clients.log4j2appender.LogService; import we.util.Constants; import we.util.ThreadContext; import we.util.WebUtils; -import javax.annotation.PostConstruct; import javax.annotation.Resource; +import java.time.Duration; import java.util.Collections; import java.util.List; @@ -53,122 +52,58 @@ public class FizzWebClient { private static final Logger log = LoggerFactory.getLogger(FizzWebClient.class); - private static final String aggrSend = "$aggrSend"; - private static final String localhost = "localhost"; private static final String host = "Host"; + @Resource + private SystemConfig systemConfig; + @Resource private DiscoveryClientUriSelector discoveryClientUriSelector; @Resource(name = ProxyWebClientConfig.proxyWebClient) private WebClient proxyWebClient; - // @Resource(name = AggrWebClientConfig.aggrWebClient) - // private WebClient aggrWebClient; - - @NacosValue(value = "${fizz-web-client.timeout:-1}") - @Value("${fizz-web-client.timeout:-1}") - private long timeout = -1; - - @PostConstruct - public void afterPropertiesSet() { - if (timeout != -1) { - CallBackendConfig.DEFAULT.timeout = timeout; - } - log.info("fizz web client timeout is " + CallBackendConfig.DEFAULT.timeout); - } - - public Mono aggrSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId, - HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body, @Nullable Long timeout) { - - // ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future - CallBackendConfig cbc = null; - // if (timeout != null) { - // cbc = new CallBackendConfig(timeout); - // } - return aggrResolveAddressSend(aggrService, aggrMethod, aggrPath, originReqIdOrBizId, method, uriOrSvc, headers, body, cbc); - } - - public Mono aggrSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId, - HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) { - - // ThreadContext.set(aggrSend, Constants.Symbol.EMPTY); // TODO will be remove in future - return aggrResolveAddressSend(aggrService, aggrMethod, aggrPath, originReqIdOrBizId, method, uriOrSvc, headers, body, null); - } - public Mono send(String reqId, HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body) { - return send(reqId, method, uriOrSvc, headers, body, null); + return send(reqId, method, uriOrSvc, headers, body, 0); } - public Mono send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body, CallBackendConfig cbc) { + public Mono send(String reqId, HttpMethod method, String uriOrSvc, HttpHeaders headers, Object body, long timeout) { String s = extractServiceOrAddress(uriOrSvc); if (isService(s)) { String path = uriOrSvc.substring(uriOrSvc.indexOf(Constants.Symbol.FORWARD_SLASH, 10)); - return send2service(reqId, method, s, path, headers, body, cbc); + return send2service(reqId, method, s, path, headers, body, timeout); } else { - return send2uri(reqId, method, uriOrSvc, headers, body, cbc); + return send2uri(reqId, method, uriOrSvc, headers, body, timeout); } } - private Mono aggrResolveAddressSend(String aggrService, HttpMethod aggrMethod, String aggrPath, @Nullable String originReqIdOrBizId, - HttpMethod method, String uriOrSvc, @Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) { + public Mono send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri, + @Nullable HttpHeaders headers, @Nullable Object body) { - return send(originReqIdOrBizId, method, uriOrSvc, headers, body, cbc); + return send2service(clientReqId, method, service, relativeUri, headers, body, 0); } - public Mono proxySend2service(@Nullable String originReqIdOrBizId, HttpMethod method, String service, String relativeUri, - @Nullable HttpHeaders headers, @Nullable Object body) { + public Mono send2service(@Nullable String clientReqId, HttpMethod method, String service, String relativeUri, + @Nullable HttpHeaders headers, @Nullable Object body, long timeout) { - return send2service(originReqIdOrBizId, method, service, relativeUri, headers, body, null); - } - - public Mono send2service(@Nullable String originReqIdOrBizId, HttpMethod method, String service, String relativeUri, - @Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) { - - // TODO this the future - // if (cbc == null) { - // InstanceInfo inst = roundRobinChoose1instFrom(service); - // String uri = buildUri(inst, relativeUri); - // return send2uri(originReqIdOrBizId, method, uri, headers, body, null); - // } else { - // List insts = eurekaClient.getInstancesByVipAddress(service, false); - // // TODO 据callBackendConfig, 结合insts的实际metric, 从insts中选择合适的一个,转发请求过去 - // } - // what about multiple nginx instance - - // current String uri = discoveryClientUriSelector.getNextUri(service, relativeUri); - return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc); + return send2uri(clientReqId, method, uri, headers, body, timeout); } - public Mono send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri, - @Nullable HttpHeaders headers, @Nullable Object body, @Nullable Long timeout) { - - CallBackendConfig cbc = null; - // if (timeout != null) { - // cbc = new CallBackendConfig(timeout); - // } - return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc); + public Mono send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body) { + return send2uri(clientReqId, method, uri, headers, body, 0); } - @SuppressWarnings({ "unchecked", "rawtypes" }) - private Mono send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri, - @Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) { + public Mono send2uri(@Nullable String clientReqId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body, long timeout) { if (log.isDebugEnabled()) { StringBuilder b = ThreadContext.getStringBuilder(); - WebUtils.request2stringBuilder(originReqIdOrBizId, method, uri, headers, null, b); - log.debug(b.toString(), LogService.BIZ_ID, originReqIdOrBizId); + WebUtils.request2stringBuilder(clientReqId, method, uri, headers, null, b); + log.debug(b.toString(), LogService.BIZ_ID, clientReqId); } - // if (cbc == null) { - // cbc = CallBackendConfig.DEFAULT; - // } - - // TODO remove this, and all event loop share one web client or one event loop one web client in future - // WebClient.RequestBodySpec req = (ThreadContext.remove(aggrSend) == null ? proxyWebClient : aggrWebClient).method(method).uri(uri).headers( WebClient.RequestBodySpec req = proxyWebClient.method(method).uri(uri).headers( hdrs -> { if (headers != null) { @@ -196,27 +131,16 @@ public class FizzWebClient { } } - return req.exchange() - /* - .name(reqId) - .doOnRequest(i -> {}) - .doOnSuccess(r -> {}) - .doOnError( - t -> { - Schedulers.parallel().schedule(() -> { - log.error("", LogService.BIZ_ID, reqId, t); - }); - } - ) - .timeout(Duration.ofMillis(cbc.timeout)) - */ - ; - - // if (log.isDebugEnabled()) { - // rm = rm.log(); - // } - - // TODO 请求完成后,做metric, 以反哺后续的请求转发 + Mono cr = req.exchange(); + if (timeout == 0) { + if (systemConfig.routeTimeout != 0) { + timeout = systemConfig.routeTimeout; + } + } + if (timeout != 0) { + cr = cr.timeout(Duration.ofMillis(timeout)); + } + return cr; } private void setHostHeader(String uri, HttpHeaders headers) { diff --git a/fizz-core/src/main/java/we/proxy/dubbo/ApacheDubboGenericService.java b/fizz-core/src/main/java/we/proxy/dubbo/ApacheDubboGenericService.java index eca1270..c3f2a0e 100644 --- a/fizz-core/src/main/java/we/proxy/dubbo/ApacheDubboGenericService.java +++ b/fizz-core/src/main/java/we/proxy/dubbo/ApacheDubboGenericService.java @@ -87,7 +87,7 @@ public class ApacheDubboGenericService { */ @SuppressWarnings("unchecked") public Mono send(final Map body, final DubboInterfaceDeclaration interfaceDeclaration, - HashMap attachments) { + Map attachments) { RpcContext.getContext().setAttachments(attachments); ReferenceConfig reference = createReferenceConfig(interfaceDeclaration.getServiceName(), diff --git a/fizz-core/src/test/java/we/proxy/CallbackServiceTests.java b/fizz-core/src/test/java/we/proxy/CallbackServiceTests.java index a912e58..0246408 100644 --- a/fizz-core/src/test/java/we/proxy/CallbackServiceTests.java +++ b/fizz-core/src/test/java/we/proxy/CallbackServiceTests.java @@ -76,7 +76,7 @@ public class CallbackServiceTests { ServerHttpRequest req = exchange.getRequest(); String reqId = req.getId(); - when(mockFizzWebClient.proxySend2service(reqId, HttpMethod.GET, "s1", "p1", headers, body)) + when(mockFizzWebClient.send2service(reqId, HttpMethod.GET, "s1", "p1", headers, body)) .thenReturn( Mono.just( ClientResponse.create(HttpStatus.GONE, ExchangeStrategies.withDefaults()) @@ -86,7 +86,7 @@ public class CallbackServiceTests { ) ); - when(mockFizzWebClient.proxySend2service(reqId, HttpMethod.GET, "s2", "p2", headers, body)) + when(mockFizzWebClient.send2service(reqId, HttpMethod.GET, "s2", "p2", headers, body)) .thenReturn( Mono.just( ClientResponse.create(HttpStatus.FOUND, ExchangeStrategies.withDefaults())