fix concurrent issue of stepContext #437

This commit is contained in:
Francis Dong
2022-07-05 12:51:52 +08:00
committed by hongqiaowei
parent ee5b406026
commit 0aed5701e9
2 changed files with 176 additions and 149 deletions

View File

@@ -16,22 +16,23 @@
*/ */
package we.fizz.input; package we.fizz.input;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.script.ScriptException;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode; import org.noear.snack.ONode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import we.exception.ExecuteScriptException; import we.exception.ExecuteScriptException;
import we.fizz.StepContext; import we.fizz.StepContext;
import we.util.Consts; import we.flume.clients.log4j2appender.LogService;
import we.util.JacksonUtils; import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.util.HashMap;
import java.util.Map;
/** /**
* *
* @author linwaiwai * @author linwaiwai
@@ -42,8 +43,8 @@ public class RPCInput extends Input {
protected static final Logger LOGGER = LoggerFactory.getLogger(RPCInput.class.getName()); protected static final Logger LOGGER = LoggerFactory.getLogger(RPCInput.class.getName());
protected static final String FALLBACK_MODE_STOP = "stop"; protected static final String FALLBACK_MODE_STOP = "stop";
protected static final String FALLBACK_MODE_CONTINUE = "continue"; protected static final String FALLBACK_MODE_CONTINUE = "continue";
protected Map<String, Object> request = new HashMap<>(); protected Map<String, Object> request = new ConcurrentHashMap<>();
protected Map<String, Object> response = new HashMap<>(); protected Map<String, Object> response = new ConcurrentHashMap<>();
protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) { protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) {
@@ -79,8 +80,7 @@ public class RPCInput extends Input {
Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class); Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class);
return needRun != null ? needRun : Boolean.TRUE; return needRun != null ? needRun : Boolean.TRUE;
} catch (ScriptException e) { } catch (ScriptException e) {
// LogService.setBizId(inputContext.getStepContext().getTraceId()); LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e);
throw new ExecuteScriptException(e, stepContext, condition); throw new ExecuteScriptException(e, stepContext, condition);
} }

View File

@@ -17,9 +17,16 @@
package we.fizz.input.extension.request; package we.fizz.input.extension.request;
import com.alibaba.fastjson.JSON; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.script.ScriptException;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode; import org.noear.snack.ONode;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -31,13 +38,24 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder; import org.springframework.web.util.UriComponentsBuilder;
import com.alibaba.fastjson.JSON;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import we.config.SystemConfig; import we.config.SystemConfig;
import we.constants.CommonConstants; import we.constants.CommonConstants;
import we.exception.ExecuteScriptException; import we.exception.ExecuteScriptException;
import we.fizz.StepContext; import we.fizz.StepContext;
import we.fizz.StepResponse; import we.fizz.StepResponse;
import we.fizz.input.*; import we.fizz.input.IInput;
import we.fizz.input.InputConfig;
import we.fizz.input.InputContext;
import we.fizz.input.InputType;
import we.fizz.input.PathMapping;
import we.fizz.input.RPCInput;
import we.fizz.input.RPCResponse;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.FizzWebClient; import we.proxy.FizzWebClient;
import we.proxy.http.HttpInstanceService; import we.proxy.http.HttpInstanceService;
import we.service_registry.RegistryCenterService; import we.service_registry.RegistryCenterService;
@@ -49,14 +67,6 @@ import we.xml.JsonToXml;
import we.xml.XmlToJson; import we.xml.XmlToJson;
import we.xml.XmlToJson.Builder; import we.xml.XmlToJson.Builder;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/** /**
* *
* @author linwaiwai * @author linwaiwai
@@ -112,6 +122,8 @@ public class RequestInput extends RPCInput implements IInput{
protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) { protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) {
RequestInputConfig config = (RequestInputConfig) aConfig; RequestInputConfig config = (RequestInputConfig) aConfig;
Map<String, Object> params = new HashMap<>();
synchronized (inputContext.getStepContext()) {
// 把请求信息放入stepContext // 把请求信息放入stepContext
Map<String, Object> group = new HashMap<>(); Map<String, Object> group = new HashMap<>();
group.put("request", request); group.put("request", request);
@@ -121,15 +133,16 @@ public class RequestInput extends RPCInput implements IInput{
HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase());
request.put("method", method); request.put("method", method);
Map<String, Object> params = new HashMap<>();
params.putAll(MapUtil.toHashMap(config.getQueryParams())); params.putAll(MapUtil.toHashMap(config.getQueryParams()));
request.put("params", params); request.put("params", params);
}
ONode ctxNode = null; ONode ctxNode = null;
// 数据转换 // 数据转换
if (inputContext != null && inputContext.getStepContext() != null) { if (inputContext != null && inputContext.getStepContext() != null) {
StepContext<String, Object> stepContext = inputContext.getStepContext(); StepContext<String, Object> stepContext = inputContext.getStepContext();
ctxNode = PathMapping.toONode(stepContext); ctxNode = PathMapping.toONode(stepContext);
synchronized (stepContext) {
Map<String, Object> dataMapping = this.getConfig().getDataMapping(); Map<String, Object> dataMapping = this.getConfig().getDataMapping();
if (dataMapping != null) { if (dataMapping != null) {
Map<String, Object> requestMapping = (Map<String, Object>) dataMapping.get("request"); Map<String, Object> requestMapping = (Map<String, Object>) dataMapping.get("request");
@@ -179,8 +192,7 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) reqBody); body.putAll((Map<String, Object>) reqBody);
} }
} catch (ScriptException e) { } catch (ScriptException e) {
// LogService.setBizId(inputContext.getStepContext().getTraceId()); LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg); throw new ExecuteScriptException(e, stepContext, scriptCfg);
} }
@@ -190,6 +202,7 @@ public class RequestInput extends RPCInput implements IInput{
} }
} }
} }
}
if (config.isNewVersion()) { if (config.isNewVersion()) {
String host = config.getServiceName(); String host = config.getServiceName();
@@ -214,13 +227,17 @@ public class RequestInput extends RPCInput implements IInput{
UriComponents uriComponents = UriComponentsBuilder.fromUriString(sb.toString()) UriComponents uriComponents = UriComponentsBuilder.fromUriString(sb.toString())
.queryParams(MapUtil.toMultiValueMap(params)).build(); .queryParams(MapUtil.toMultiValueMap(params)).build();
synchronized (inputContext.getStepContext()) {
request.put("url", uriComponents.toUriString()); request.put("url", uriComponents.toUriString());
}
} else { } else {
UriComponents uriComponents = UriComponentsBuilder.fromUriString(config.getBaseUrl() + setPathVariable(ctxNode, config.getPath())) UriComponents uriComponents = UriComponentsBuilder.fromUriString(config.getBaseUrl() + setPathVariable(ctxNode, config.getPath()))
.queryParams(MapUtil.toMultiValueMap(params)).build(); .queryParams(MapUtil.toMultiValueMap(params)).build();
synchronized (inputContext.getStepContext()) {
request.put("url", uriComponents.toUriString()); request.put("url", uriComponents.toUriString());
} }
} }
}
private String setPathVariable(ONode ctxNode, String path) { private String setPathVariable(ONode ctxNode, String path) {
if (ctxNode == null || StringUtils.isBlank(path)) { if (ctxNode == null || StringUtils.isBlank(path)) {
@@ -268,14 +285,16 @@ public class RequestInput extends RPCInput implements IInput{
ct = CONTENT_TYPE_JSON; ct = CONTENT_TYPE_JSON;
} }
synchronized (inputContext.getStepContext()) {
response.put("body", this.parseBody(ct, (String)responseBody)); response.put("body", this.parseBody(ct, (String)responseBody));
}
// 数据转换 // 数据转换
if (inputContext != null && inputContext.getStepContext() != null) { if (inputContext != null && inputContext.getStepContext() != null) {
StepContext<String, Object> stepContext = inputContext.getStepContext(); StepContext<String, Object> stepContext = inputContext.getStepContext();
if (!CollectionUtils.isEmpty(responseMapping)) { if (!CollectionUtils.isEmpty(responseMapping)) {
ONode ctxNode = PathMapping.toONode(stepContext); ONode ctxNode = PathMapping.toONode(stepContext);
synchronized (stepContext) {
// headers // headers
Map<String, Object> fixedHeaders = MapUtil.upperCaseKey((Map<String, Object>) responseMapping.get("fixedHeaders")); Map<String, Object> fixedHeaders = MapUtil.upperCaseKey((Map<String, Object>) responseMapping.get("fixedHeaders"));
Map<String, Object> headerMapping = MapUtil.upperCaseKey((Map<String, Object>) responseMapping.get("headers")); Map<String, Object> headerMapping = MapUtil.upperCaseKey((Map<String, Object>) responseMapping.get("headers"));
@@ -312,8 +331,7 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) respBody); body.putAll((Map<String, Object>) respBody);
} }
} catch (ScriptException e) { } catch (ScriptException e) {
// LogService.setBizId(inputContext.getStepContext().getTraceId()); LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg); throw new ExecuteScriptException(e, stepContext, scriptCfg);
} }
@@ -324,6 +342,7 @@ public class RequestInput extends RPCInput implements IInput{
} }
} }
} }
}
@Override @Override
protected Mono<RPCResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) { protected Mono<RPCResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) {
@@ -377,12 +396,16 @@ public class RequestInput extends RPCInput implements IInput{
headers.remove(CommonConstants.HEADER_CONTENT_LENGTH); headers.remove(CommonConstants.HEADER_CONTENT_LENGTH);
headers.add(systemConfig.fizzTraceIdHeader(), inputContext.getStepContext().getTraceId()); headers.add(systemConfig.fizzTraceIdHeader(), inputContext.getStepContext().getTraceId());
synchronized (inputContext.getStepContext()) {
request.put("headers", MapUtil.headerToHashMap(headers)); request.put("headers", MapUtil.headerToHashMap(headers));
}
Object body = null; Object body = null;
if (CONTENT_TYPE_XML.equals(reqContentType) || CONTENT_TYPE_TEXT_XML.equals(reqContentType)) { if (CONTENT_TYPE_XML.equals(reqContentType) || CONTENT_TYPE_TEXT_XML.equals(reqContentType)) {
// convert JSON to XML if it is XML content type // convert JSON to XML if it is XML content type
synchronized (inputContext.getStepContext()) {
request.put("jsonBody", request.get("body")); request.put("jsonBody", request.get("body"));
}
String jsonStr = null; String jsonStr = null;
if (TypeUtils.isBasicType(request.get("body"))) { if (TypeUtils.isBasicType(request.get("body"))) {
jsonStr = request.get("body").toString(); jsonStr = request.get("body").toString();
@@ -396,7 +419,9 @@ public class RequestInput extends RPCInput implements IInput{
} else { } else {
body = jsonStr; body = jsonStr;
} }
synchronized (inputContext.getStepContext()) {
request.put("body", body); request.put("body", body);
}
LOGGER.info("body={}", body); LOGGER.info("body={}", body);
LOGGER.info("headers={}", JSON.toJSONString(headers)); LOGGER.info("headers={}", JSON.toJSONString(headers));
} else if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType)) { } else if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType)) {
@@ -452,6 +477,7 @@ public class RequestInput extends RPCInput implements IInput{
headers.put("ELAPSEDTIME", elapsedMillis + "ms"); headers.put("ELAPSEDTIME", elapsedMillis + "ms");
RequestRPCResponse reqCr = (RequestRPCResponse) cr; RequestRPCResponse reqCr = (RequestRPCResponse) cr;
synchronized (inputContext.getStepContext()) {
if (reqCr.getStatusCode() != null) { if (reqCr.getStatusCode() != null) {
this.response.put("httpStatus", reqCr.getStatusCode().value()); this.response.put("httpStatus", reqCr.getStatusCode().value());
} }
@@ -460,17 +486,19 @@ public class RequestInput extends RPCInput implements IInput{
inputContext.getStepContext().addElapsedTime(prefix + request.get("url"), inputContext.getStepContext().addElapsedTime(prefix + request.get("url"),
elapsedMillis); elapsedMillis);
} }
}
protected Mono<Object> bodyToMono(ClientResponse cr){ protected Mono<Object> bodyToMono(ClientResponse cr){
return cr.bodyToMono(String.class); return cr.bodyToMono(String.class);
} }
protected void doOnBodyError(Throwable ex, long elapsedMillis) { protected void doOnBodyError(Throwable ex, long elapsedMillis) {
// LogService.setBizId(inputContext.getStepContext().getTraceId()); LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", request.get("url"), ex); LOGGER.warn("failed to call {}", request.get("url"), ex);
synchronized (inputContext.getStepContext()) {
inputContext.getStepContext().addElapsedTime( inputContext.getStepContext().addElapsedTime(
stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis); stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis);
} }
}
// Parse response body according to content-type header // Parse response body according to content-type header
public Object parseBody(String contentType, String responseBody) { public Object parseBody(String contentType, String responseBody) {
@@ -526,8 +554,7 @@ public class RequestInput extends RPCInput implements IInput{
protected void doOnBodySuccess(Object resp, long elapsedMillis) { protected void doOnBodySuccess(Object resp, long elapsedMillis) {
if(inputContext.getStepContext().isDebug()) { if(inputContext.getStepContext().isDebug()) {
// LogService.setBizId(inputContext.getStepContext().getTraceId()); LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"), LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"),
JSON.toJSONString(this.request.get("headers")), JSON.toJSONString(this.request.get("headers")),
JSON.toJSONString(this.request.get("body")), resp); JSON.toJSONString(this.request.get("body")), resp);