From 5289a84cb416e77dfe695b4092d7d7e47e901153 Mon Sep 17 00:00:00 2001 From: Francis Dong Date: Fri, 4 Jun 2021 15:08:25 +0800 Subject: [PATCH] Support form-data request in aggregation #207 --- .../src/main/resources/application.yml | 2 +- .../src/main/java/we/util/MapUtil.java | 107 +++++++++++++++++- .../src/main/java/we/util/UUIDUtil.java | 32 ++++++ .../main/java/we/config/WebFluxConfig.java | 37 +++--- .../java/we/constants/CommonConstants.java | 6 + .../main/java/we/filter/AggregateFilter.java | 63 +++++++---- fizz-core/src/main/java/we/fizz/Pipeline.java | 4 + .../src/main/java/we/fizz/StepContext.java | 22 ++++ .../main/java/we/fizz/input/PathMapping.java | 2 +- .../input/extension/request/RequestInput.java | 60 +++++++--- .../src/main/java/we/proxy/FizzWebClient.java | 24 ++-- 11 files changed, 294 insertions(+), 65 deletions(-) create mode 100644 fizz-common/src/main/java/we/util/UUIDUtil.java diff --git a/fizz-bootstrap/src/main/resources/application.yml b/fizz-bootstrap/src/main/resources/application.yml index b3a1dc1..84cf88d 100644 --- a/fizz-bootstrap/src/main/resources/application.yml +++ b/fizz-bootstrap/src/main/resources/application.yml @@ -102,7 +102,7 @@ gateway: prefix: /proxy aggr: # set headers when calling the backend API - proxy_set_headers: host,X-Real-IP,X-Forwarded-Proto,X-Forwarded-For + proxy_set_headers: X-Real-IP,X-Forwarded-Proto,X-Forwarded-For refresh-local-cache: # initial delay 5 minutes diff --git a/fizz-common/src/main/java/we/util/MapUtil.java b/fizz-common/src/main/java/we/util/MapUtil.java index ef88445..5d3ea5b 100644 --- a/fizz-common/src/main/java/we/util/MapUtil.java +++ b/fizz-common/src/main/java/we/util/MapUtil.java @@ -23,11 +23,12 @@ import java.util.Map; import java.util.Map.Entry; import org.springframework.http.HttpHeaders; +import org.springframework.http.codec.multipart.FilePart; +import org.springframework.http.codec.multipart.FormFieldPart; +import org.springframework.http.codec.multipart.Part; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; -import com.alibaba.fastjson.JSON; - /** * * @author Francis Dong @@ -96,6 +97,108 @@ public class MapUtil { return mvmap; } + public static MultiValueMap toMultipartDataMap(Map params) { + MultiValueMap mvmap = new LinkedMultiValueMap<>(); + if (params == null || params.isEmpty()) { + return mvmap; + } + for (Entry entry : params.entrySet()) { + Object val = entry.getValue(); + List list = new ArrayList<>(); + if (val instanceof List) { + List vals = (List) val; + for (Object value : vals) { + if (value != null) { + list.add(value); + } + } + } else { + if (val != null) { + list.add(val.toString()); + } + } + if (list.size() > 0) { + mvmap.put(entry.getKey(), list); + } + } + return mvmap; + } + + + /** + * Extract form data from multipart map exclude file + * @param params + * @param fileKeyPrefix + * @param filePartMap Map + * @return + */ + public static Map extractFormData(MultiValueMap params, String fileKeyPrefix, Map filePartMap) { + HashMap m = new HashMap<>(); + if (params == null || params.isEmpty()) { + return m; + } + for (Entry> entry : params.entrySet()) { + List val = entry.getValue(); + if (val != null && val.size() > 0) { + if (val.size() > 1) { + List formFieldValues = new ArrayList<>(); + val.stream().forEach(part -> { + if (part instanceof FormFieldPart) { + FormFieldPart p = (FormFieldPart) part; + formFieldValues.add(p.value()); + } else if (part instanceof FilePart) { + FilePart fp = (FilePart) part; + String k = fileKeyPrefix + UUIDUtil.getUUID() + "-" + fp.filename(); + formFieldValues.add(k); + filePartMap.put(k, fp); + } + }); + if (formFieldValues.size() > 0) { + m.put(entry.getKey(), formFieldValues); + } + } else { + if (val.get(0) instanceof FormFieldPart) { + FormFieldPart p = (FormFieldPart) val.get(0); + m.put(entry.getKey(), p.value()); + } else if (val.get(0) instanceof FilePart) { + FilePart fp = (FilePart) val.get(0); + String k = fileKeyPrefix + UUIDUtil.getUUID() + "-" + fp.filename(); + m.put(entry.getKey(), k); + filePartMap.put(k, fp); + } + } + } + } + return m; + } + + /** + * Replace file field with FilePart object + * @param params + * @param fileKeyPrefix + * @param filePartMap + */ + public static void replaceWithFilePart(MultiValueMap params, String fileKeyPrefix, Map filePartMap) { + if (params == null || params.isEmpty() || filePartMap == null || filePartMap.isEmpty()) { + return; + } + + for (Entry> entry : params.entrySet()) { + List list = entry.getValue(); + if (list != null && list.size() > 0) { + List newlist = new ArrayList<>(); + for (int i = 0; i < list.size(); i++) { + if (list.get(i).toString().startsWith(fileKeyPrefix)) { + newlist.add(filePartMap.get(list.get(i).toString())); + }else { + newlist.add(list.get(i)); + } + } + params.put(entry.getKey(), newlist); + } + } + } + public static Map toHashMap(MultiValueMap params) { HashMap m = new HashMap<>(); diff --git a/fizz-common/src/main/java/we/util/UUIDUtil.java b/fizz-common/src/main/java/we/util/UUIDUtil.java new file mode 100644 index 0000000..7a5cec3 --- /dev/null +++ b/fizz-common/src/main/java/we/util/UUIDUtil.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2021 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package we.util; + +import java.util.UUID; + +/** + * + * @author Francis Dong + * + */ +public class UUIDUtil { + + public static String getUUID() { + return UUID.randomUUID().toString().replaceAll("-", ""); + } + +} diff --git a/fizz-core/src/main/java/we/config/WebFluxConfig.java b/fizz-core/src/main/java/we/config/WebFluxConfig.java index d117b7a..70a61aa 100644 --- a/fizz-core/src/main/java/we/config/WebFluxConfig.java +++ b/fizz-core/src/main/java/we/config/WebFluxConfig.java @@ -17,30 +17,21 @@ package we.config; -import com.alibaba.nacos.api.config.annotation.NacosValue; -import io.netty.channel.ChannelOption; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.web.ServerProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; -import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.http.client.reactive.ReactorResourceFactory; import org.springframework.http.codec.ServerCodecConfigurer; +import org.springframework.http.codec.multipart.MultipartHttpMessageReader; +import org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader; import org.springframework.web.reactive.config.EnableWebFlux; import org.springframework.web.reactive.config.ResourceHandlerRegistry; import org.springframework.web.reactive.config.WebFluxConfigurer; -import org.springframework.web.reactive.resource.HttpResource; -import reactor.netty.http.HttpResources; -import reactor.netty.resources.ConnectionProvider; -import reactor.netty.resources.LoopResources; - -import java.time.Duration; +import com.alibaba.nacos.api.config.annotation.NacosValue; /** * @author hongqiaowei @@ -52,7 +43,7 @@ import java.time.Duration; public class WebFluxConfig { private static final Logger log = LoggerFactory.getLogger(WebFluxConfig.class); - + // @NacosValue(value = "${server.connection-pool.max-connections:500}", autoRefreshed = true) // @Value( "${server.connection-pool.max-connections:500}" ) // private int maxConnections; @@ -145,11 +136,31 @@ public class WebFluxConfig { @EnableWebFlux public static class FizzWebFluxConfigurer implements WebFluxConfigurer { + /** + * Configure the maximum amount of disk space allowed for file parts. Default 100M (104857600) + */ + @NacosValue(value = "${server.fileUpload.maxDiskUsagePerPart:104857600}", autoRefreshed = true) + @Value( "${server.fileUpload.maxDiskUsagePerPart:104857600}" ) + private long maxDiskUsagePerPart; + + /** + * Maximum parts of multipart form-data, including form field parts; Default -1 no limit + */ + @NacosValue(value = "${server.fileUpload.maxParts:-1}", autoRefreshed = true) + @Value( "${server.fileUpload.maxParts:-1}" ) + private int maxParts; + @Override public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) { configurer.defaultCodecs().maxInMemorySize(-1); + SynchronossPartHttpMessageReader partReader = new SynchronossPartHttpMessageReader(); + partReader.setMaxParts(maxParts); + partReader.setMaxDiskUsagePerPart(maxDiskUsagePerPart); + MultipartHttpMessageReader multipartReader = new MultipartHttpMessageReader(partReader); + configurer.defaultCodecs().multipartReader(multipartReader); } + @Override public void addResourceHandlers(ResourceHandlerRegistry registry) { registry.addResourceHandler("/*.*") diff --git a/fizz-core/src/main/java/we/constants/CommonConstants.java b/fizz-core/src/main/java/we/constants/CommonConstants.java index 6c5f127..e389199 100644 --- a/fizz-core/src/main/java/we/constants/CommonConstants.java +++ b/fizz-core/src/main/java/we/constants/CommonConstants.java @@ -77,5 +77,11 @@ public class CommonConstants { */ public static final String CONTENT_TYPE_JSON = "application/json; charset=UTF-8"; + + /** + * File key prefix to identify upload file + */ + public static final String FILE_KEY_PREFIX = "__fizz_file__"; + } diff --git a/fizz-core/src/main/java/we/filter/AggregateFilter.java b/fizz-core/src/main/java/we/filter/AggregateFilter.java index 7b70708..85ae256 100644 --- a/fizz-core/src/main/java/we/filter/AggregateFilter.java +++ b/fizz-core/src/main/java/we/filter/AggregateFilter.java @@ -17,9 +17,15 @@ package we.filter; -import com.alibaba.fastjson.JSON; -import com.alibaba.nacos.api.config.annotation.NacosValue; -import io.netty.buffer.UnpooledByteBufAllocator; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import javax.annotation.Resource; + import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,12 +36,19 @@ import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.NettyDataBufferFactory; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.http.codec.multipart.FilePart; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import org.springframework.web.server.WebFilter; import org.springframework.web.server.WebFilterChain; + +import com.alibaba.fastjson.JSON; +import com.alibaba.nacos.api.config.annotation.NacosValue; + +import io.netty.buffer.UnpooledByteBufAllocator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -52,14 +65,6 @@ import we.util.Constants; import we.util.MapUtil; import we.util.WebUtils; -import javax.annotation.Resource; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - /** * @author Francis Dong */ @@ -153,19 +158,35 @@ public class AggregateFilter implements WebFilter { clientInput.put("contentType", request.getHeaders().getFirst(CommonConstants.HEADER_CONTENT_TYPE)); Mono result = null; - if (HttpMethod.POST.name().equalsIgnoreCase(method)) { - result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(emptyBody).flatMap(buf -> { - if(buf != null && buf != emptyBody) { - try { - clientInput.put("body", buf.toString(StandardCharsets.UTF_8)); - } finally { - DataBufferUtils.release(buf); - } - } + MediaType contentType = request.getHeaders().getContentType(); + + if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) { + result = exchange.getMultipartData().flatMap(md -> { + Map filePartMap = new HashMap<>(); + clientInput.put("body", MapUtil.extractFormData(md, CommonConstants.FILE_KEY_PREFIX, filePartMap)); + clientInput.put("filePartMap", filePartMap); + return pipeline.run(input, clientInput, traceId); + }); + } else if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) { + result = exchange.getFormData().flatMap(fd -> { + clientInput.put("body", MapUtil.toHashMap(fd)); return pipeline.run(input, clientInput, traceId); }); } else { - result = pipeline.run(input, clientInput, traceId); + if (HttpMethod.POST.name().equalsIgnoreCase(method)) { + result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(emptyBody).flatMap(buf -> { + if (buf != null && buf != emptyBody) { + try { + clientInput.put("body", buf.toString(StandardCharsets.UTF_8)); + } finally { + DataBufferUtils.release(buf); + } + } + return pipeline.run(input, clientInput, traceId); + }); + } else { + result = pipeline.run(input, clientInput, traceId); + } } return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> { LogService.setBizId(traceId); diff --git a/fizz-core/src/main/java/we/fizz/Pipeline.java b/fizz-core/src/main/java/we/fizz/Pipeline.java index d1f74c5..274a2e5 100644 --- a/fizz-core/src/main/java/we/fizz/Pipeline.java +++ b/fizz-core/src/main/java/we/fizz/Pipeline.java @@ -28,6 +28,7 @@ import javax.script.ScriptException; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.data.util.Pair; import org.springframework.http.HttpHeaders; +import org.springframework.http.codec.multipart.FilePart; import we.schema.util.I18nUtils; import org.noear.snack.ONode; @@ -187,6 +188,7 @@ public class Pipeline { inputRequest.put("method", clientInput.get("method")); inputRequest.put("headers", clientInput.get("headers")); inputRequest.put("params", clientInput.get("params")); + stepContext.addFilePartMap((Map) clientInput.get("filePartMap")); if (CONTENT_TYPE_XML.equals(config.getContentType()) || (StringUtils.isEmpty(config.getContentType()) && isXmlContentType((String) clientInput.get("contentType")))) { @@ -202,6 +204,8 @@ public class Pipeline { } } inputRequest.put("body", builder.build().toJson().toMap()); + } else if (clientInput.get("body") instanceof Map) { + inputRequest.put("body", clientInput.get("body")); } else { inputRequest.put("body", JSON.parse((String) clientInput.get("body"))); } diff --git a/fizz-core/src/main/java/we/fizz/StepContext.java b/fizz-core/src/main/java/we/fizz/StepContext.java index 16f6515..4684985 100644 --- a/fizz-core/src/main/java/we/fizz/StepContext.java +++ b/fizz-core/src/main/java/we/fizz/StepContext.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.http.codec.multipart.FilePart; + import we.constants.CommonConstants; /** @@ -47,6 +49,8 @@ public class StepContext extends ConcurrentHashMap { public static final String EXCEPTION_MESSAGE = "exceptionMessage"; public static final String EXCEPTION_STACKS = "exceptionStacks"; public static final String EXCEPTION_DATA = "exceptionData"; + + private Map filePartMap = new HashMap<>(); public void setDebug(Boolean debug) { this.put((K)DEBUG, (V)debug); @@ -75,6 +79,24 @@ public class StepContext extends ConcurrentHashMap { public boolean returnContext() { return Boolean.valueOf((String)getInputReqHeader(RETURN_CONTEXT)); } + + public void addFilePart(String key, FilePart filePart) { + this.filePartMap.put(key, filePart); + } + + public void addFilePartMap(Map filePartMap) { + if(filePartMap != null && !filePartMap.isEmpty()) { + this.filePartMap.putAll(filePartMap); + } + } + + public FilePart getFilePart(String key) { + return this.filePartMap.get(key); + } + + public Map getFilePartMap() { + return this.filePartMap; + } /** * set exception information diff --git a/fizz-core/src/main/java/we/fizz/input/PathMapping.java b/fizz-core/src/main/java/we/fizz/input/PathMapping.java index 2f9cc65..6f9d46d 100644 --- a/fizz-core/src/main/java/we/fizz/input/PathMapping.java +++ b/fizz-core/src/main/java/we/fizz/input/PathMapping.java @@ -213,7 +213,7 @@ public class PathMapping { * * @param ctxNode * @param path e.g: step1.request1.headers.abc or - * step1.request1.headers.abc|123 (default value seperate by "|") + * step1.request1.headers.abc|123 (default value separate by "|") * @return */ public static Object getValueByPath(ONode ctxNode, String path) { diff --git a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java index ece53ab..4c71e26 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java @@ -17,24 +17,23 @@ package we.fizz.input.extension.request; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - import javax.script.ScriptException; import org.apache.commons.lang3.StringUtils; import org.noear.snack.ONode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.util.CollectionUtils; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; @@ -47,7 +46,14 @@ import we.constants.CommonConstants; import we.exception.ExecuteScriptException; import we.fizz.StepContext; import we.fizz.StepResponse; -import we.fizz.input.*; +import we.fizz.input.IInput; +import we.fizz.input.InputConfig; +import we.fizz.input.InputContext; +import we.fizz.input.InputType; +import we.fizz.input.PathMapping; +import we.fizz.input.RPCInput; +import we.fizz.input.RPCResponse; +import we.fizz.input.ScriptHelper; import we.flume.clients.log4j2appender.LogService; import we.proxy.FizzWebClient; import we.proxy.http.HttpInstanceService; @@ -78,6 +84,8 @@ public class RequestInput extends RPCInput implements IInput{ private static final String CONTENT_TYPE_HTML = "text/html"; private static final String CONTENT_TYPE_TEXT = "text/plain"; private static final String CONTENT_TYPE_AUTO = "auto"; + private static final String CONTENT_TYPE_MULTIPART_FORM_DATA = "multipart/form-data"; + private static final String CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded"; private static final String CONTENT_TYPE = "content-type"; @@ -90,8 +98,6 @@ public class RequestInput extends RPCInput implements IInput{ private static Pattern PATH_VAR_PATTERN = Pattern.compile("(\\{)([^/]*)(\\})"); - private static String DEFAULT_VALUE_SEPERATOR = "|"; - public InputType getType() { return type; } @@ -158,9 +164,14 @@ public class RequestInput extends RPCInput implements IInput{ } // body + boolean supportMultiLevels = true; + if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType) || + CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) { + supportMultiLevels = false; + } Map body = PathMapping.transform(ctxNode, stepContext, (Map) requestMapping.get("fixedBody"), - (Map) requestMapping.get("body")); + (Map) requestMapping.get("body"), supportMultiLevels); if (body.containsKey(CommonConstants.WILDCARD_TILDE)) { request.put("body", body.get(CommonConstants.WILDCARD_TILDE)); } else { @@ -316,7 +327,6 @@ public class RequestInput extends RPCInput implements IInput{ HttpMethod method = HttpMethod.valueOf(config.getMethod()); String url = (String) request.get("url"); - String body = JSON.toJSONString(request.get("body")); Map hds = (Map) request.get("headers"); if (hds == null) { @@ -324,10 +334,14 @@ public class RequestInput extends RPCInput implements IInput{ } HttpHeaders headers = MapUtil.toHttpHeaders(hds); + // default content-type if (!headers.containsKey(CommonConstants.HEADER_CONTENT_TYPE)) { - // default content-type if (CONTENT_TYPE_XML.equals(reqContentType) || CONTENT_TYPE_TEXT_XML.equals(reqContentType)) { headers.add(CommonConstants.HEADER_CONTENT_TYPE, CONTENT_TYPE_XML); + } else if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType)) { + headers.add(CommonConstants.HEADER_CONTENT_TYPE, CONTENT_TYPE_MULTIPART_FORM_DATA); + } else if (CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) { + headers.add(CommonConstants.HEADER_CONTENT_TYPE, CONTENT_TYPE_FORM_URLENCODED); } else { headers.add(CommonConstants.HEADER_CONTENT_TYPE, CommonConstants.CONTENT_TYPE_JSON); } @@ -343,23 +357,35 @@ public class RequestInput extends RPCInput implements IInput{ headers.remove(CommonConstants.HEADER_CONTENT_LENGTH); headers.add(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId()); + request.put("headers", MapUtil.headerToHashMap(headers)); - // convert JSON to XML if it is XML content type + Object body = null; if (CONTENT_TYPE_XML.equals(reqContentType) || CONTENT_TYPE_TEXT_XML.equals(reqContentType)) { + // convert JSON to XML if it is XML content type request.put("jsonBody", request.get("body")); - LOGGER.info("jsonBody={}", JSON.toJSONString(request.get("body"))); - JsonToXml jsonToXml = new JsonToXml.Builder(body).build(); + String jsonStr = JSON.toJSONString(request.get("body")); + LOGGER.info("jsonBody={}", jsonStr); + JsonToXml jsonToXml = new JsonToXml.Builder(jsonStr).build(); body = jsonToXml.toString(); request.put("body", body); LOGGER.info("body={}", body); LOGGER.info("headers={}", JSON.toJSONString(headers)); + } else if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType)) { + MultiValueMap mpDataMap = MapUtil + .toMultipartDataMap((Map) request.get("body")); + MapUtil.replaceWithFilePart(mpDataMap, CommonConstants.FILE_KEY_PREFIX, + inputContext.getStepContext().getFilePartMap()); + body = BodyInserters.fromMultipartData(mpDataMap); + } else if (CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) { + body = BodyInserters.fromFormData(MapUtil.toMultiValueMap((Map) request.get("body"))); + } else { + body = JSON.toJSONString(request.get("body")); } HttpMethod aggrMethod = HttpMethod.valueOf(inputContext.getStepContext().getInputReqAttr("method").toString()); String aggrPath = (String)inputContext.getStepContext().getInputReqAttr("path"); String aggrService = aggrPath.split("\\/")[2]; -// FizzWebClient client = FizzAppContext.appContext.getBean(FizzWebClient.class); FizzWebClient client = this.getCurrentApplicationContext().getBean(FizzWebClient.class); Mono clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url, headers, body, (long)timeout); @@ -370,8 +396,6 @@ public class RequestInput extends RPCInput implements IInput{ response.setStatus(cr.statusCode()); return Mono.just(response); }); - - } private Map getResponses(Map stepContext2) { @@ -467,12 +491,14 @@ public class RequestInput extends RPCInput implements IInput{ } } + @SuppressWarnings("unused") private void cleanup(ClientResponse clientResponse) { if (clientResponse != null) { clientResponse.bodyToMono(Void.class).subscribe(); } } + @SuppressWarnings("rawtypes") public static Class inputConfigClass (){ return RequestInputConfig.class; } diff --git a/fizz-core/src/main/java/we/proxy/FizzWebClient.java b/fizz-core/src/main/java/we/proxy/FizzWebClient.java index d515608..d6fbb72 100644 --- a/fizz-core/src/main/java/we/proxy/FizzWebClient.java +++ b/fizz-core/src/main/java/we/proxy/FizzWebClient.java @@ -27,6 +27,7 @@ import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.lang.Nullable; import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.BodyInserter; import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; @@ -152,7 +153,8 @@ public class FizzWebClient { return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc); } - private Mono send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri, + @SuppressWarnings({ "unchecked", "rawtypes" }) + private Mono send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri, @Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) { if (log.isDebugEnabled()) { @@ -181,15 +183,17 @@ public class FizzWebClient { ); if (body != null) { - if (body instanceof Flux) { - Flux db = (Flux) body; - req.body(BodyInserters.fromDataBuffers(db)); - } else if (body instanceof String) { - String s = (String) body; - req.body(Mono.just(s), String.class); - } else { - req.bodyValue(body); - } + if (body instanceof BodyInserter) { + req.body((BodyInserter) body); + } else if (body instanceof Flux) { + Flux db = (Flux) body; + req.body(BodyInserters.fromDataBuffers(db)); + } else if (body instanceof String) { + String s = (String) body; + req.body(Mono.just(s), String.class); + } else { + req.bodyValue(body); + } } return req.exchange()