Support redis cluster

This commit is contained in:
hongqiaowei
2022-07-05 16:00:30 +08:00
parent bcace11fd4
commit 66dd30668f
2 changed files with 28 additions and 33 deletions

View File

@@ -16,23 +16,23 @@
*/
package we.fizz.input;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.script.ScriptException;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.flume.clients.log4j2appender.LogService;
import we.util.Consts;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
*
* @author linwaiwai
@@ -80,7 +80,8 @@ public class RPCInput extends Input {
Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class);
return needRun != null ? needRun : Boolean.TRUE;
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e);
throw new ExecuteScriptException(e, stepContext, condition);
}

View File

@@ -17,16 +17,9 @@
package we.fizz.input.extension.request;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.script.ScriptException;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.ThreadContext;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,24 +31,13 @@ import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
import com.alibaba.fastjson.JSON;
import reactor.core.publisher.Mono;
import we.config.SystemConfig;
import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.StepResponse;
import we.fizz.input.IInput;
import we.fizz.input.InputConfig;
import we.fizz.input.InputContext;
import we.fizz.input.InputType;
import we.fizz.input.PathMapping;
import we.fizz.input.RPCInput;
import we.fizz.input.RPCResponse;
import we.fizz.input.ScriptHelper;
import we.flume.clients.log4j2appender.LogService;
import we.fizz.input.*;
import we.proxy.FizzWebClient;
import we.proxy.http.HttpInstanceService;
import we.service_registry.RegistryCenterService;
@@ -67,6 +49,14 @@ import we.xml.JsonToXml;
import we.xml.XmlToJson;
import we.xml.XmlToJson.Builder;
import javax.script.ScriptException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
*
* @author linwaiwai
@@ -192,7 +182,8 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) reqBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -331,7 +322,8 @@ public class RequestInput extends RPCInput implements IInput{
body.putAll((Map<String, Object>) respBody);
}
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(scriptCfg), e);
throw new ExecuteScriptException(e, stepContext, scriptCfg);
}
@@ -492,7 +484,8 @@ public class RequestInput extends RPCInput implements IInput{
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", request.get("url"), ex);
synchronized (inputContext.getStepContext()) {
inputContext.getStepContext().addElapsedTime(
@@ -554,7 +547,8 @@ public class RequestInput extends RPCInput implements IInput{
protected void doOnBodySuccess(Object resp, long elapsedMillis) {
if(inputContext.getStepContext().isDebug()) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
// LogService.setBizId(inputContext.getStepContext().getTraceId());
ThreadContext.put(Consts.TRACE_ID, inputContext.getStepContext().getTraceId());
LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"),
JSON.toJSONString(this.request.get("headers")),
JSON.toJSONString(this.request.get("body")), resp);