Support form-data request in aggregation #207

This commit is contained in:
Francis Dong
2021-06-04 15:08:25 +08:00
committed by dxfeng10
parent 48e9e5575b
commit 5289a84cb4
11 changed files with 294 additions and 65 deletions

View File

@@ -102,7 +102,7 @@ gateway:
prefix: /proxy
aggr:
# set headers when calling the backend API
proxy_set_headers: host,X-Real-IP,X-Forwarded-Proto,X-Forwarded-For
proxy_set_headers: X-Real-IP,X-Forwarded-Proto,X-Forwarded-For
refresh-local-cache:
# initial delay 5 minutes

View File

@@ -23,11 +23,12 @@ import java.util.Map;
import java.util.Map.Entry;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.http.codec.multipart.FormFieldPart;
import org.springframework.http.codec.multipart.Part;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import com.alibaba.fastjson.JSON;
/**
*
* @author Francis Dong
@@ -96,6 +97,108 @@ public class MapUtil {
return mvmap;
}
public static MultiValueMap<String, Object> toMultipartDataMap(Map<String, Object> params) {
MultiValueMap<String, Object> mvmap = new LinkedMultiValueMap<>();
if (params == null || params.isEmpty()) {
return mvmap;
}
for (Entry<String, Object> entry : params.entrySet()) {
Object val = entry.getValue();
List<Object> list = new ArrayList<>();
if (val instanceof List) {
List<Object> vals = (List<Object>) val;
for (Object value : vals) {
if (value != null) {
list.add(value);
}
}
} else {
if (val != null) {
list.add(val.toString());
}
}
if (list.size() > 0) {
mvmap.put(entry.getKey(), list);
}
}
return mvmap;
}
/**
* Extract form data from multipart map exclude file
* @param params
* @param fileKeyPrefix
* @param filePartMap Map
* @return
*/
public static Map<String, Object> extractFormData(MultiValueMap<String, Part> params, String fileKeyPrefix, Map<String, FilePart> filePartMap) {
HashMap<String, Object> m = new HashMap<>();
if (params == null || params.isEmpty()) {
return m;
}
for (Entry<String, List<Part>> entry : params.entrySet()) {
List<Part> val = entry.getValue();
if (val != null && val.size() > 0) {
if (val.size() > 1) {
List<Object> formFieldValues = new ArrayList<>();
val.stream().forEach(part -> {
if (part instanceof FormFieldPart) {
FormFieldPart p = (FormFieldPart) part;
formFieldValues.add(p.value());
} else if (part instanceof FilePart) {
FilePart fp = (FilePart) part;
String k = fileKeyPrefix + UUIDUtil.getUUID() + "-" + fp.filename();
formFieldValues.add(k);
filePartMap.put(k, fp);
}
});
if (formFieldValues.size() > 0) {
m.put(entry.getKey(), formFieldValues);
}
} else {
if (val.get(0) instanceof FormFieldPart) {
FormFieldPart p = (FormFieldPart) val.get(0);
m.put(entry.getKey(), p.value());
} else if (val.get(0) instanceof FilePart) {
FilePart fp = (FilePart) val.get(0);
String k = fileKeyPrefix + UUIDUtil.getUUID() + "-" + fp.filename();
m.put(entry.getKey(), k);
filePartMap.put(k, fp);
}
}
}
}
return m;
}
/**
* Replace file field with FilePart object
* @param params
* @param fileKeyPrefix
* @param filePartMap
*/
public static void replaceWithFilePart(MultiValueMap<String, Object> params, String fileKeyPrefix, Map<String, FilePart> filePartMap) {
if (params == null || params.isEmpty() || filePartMap == null || filePartMap.isEmpty()) {
return;
}
for (Entry<String, List<Object>> entry : params.entrySet()) {
List<Object> list = entry.getValue();
if (list != null && list.size() > 0) {
List<Object> newlist = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
if (list.get(i).toString().startsWith(fileKeyPrefix)) {
newlist.add(filePartMap.get(list.get(i).toString()));
}else {
newlist.add(list.get(i));
}
}
params.put(entry.getKey(), newlist);
}
}
}
public static Map<String, Object> toHashMap(MultiValueMap<String, String> params) {
HashMap<String, Object> m = new HashMap<>();

View File

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2021 the original author or authors.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.util;
import java.util.UUID;
/**
*
* @author Francis Dong
*
*/
public class UUIDUtil {
public static String getUUID() {
return UUID.randomUUID().toString().replaceAll("-", "");
}
}

View File

@@ -17,30 +17,21 @@
package we.config;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import io.netty.channel.ChannelOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorResourceFactory;
import org.springframework.http.codec.ServerCodecConfigurer;
import org.springframework.http.codec.multipart.MultipartHttpMessageReader;
import org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader;
import org.springframework.web.reactive.config.EnableWebFlux;
import org.springframework.web.reactive.config.ResourceHandlerRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;
import org.springframework.web.reactive.resource.HttpResource;
import reactor.netty.http.HttpResources;
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import java.time.Duration;
import com.alibaba.nacos.api.config.annotation.NacosValue;
/**
* @author hongqiaowei
@@ -52,7 +43,7 @@ import java.time.Duration;
public class WebFluxConfig {
private static final Logger log = LoggerFactory.getLogger(WebFluxConfig.class);
// @NacosValue(value = "${server.connection-pool.max-connections:500}", autoRefreshed = true)
// @Value( "${server.connection-pool.max-connections:500}" )
// private int maxConnections;
@@ -145,11 +136,31 @@ public class WebFluxConfig {
@EnableWebFlux
public static class FizzWebFluxConfigurer implements WebFluxConfigurer {
/**
* Configure the maximum amount of disk space allowed for file parts. Default 100M (104857600)
*/
@NacosValue(value = "${server.fileUpload.maxDiskUsagePerPart:104857600}", autoRefreshed = true)
@Value( "${server.fileUpload.maxDiskUsagePerPart:104857600}" )
private long maxDiskUsagePerPart;
/**
* Maximum parts of multipart form-data, including form field parts; Default -1 no limit
*/
@NacosValue(value = "${server.fileUpload.maxParts:-1}", autoRefreshed = true)
@Value( "${server.fileUpload.maxParts:-1}" )
private int maxParts;
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.defaultCodecs().maxInMemorySize(-1);
SynchronossPartHttpMessageReader partReader = new SynchronossPartHttpMessageReader();
partReader.setMaxParts(maxParts);
partReader.setMaxDiskUsagePerPart(maxDiskUsagePerPart);
MultipartHttpMessageReader multipartReader = new MultipartHttpMessageReader(partReader);
configurer.defaultCodecs().multipartReader(multipartReader);
}
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
registry.addResourceHandler("/*.*")

