From 0aed5701e9e1ce2d78aa7b703cc1c89c21977118 Mon Sep 17 00:00:00 2001 From: Francis Dong Date: Tue, 5 Jul 2022 12:51:52 +0800 Subject: [PATCH] fix concurrent issue of stepContext #437 --- .../src/main/java/we/fizz/input/RPCInput.java | 20 +- .../input/extension/request/RequestInput.java | 305 ++++++++++-------- 2 files changed, 176 insertions(+), 149 deletions(-) diff --git a/fizz-core/src/main/java/we/fizz/input/RPCInput.java b/fizz-core/src/main/java/we/fizz/input/RPCInput.java index b1384dc..eeb29fd 100644 --- a/fizz-core/src/main/java/we/fizz/input/RPCInput.java +++ b/fizz-core/src/main/java/we/fizz/input/RPCInput.java @@ -16,22 +16,23 @@ */ package we.fizz.input; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.script.ScriptException; -import org.apache.logging.log4j.ThreadContext; import org.noear.snack.ONode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.CollectionUtils; + import reactor.core.publisher.Mono; import we.exception.ExecuteScriptException; import we.fizz.StepContext; -import we.util.Consts; +import we.flume.clients.log4j2appender.LogService; import we.util.JacksonUtils; -import javax.script.ScriptException; -import java.util.HashMap; -import java.util.Map; - /** * * @author linwaiwai @@ -42,8 +43,8 @@ public class RPCInput extends Input { protected static final Logger LOGGER = LoggerFactory.getLogger(RPCInput.class.getName()); protected static final String FALLBACK_MODE_STOP = "stop"; protected static final String FALLBACK_MODE_CONTINUE = "continue"; - protected Map request = new HashMap<>(); - protected Map response = new HashMap<>(); + protected Map request = new ConcurrentHashMap<>(); + protected Map response = new ConcurrentHashMap<>(); protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) { @@ -79,8 +80,7 @@ public class RPCInput extends Input { Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class); return needRun != null ? needRun : Boolean.TRUE; } catch (ScriptException e) { - // LogService.setBizId(inputContext.getStepContext().getTraceId()); - ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); + LogService.setBizId(inputContext.getStepContext().getTraceId()); LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e); throw new ExecuteScriptException(e, stepContext, condition); } diff --git a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java index 099fb7f..447b8f5 100644 --- a/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java +++ b/fizz-core/src/main/java/we/fizz/input/extension/request/RequestInput.java @@ -17,9 +17,16 @@ package we.fizz.input.extension.request; -import com.alibaba.fastjson.JSON; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.script.ScriptException; + import org.apache.commons.lang3.StringUtils; -import org.apache.logging.log4j.ThreadContext; import org.noear.snack.ONode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,13 +38,24 @@ import org.springframework.web.reactive.function.BodyInserters; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.util.UriComponents; import org.springframework.web.util.UriComponentsBuilder; + +import com.alibaba.fastjson.JSON; + import reactor.core.publisher.Mono; import we.config.SystemConfig; import we.constants.CommonConstants; import we.exception.ExecuteScriptException; import we.fizz.StepContext; import we.fizz.StepResponse; -import we.fizz.input.*; +import we.fizz.input.IInput; +import we.fizz.input.InputConfig; +import we.fizz.input.InputContext; +import we.fizz.input.InputType; +import we.fizz.input.PathMapping; +import we.fizz.input.RPCInput; +import we.fizz.input.RPCResponse; +import we.fizz.input.ScriptHelper; +import we.flume.clients.log4j2appender.LogService; import we.proxy.FizzWebClient; import we.proxy.http.HttpInstanceService; import we.service_registry.RegistryCenterService; @@ -49,14 +67,6 @@ import we.xml.JsonToXml; import we.xml.XmlToJson; import we.xml.XmlToJson.Builder; -import javax.script.ScriptException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - /** * * @author linwaiwai @@ -111,81 +121,84 @@ public class RequestInput extends RPCInput implements IInput{ protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) { RequestInputConfig config = (RequestInputConfig) aConfig; - - // 把请求信息放入stepContext - Map group = new HashMap<>(); - group.put("request", request); - group.put("response", response); - this.stepResponse.addRequest(name, group); - - HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); - request.put("method", method); - + Map params = new HashMap<>(); - params.putAll(MapUtil.toHashMap(config.getQueryParams())); - request.put("params", params); + synchronized (inputContext.getStepContext()) { + // 把请求信息放入stepContext + Map group = new HashMap<>(); + group.put("request", request); + group.put("response", response); + this.stepResponse.addRequest(name, group); + + HttpMethod method = HttpMethod.valueOf(config.getMethod().toUpperCase()); + request.put("method", method); + + params.putAll(MapUtil.toHashMap(config.getQueryParams())); + request.put("params", params); + } ONode ctxNode = null; // 数据转换 if (inputContext != null && inputContext.getStepContext() != null) { StepContext stepContext = inputContext.getStepContext(); ctxNode = PathMapping.toONode(stepContext); - Map dataMapping = this.getConfig().getDataMapping(); - if (dataMapping != null) { - Map requestMapping = (Map) dataMapping.get("request"); - if (!CollectionUtils.isEmpty(requestMapping)) { - reqContentType = (String) requestMapping.get("contentType"); - - // headers - Map headers = PathMapping.transform(ctxNode, stepContext, - MapUtil.upperCaseKey(MapUtil.list2Map(requestMapping.get("fixedHeaders"))), - MapUtil.upperCaseKey(MapUtil.list2Map(requestMapping.get("headers"))), false); - if (headers.containsKey(CommonConstants.WILDCARD_TILDE) - && headers.get(CommonConstants.WILDCARD_TILDE) instanceof Map) { - request.put("headers", headers.get(CommonConstants.WILDCARD_TILDE)); - } else { - request.put("headers", headers); - } - - // params - params.putAll(PathMapping.transform(ctxNode, stepContext, - MapUtil.list2Map(requestMapping.get("fixedParams")), - MapUtil.list2Map(requestMapping.get("params")), false)); - if (params.containsKey(CommonConstants.WILDCARD_TILDE) - && params.get(CommonConstants.WILDCARD_TILDE) instanceof Map) { - request.put("params", params.get(CommonConstants.WILDCARD_TILDE)); - } else { - request.put("params", params); - } - - // body - boolean supportMultiLevels = true; - if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType) || - CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) { - supportMultiLevels = false; - } - Map body = PathMapping.transform(ctxNode, stepContext, - MapUtil.list2Map(requestMapping.get("fixedBody")), - MapUtil.list2Map(requestMapping.get("body")), supportMultiLevels); - if (body.containsKey(CommonConstants.WILDCARD_TILDE)) { - request.put("body", body.get(CommonConstants.WILDCARD_TILDE)); - } else { - // script - if (requestMapping.get("script") != null) { - Map scriptCfg = (Map) requestMapping.get("script"); - try { - Object reqBody = ScriptHelper.execute(scriptCfg, ctxNode, stepContext); - if (reqBody != null) { - body.putAll((Map) reqBody); - } - } catch (ScriptException e) { - // LogService.setBizId(inputContext.getStepContext().getTraceId()); - ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); - LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); - throw new ExecuteScriptException(e, stepContext, scriptCfg); - } + synchronized (stepContext) { + Map dataMapping = this.getConfig().getDataMapping(); + if (dataMapping != null) { + Map requestMapping = (Map) dataMapping.get("request"); + if (!CollectionUtils.isEmpty(requestMapping)) { + reqContentType = (String) requestMapping.get("contentType"); + + // headers + Map headers = PathMapping.transform(ctxNode, stepContext, + MapUtil.upperCaseKey(MapUtil.list2Map(requestMapping.get("fixedHeaders"))), + MapUtil.upperCaseKey(MapUtil.list2Map(requestMapping.get("headers"))), false); + if (headers.containsKey(CommonConstants.WILDCARD_TILDE) + && headers.get(CommonConstants.WILDCARD_TILDE) instanceof Map) { + request.put("headers", headers.get(CommonConstants.WILDCARD_TILDE)); + } else { + request.put("headers", headers); + } + + // params + params.putAll(PathMapping.transform(ctxNode, stepContext, + MapUtil.list2Map(requestMapping.get("fixedParams")), + MapUtil.list2Map(requestMapping.get("params")), false)); + if (params.containsKey(CommonConstants.WILDCARD_TILDE) + && params.get(CommonConstants.WILDCARD_TILDE) instanceof Map) { + request.put("params", params.get(CommonConstants.WILDCARD_TILDE)); + } else { + request.put("params", params); + } + + // body + boolean supportMultiLevels = true; + if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType) || + CONTENT_TYPE_FORM_URLENCODED.equals(reqContentType)) { + supportMultiLevels = false; + } + Map body = PathMapping.transform(ctxNode, stepContext, + MapUtil.list2Map(requestMapping.get("fixedBody")), + MapUtil.list2Map(requestMapping.get("body")), supportMultiLevels); + if (body.containsKey(CommonConstants.WILDCARD_TILDE)) { + request.put("body", body.get(CommonConstants.WILDCARD_TILDE)); + } else { + // script + if (requestMapping.get("script") != null) { + Map scriptCfg = (Map) requestMapping.get("script"); + try { + Object reqBody = ScriptHelper.execute(scriptCfg, ctxNode, stepContext); + if (reqBody != null) { + body.putAll((Map) reqBody); + } + } catch (ScriptException e) { + LogService.setBizId(inputContext.getStepContext().getTraceId()); + LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); + throw new ExecuteScriptException(e, stepContext, scriptCfg); + } + } + request.put("body", body); } - request.put("body", body); } } } @@ -214,11 +227,15 @@ public class RequestInput extends RPCInput implements IInput{ UriComponents uriComponents = UriComponentsBuilder.fromUriString(sb.toString()) .queryParams(MapUtil.toMultiValueMap(params)).build(); - request.put("url", uriComponents.toUriString()); + synchronized (inputContext.getStepContext()) { + request.put("url", uriComponents.toUriString()); + } } else { UriComponents uriComponents = UriComponentsBuilder.fromUriString(config.getBaseUrl() + setPathVariable(ctxNode, config.getPath())) .queryParams(MapUtil.toMultiValueMap(params)).build(); - request.put("url", uriComponents.toUriString()); + synchronized (inputContext.getStepContext()) { + request.put("url", uriComponents.toUriString()); + } } } @@ -268,57 +285,59 @@ public class RequestInput extends RPCInput implements IInput{ ct = CONTENT_TYPE_JSON; } - response.put("body", this.parseBody(ct, (String)responseBody)); + synchronized (inputContext.getStepContext()) { + response.put("body", this.parseBody(ct, (String)responseBody)); + } // 数据转换 if (inputContext != null && inputContext.getStepContext() != null) { StepContext stepContext = inputContext.getStepContext(); if (!CollectionUtils.isEmpty(responseMapping)) { ONode ctxNode = PathMapping.toONode(stepContext); - - // headers - Map fixedHeaders = MapUtil.upperCaseKey((Map) responseMapping.get("fixedHeaders")); - Map headerMapping = MapUtil.upperCaseKey((Map) responseMapping.get("headers")); - if ((fixedHeaders != null && !fixedHeaders.isEmpty()) - || (headerMapping != null && !headerMapping.isEmpty())) { - Map headers = new HashMap<>(); - headers.putAll(PathMapping.transform(ctxNode, stepContext, fixedHeaders, headerMapping, false)); - if (headers.containsKey(CommonConstants.WILDCARD_TILDE) - && headers.get(CommonConstants.WILDCARD_TILDE) instanceof Map) { - response.put("headers", headers.get(CommonConstants.WILDCARD_TILDE)); - } else { - response.put("headers", headers); - } - } - - // body - Map fixedBody = (Map) responseMapping.get("fixedBody"); - Map bodyMapping = (Map) responseMapping.get("body"); - Map scriptCfg = (Map) responseMapping.get("script"); - if ((fixedBody != null && !fixedBody.isEmpty()) || (bodyMapping != null && !bodyMapping.isEmpty()) - || (scriptCfg != null && scriptCfg.get("type") != null - && scriptCfg.get("source") != null)) { - // body - Map body = new HashMap<>(); - body.putAll(PathMapping.transform(ctxNode, stepContext, fixedBody, bodyMapping)); - if (body.containsKey(CommonConstants.WILDCARD_TILDE)) { - response.put("body", body.get(CommonConstants.WILDCARD_TILDE)); - } else { - // script - if (scriptCfg != null && scriptCfg.get("type") != null && scriptCfg.get("source") != null) { - try { - Object respBody = ScriptHelper.execute(scriptCfg, ctxNode, stepContext); - if (respBody != null) { - body.putAll((Map) respBody); - } - } catch (ScriptException e) { - // LogService.setBizId(inputContext.getStepContext().getTraceId()); - ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); - LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); - throw new ExecuteScriptException(e, stepContext, scriptCfg); - } + synchronized (stepContext) { + // headers + Map fixedHeaders = MapUtil.upperCaseKey((Map) responseMapping.get("fixedHeaders")); + Map headerMapping = MapUtil.upperCaseKey((Map) responseMapping.get("headers")); + if ((fixedHeaders != null && !fixedHeaders.isEmpty()) + || (headerMapping != null && !headerMapping.isEmpty())) { + Map headers = new HashMap<>(); + headers.putAll(PathMapping.transform(ctxNode, stepContext, fixedHeaders, headerMapping, false)); + if (headers.containsKey(CommonConstants.WILDCARD_TILDE) + && headers.get(CommonConstants.WILDCARD_TILDE) instanceof Map) { + response.put("headers", headers.get(CommonConstants.WILDCARD_TILDE)); + } else { + response.put("headers", headers); + } + } + + // body + Map fixedBody = (Map) responseMapping.get("fixedBody"); + Map bodyMapping = (Map) responseMapping.get("body"); + Map scriptCfg = (Map) responseMapping.get("script"); + if ((fixedBody != null && !fixedBody.isEmpty()) || (bodyMapping != null && !bodyMapping.isEmpty()) + || (scriptCfg != null && scriptCfg.get("type") != null + && scriptCfg.get("source") != null)) { + // body + Map body = new HashMap<>(); + body.putAll(PathMapping.transform(ctxNode, stepContext, fixedBody, bodyMapping)); + if (body.containsKey(CommonConstants.WILDCARD_TILDE)) { + response.put("body", body.get(CommonConstants.WILDCARD_TILDE)); + } else { + // script + if (scriptCfg != null && scriptCfg.get("type") != null && scriptCfg.get("source") != null) { + try { + Object respBody = ScriptHelper.execute(scriptCfg, ctxNode, stepContext); + if (respBody != null) { + body.putAll((Map) respBody); + } + } catch (ScriptException e) { + LogService.setBizId(inputContext.getStepContext().getTraceId()); + LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e); + throw new ExecuteScriptException(e, stepContext, scriptCfg); + } + } + response.put("body", body); } - response.put("body", body); } } } @@ -377,12 +396,16 @@ public class RequestInput extends RPCInput implements IInput{ headers.remove(CommonConstants.HEADER_CONTENT_LENGTH); headers.add(systemConfig.fizzTraceIdHeader(), inputContext.getStepContext().getTraceId()); - request.put("headers", MapUtil.headerToHashMap(headers)); + synchronized (inputContext.getStepContext()) { + request.put("headers", MapUtil.headerToHashMap(headers)); + } Object body = null; if (CONTENT_TYPE_XML.equals(reqContentType) || CONTENT_TYPE_TEXT_XML.equals(reqContentType)) { // convert JSON to XML if it is XML content type - request.put("jsonBody", request.get("body")); + synchronized (inputContext.getStepContext()) { + request.put("jsonBody", request.get("body")); + } String jsonStr = null; if (TypeUtils.isBasicType(request.get("body"))) { jsonStr = request.get("body").toString(); @@ -396,7 +419,9 @@ public class RequestInput extends RPCInput implements IInput{ } else { body = jsonStr; } - request.put("body", body); + synchronized (inputContext.getStepContext()) { + request.put("body", body); + } LOGGER.info("body={}", body); LOGGER.info("headers={}", JSON.toJSONString(headers)); } else if (CONTENT_TYPE_MULTIPART_FORM_DATA.equals(reqContentType)) { @@ -452,24 +477,27 @@ public class RequestInput extends RPCInput implements IInput{ headers.put("ELAPSEDTIME", elapsedMillis + "ms"); RequestRPCResponse reqCr = (RequestRPCResponse) cr; - if (reqCr.getStatusCode() != null) { - this.response.put("httpStatus", reqCr.getStatusCode().value()); + synchronized (inputContext.getStepContext()) { + if (reqCr.getStatusCode() != null) { + this.response.put("httpStatus", reqCr.getStatusCode().value()); + } + this.response.put("headers", headers); + this.respContentType = httpHeaders.getFirst(CONTENT_TYPE); + inputContext.getStepContext().addElapsedTime(prefix + request.get("url"), + elapsedMillis); } - this.response.put("headers", headers); - this.respContentType = httpHeaders.getFirst(CONTENT_TYPE); - inputContext.getStepContext().addElapsedTime(prefix + request.get("url"), - elapsedMillis); } protected Mono bodyToMono(ClientResponse cr){ return cr.bodyToMono(String.class); } protected void doOnBodyError(Throwable ex, long elapsedMillis) { - // LogService.setBizId(inputContext.getStepContext().getTraceId()); - ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); + LogService.setBizId(inputContext.getStepContext().getTraceId()); LOGGER.warn("failed to call {}", request.get("url"), ex); - inputContext.getStepContext().addElapsedTime( - stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis); + synchronized (inputContext.getStepContext()) { + inputContext.getStepContext().addElapsedTime( + stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis); + } } // Parse response body according to content-type header @@ -526,8 +554,7 @@ public class RequestInput extends RPCInput implements IInput{ protected void doOnBodySuccess(Object resp, long elapsedMillis) { if(inputContext.getStepContext().isDebug()) { - // LogService.setBizId(inputContext.getStepContext().getTraceId()); - ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId()); + LogService.setBizId(inputContext.getStepContext().getTraceId()); LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"), JSON.toJSONString(this.request.get("headers")), JSON.toJSONString(this.request.get("body")), resp);