View File

@@ -77,5 +77,11 @@ public class CommonConstants {
*/
public static final String CONTENT_TYPE_JSON = "application/json; charset=UTF-8";
/**
* File key prefix to identify upload file
*/
public static final String FILE_KEY_PREFIX = "__fizz_file__";
}

View File

@@ -17,9 +17,15 @@
package we.filter;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.annotation.Resource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,12 +36,19 @@ import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.codec.multipart.FilePart;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import io.netty.buffer.UnpooledByteBufAllocator;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
@@ -52,14 +65,6 @@ import we.util.Constants;
import we.util.MapUtil;
import we.util.WebUtils;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
/**
* @author Francis Dong
*/
@@ -153,19 +158,35 @@ public class AggregateFilter implements WebFilter {
clientInput.put("contentType", request.getHeaders().getFirst(CommonConstants.HEADER_CONTENT_TYPE));
Mono<AggregateResult> result = null;
if (HttpMethod.POST.name().equalsIgnoreCase(method)) {
result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(emptyBody).flatMap(buf -> {
if(buf != null && buf != emptyBody) {
try {
clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
} finally {
DataBufferUtils.release(buf);
}
}
MediaType contentType = request.getHeaders().getContentType();
if (MediaType.MULTIPART_FORM_DATA.isCompatibleWith(contentType)) {
result = exchange.getMultipartData().flatMap(md -> {
Map<String, FilePart> filePartMap = new HashMap<>();
clientInput.put("body", MapUtil.extractFormData(md, CommonConstants.FILE_KEY_PREFIX, filePartMap));
clientInput.put("filePartMap", filePartMap);
return pipeline.run(input, clientInput, traceId);
});
} else if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(contentType)) {
result = exchange.getFormData().flatMap(fd -> {
clientInput.put("body", MapUtil.toHashMap(fd));
return pipeline.run(input, clientInput, traceId);
});
} else {
result = pipeline.run(input, clientInput, traceId);
if (HttpMethod.POST.name().equalsIgnoreCase(method)) {
result = DataBufferUtils.join(request.getBody()).defaultIfEmpty(emptyBody).flatMap(buf -> {
if (buf != null && buf != emptyBody) {
try {
clientInput.put("body", buf.toString(StandardCharsets.UTF_8));
} finally {
DataBufferUtils.release(buf);
}
}
return pipeline.run(input, clientInput, traceId);
});
} else {
result = pipeline.run(input, clientInput, traceId);
}
}
return result.subscribeOn(Schedulers.elastic()).flatMap(aggResult -> {
LogService.setBizId(traceId);

View File

@@ -28,6 +28,7 @@ import javax.script.ScriptException;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.data.util.Pair;
import org.springframework.http.HttpHeaders;
import org.springframework.http.codec.multipart.FilePart;
import we.schema.util.I18nUtils;
import org.noear.snack.ONode;
@@ -187,6 +188,7 @@ public class Pipeline {
inputRequest.put("method", clientInput.get("method"));
inputRequest.put("headers", clientInput.get("headers"));
inputRequest.put("params", clientInput.get("params"));
stepContext.addFilePartMap((Map<String, FilePart>) clientInput.get("filePartMap"));
if (CONTENT_TYPE_XML.equals(config.getContentType()) || (StringUtils.isEmpty(config.getContentType())
&& isXmlContentType((String) clientInput.get("contentType")))) {
@@ -202,6 +204,8 @@ public class Pipeline {
}
}
inputRequest.put("body", builder.build().toJson().toMap());
} else if (clientInput.get("body") instanceof Map) {
inputRequest.put("body", clientInput.get("body"));
} else {
inputRequest.put("body", JSON.parse((String) clientInput.get("body")));
}

View File

@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.http.codec.multipart.FilePart;
import we.constants.CommonConstants;
/**
@@ -47,6 +49,8 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
public static final String EXCEPTION_MESSAGE = "exceptionMessage";
public static final String EXCEPTION_STACKS = "exceptionStacks";
public static final String EXCEPTION_DATA = "exceptionData";
private Map<String, FilePart> filePartMap = new HashMap<>();
public void setDebug(Boolean debug) {
this.put((K)DEBUG, (V)debug);
@@ -75,6 +79,24 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
public boolean returnContext() {
return Boolean.valueOf((String)getInputReqHeader(RETURN_CONTEXT));
}
public void addFilePart(String key, FilePart filePart) {
this.filePartMap.put(key, filePart);
}
public void addFilePartMap(Map<String, FilePart> filePartMap) {
if(filePartMap != null && !filePartMap.isEmpty()) {
this.filePartMap.putAll(filePartMap);
}
}
public FilePart getFilePart(String key) {
return this.filePartMap.get(key);
}
public Map<String, FilePart> getFilePartMap() {
return this.filePartMap;
}
/**
* set exception information

View File

@@ -213,7 +213,7 @@ public class PathMapping {
*
* @param ctxNode
* @param path e.g: step1.request1.headers.abc or
* step1.request1.headers.abc|123 (default value seperate by "|")
* step1.request1.headers.abc|123 (default value separate by "|")
* @return
*/
public static Object getValueByPath(ONode ctxNode, String path) {

View File

@@ -17,24 +17,23 @@
package we.fizz.input.extension.request;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.ScriptException;
import org.apache.commons.lang3.StringUtils;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.CollectionUtils;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
@@ -47,7 +46,14 @@ import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.StepResponse;
import we.fizz.input.*;
import we.fizz.input.IInput;
import we.fizz.input.InputConfig;
import we.fizz.input.InputContext;
import we.fizz.input.InputType;
import we.fizz.input.PathMapping;
import we.fizz.input.RPCInput;
import we.fizz.input.RPCResponse;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.FizzWebClient;
import we.proxy.http.HttpInstanceService;
@@ -78,6 +84,8 @@ public class RequestInput extends RPCInput implements IInput{
private static final String CONTENT_TYPE_HTML = "text/html";
private static final String CONTENT_TYPE_TEXT = "text/plain";
private static final String CONTENT_TYPE_AUTO = "auto";
private static final String CONTENT_TYPE_MULTIPART_FORM_DATA = "multipart/form-data";
private static final String CONTENT_TYPE_FORM_URLENCODED = "application/x-www-form-urlencoded";
private static final String CONTENT_TYPE = "content-type";
@@ -90,8 +98,6 @@ public class RequestInput extends RPCInput implements IInput{
private static Pattern PATH_VAR_PATTERN = Pattern.compile("(\\{)([^/]*)(\\})");
private static String DEFAULT_VALUE_SEPERATOR = "|";
public InputType getType() {
return type;
}
@@ -158,9 +164,14 @@ public class RequestInput extends RPCInput implements IInput{
}
// body
boolean supportMultiLevels = true;
if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType) ||
CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) {
supportMultiLevels = false;
}
Map<String,Object> body = PathMapping.transform(ctxNode, stepContext,
(Map<String, Object>) requestMapping.get("fixedBody"),
(Map<String, Object>) requestMapping.get("body"));
(Map<String, Object>) requestMapping.get("body"), supportMultiLevels);
if (body.containsKey(CommonConstants.WILDCARD_TILDE)) {
request.put("body", body.get(CommonConstants.WILDCARD_TILDE));
} else {
@@ -316,7 +327,6 @@ public class RequestInput extends RPCInput implements IInput{
HttpMethod method = HttpMethod.valueOf(config.getMethod());
String url = (String) request.get("url");
String body = JSON.toJSONString(request.get("body"));
Map<String, Object> hds = (Map<String, Object>) request.get("headers");
if (hds == null) {
@@ -324,10 +334,14 @@ public class RequestInput extends RPCInput implements IInput{
}
HttpHeaders headers = MapUtil.toHttpHeaders(hds);
// default content-type
if (!headers.containsKey(CommonConstants.HEADER_CONTENT_TYPE)) {
// default content-type
if (CONTENT_TYPE_XML.equals(reqContentType) || CONTENT_TYPE_TEXT_XML.equals(reqContentType)) {
headers.add(CommonConstants.HEADER_CONTENT_TYPE, CONTENT_TYPE_XML);
} else if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType)) {
headers.add(CommonConstants.HEADER_CONTENT_TYPE, CONTENT_TYPE_MULTIPART_FORM_DATA);
} else if (CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) {
headers.add(CommonConstants.HEADER_CONTENT_TYPE, CONTENT_TYPE_FORM_URLENCODED);
} else {
headers.add(CommonConstants.HEADER_CONTENT_TYPE, CommonConstants.CONTENT_TYPE_JSON);
}
@@ -343,23 +357,35 @@ public class RequestInput extends RPCInput implements IInput{
headers.remove(CommonConstants.HEADER_CONTENT_LENGTH);
headers.add(CommonConstants.HEADER_TRACE_ID, inputContext.getStepContext().getTraceId());
request.put("headers", MapUtil.headerToHashMap(headers));
// convert JSON to XML if it is XML content type
Object body = null;
if (CONTENT_TYPE_XML.equals(reqContentType) || CONTENT_TYPE_TEXT_XML.equals(reqContentType)) {
// convert JSON to XML if it is XML content type
request.put("jsonBody", request.get("body"));
LOGGER.info("jsonBody={}", JSON.toJSONString(request.get("body")));
JsonToXml jsonToXml = new JsonToXml.Builder(body).build();
String jsonStr = JSON.toJSONString(request.get("body"));
LOGGER.info("jsonBody={}", jsonStr);
JsonToXml jsonToXml = new JsonToXml.Builder(jsonStr).build();
body = jsonToXml.toString();
request.put("body", body);
LOGGER.info("body={}", body);
LOGGER.info("headers={}", JSON.toJSONString(headers));
} else if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType)) {
MultiValueMap<String, Object> mpDataMap = MapUtil
.toMultipartDataMap((Map<String, Object>) request.get("body"));
MapUtil.replaceWithFilePart(mpDataMap, CommonConstants.FILE_KEY_PREFIX,
inputContext.getStepContext().getFilePartMap());
body = BodyInserters.fromMultipartData(mpDataMap);
} else if (CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) {
body = BodyInserters.fromFormData(MapUtil.toMultiValueMap((Map<String, Object>) request.get("body")));
} else {
body = JSON.toJSONString(request.get("body"));
}
HttpMethod aggrMethod = HttpMethod.valueOf(inputContext.getStepContext().getInputReqAttr("method").toString());
String aggrPath = (String)inputContext.getStepContext().getInputReqAttr("path");
String aggrService = aggrPath.split("\\/")[2];
// FizzWebClient client = FizzAppContext.appContext.getBean(FizzWebClient.class);
FizzWebClient client = this.getCurrentApplicationContext().getBean(FizzWebClient.class);
Mono<ClientResponse> clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url,
headers, body, (long)timeout);
@@ -370,8 +396,6 @@ public class RequestInput extends RPCInput implements IInput{
response.setStatus(cr.statusCode());
return Mono.just(response);
});
}
private Map<String, Object> getResponses(Map<String, StepResponse> stepContext2) {
@@ -467,12 +491,14 @@ public class RequestInput extends RPCInput implements IInput{
}
}
@SuppressWarnings("unused")
private void cleanup(ClientResponse clientResponse) {
if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
@SuppressWarnings("rawtypes")
public static Class inputConfigClass (){
return RequestInputConfig.class;
}

View File

@@ -27,6 +27,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.WebClient;
@@ -152,7 +153,8 @@ public class FizzWebClient {
return send2uri(originReqIdOrBizId, method, uri, headers, body, cbc);
}
private Mono<ClientResponse> send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri,
@SuppressWarnings({ "unchecked", "rawtypes" })
private Mono<ClientResponse> send2uri(@Nullable String originReqIdOrBizId, HttpMethod method, String uri,
@Nullable HttpHeaders headers, @Nullable Object body, @Nullable CallBackendConfig cbc) {
if (log.isDebugEnabled()) {
@@ -181,15 +183,17 @@ public class FizzWebClient {
);
if (body != null) {
if (body instanceof Flux) {
Flux<DataBuffer> db = (Flux<DataBuffer>) body;
req.body(BodyInserters.fromDataBuffers(db));
} else if (body instanceof String) {
String s = (String) body;
req.body(Mono.just(s), String.class);
} else {
req.bodyValue(body);
}
if (body instanceof BodyInserter) {
req.body((BodyInserter) body);
} else if (body instanceof Flux) {
Flux<DataBuffer> db = (Flux<DataBuffer>) body;
req.body(BodyInserters.fromDataBuffers(db));
} else if (body instanceof String) {
String s = (String) body;
req.body(Mono.just(s), String.class);
} else {
req.bodyValue(body);
}
}
return req.exchange()