diff --git a/src/main/java/we/fizz/ConfigLoader.java b/src/main/java/we/fizz/ConfigLoader.java index db67f7c..682e3ef 100644 --- a/src/main/java/we/fizz/ConfigLoader.java +++ b/src/main/java/we/fizz/ConfigLoader.java @@ -22,10 +22,10 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.nacos.api.config.annotation.NacosValue; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ConfigurableApplicationContext; import we.config.AppConfigProperties; -import we.fizz.input.ClientInputConfig; -import we.fizz.input.Input; -import we.fizz.input.InputType; +import we.fizz.input.*; import org.apache.commons.io.FileUtils; import org.noear.snack.ONode; @@ -36,6 +36,8 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; +import we.fizz.input.extension.mysql.MySQLInput; +import we.fizz.input.extension.request.RequestInput; import javax.annotation.PostConstruct; import javax.annotation.Resource; @@ -46,6 +48,7 @@ import static we.util.Constants.Symbol.FORWARD_SLASH; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.lang.ref.SoftReference; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; @@ -61,7 +64,8 @@ import java.util.concurrent.ConcurrentHashMap; */ @Component public class ConfigLoader { - + @Autowired + public static ConfigurableApplicationContext appContext; private static final Logger LOGGER = LoggerFactory.getLogger(ConfigLoader.class); /** @@ -115,14 +119,17 @@ public class ConfigLoader { public Pipeline createPipeline(String configStr) throws IOException { ONode cfgNode = ONode.loadStr(configStr); + InputFactory.registerInput(RequestInput.TYPE, RequestInput.class); + InputFactory.registerInput(MySQLInput.TYPE, MySQLInput.class); Pipeline pipeline = new Pipeline(); + pipeline.setApplicationContext(appContext); List> stepConfigs = cfgNode.select("$.stepConfigs").toObject(List.class); for (Map stepConfig : stepConfigs) { // set the specified env URL this.handleRequestURL(stepConfig); - - Step step = new Step.Builder().read(stepConfig); + SoftReference weakPipeline = new SoftReference(pipeline); + Step step = new Step.Builder().read(stepConfig, weakPipeline); step.setName((String) stepConfig.get("name")); if (stepConfig.get("stop") != null) { step.setStop((Boolean) stepConfig.get("stop")); diff --git a/src/main/java/we/fizz/Pipeline.java b/src/main/java/we/fizz/Pipeline.java index 6f4bdbd..9571ba6 100644 --- a/src/main/java/we/fizz/Pipeline.java +++ b/src/main/java/we/fizz/Pipeline.java @@ -25,6 +25,7 @@ import java.util.Map; import javax.script.ScriptException; +import org.springframework.context.ConfigurableApplicationContext; import we.schema.util.I18nUtils; import org.noear.snack.ONode; import org.slf4j.Logger; @@ -55,6 +56,7 @@ import we.util.MapUtil; * */ public class Pipeline { + private ConfigurableApplicationContext applicationContext; private static final Logger LOGGER = LoggerFactory.getLogger(Pipeline.class); private LinkedList steps = new LinkedList(); private StepContext stepContext = new StepContext<>(); @@ -74,6 +76,7 @@ public class Pipeline { ClientInputConfig config = (ClientInputConfig)input.getConfig(); this.initialStepContext(clientInput); this.stepContext.setDebug(config.isDebug()); + this.stepContext.setApplicationContext(applicationContext); if(traceId != null) { this.stepContext.setTraceId(traceId); @@ -362,4 +365,12 @@ public class Pipeline { } } } + + public void setApplicationContext(ConfigurableApplicationContext appContext) { + this.applicationContext = applicationContext; + } + + public ConfigurableApplicationContext getApplicationContext() { + return this.applicationContext; + } } diff --git a/src/main/java/we/fizz/Step.java b/src/main/java/we/fizz/Step.java index 93d0bd1..f690210 100644 --- a/src/main/java/we/fizz/Step.java +++ b/src/main/java/we/fizz/Step.java @@ -17,11 +17,13 @@ package we.fizz; +import java.lang.ref.SoftReference; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.springframework.context.ConfigurableApplicationContext; import org.springframework.http.HttpMethod; import org.springframework.http.MediaType; import org.springframework.http.ReactiveHttpOutputMessage; @@ -45,8 +47,8 @@ import we.fizz.input.InputType; * */ public class Step { - - private String name; + private SoftReference weakPipeline; + private String name; // 是否在执行完当前step就返回 private boolean stop; @@ -55,9 +57,22 @@ public class Step { private Map requestConfigs = new HashMap(); + public SoftReference getWeakPipeline() { + return weakPipeline; + } + + public void setWeakPipeline(SoftReference weakPipeline) { + this.weakPipeline = weakPipeline; + } + + public ConfigurableApplicationContext getCurrentApplicationContext() { + return this.getWeakPipeline() != null ? this.getWeakPipeline().get().getApplicationContext(): null; + } + public static class Builder { - public Step read(Map config) { + public Step read(Map config, SoftReference weakPipeline) { Step step = new Step(); + step.setWeakPipeline(weakPipeline); List requests= (List) config.get("requests"); for(Map requestConfig: requests) { InputConfig inputConfig = InputFactory.createInputConfig(requestConfig); @@ -68,6 +83,11 @@ public class Step { } private StepContext stepContext; + + public StepContext getStepContext(){ + return this.stepContext; + } + private StepResponse lastStepResponse = null; private Map inputs = new HashMap(); public void beforeRun(StepContext stepContext2, StepResponse response ) { @@ -80,6 +100,7 @@ public class Step { InputConfig inputConfig = configs.get(configName); InputType type = inputConfig.getType(); Input input = InputFactory.createInput(type.toString()); + input.setWeakStep(new SoftReference(this)); input.setConfig(inputConfig); input.setName(configName); input.setStepResponse(stepResponse); diff --git a/src/main/java/we/fizz/StepContext.java b/src/main/java/we/fizz/StepContext.java index f0183eb..cbae719 100644 --- a/src/main/java/we/fizz/StepContext.java +++ b/src/main/java/we/fizz/StepContext.java @@ -23,10 +23,11 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.springframework.context.ConfigurableApplicationContext; import we.constants.CommonConstants; /** - * + * * @author linwaiwai * @author francis * @@ -35,26 +36,27 @@ import we.constants.CommonConstants; */ @SuppressWarnings("unchecked") public class StepContext extends ConcurrentHashMap { + private ConfigurableApplicationContext applicationContext; public static final String ELAPSED_TIMES = "elapsedTimes"; public static final String DEBUG = "debug"; public static final String RETURN_CONTEXT = "returnContext"; // context field in response body public static final String CONTEXT_FIELD = "_context"; - + // exception info public static final String EXCEPTION_MESSAGE = "exceptionMessage"; public static final String EXCEPTION_STACKS = "exceptionStacks"; public static final String EXCEPTION_DATA = "exceptionData"; - + public void setDebug(Boolean debug) { this.put((K)DEBUG, (V)debug); } - + public String getTraceId() { return (String) this.get(CommonConstants.TRACE_ID); } - + public void setTraceId(String traceId) { this.put((K)CommonConstants.TRACE_ID, (V)traceId); } @@ -74,7 +76,7 @@ public class StepContext extends ConcurrentHashMap { public boolean returnContext() { return Boolean.valueOf((String)getInputReqHeader(RETURN_CONTEXT)); } - + /** * set exception information * @param cause exception @@ -97,7 +99,7 @@ public class StepContext extends ConcurrentHashMap { this.put((K) EXCEPTION_STACKS, (V) arr); } } - + public synchronized void addElapsedTime(String actionName, Long milliSeconds) { List> elapsedTimes = (List>) this.get(ELAPSED_TIMES); if (elapsedTimes == null) { @@ -129,7 +131,7 @@ public class StepContext extends ConcurrentHashMap { /** * 设置Step里调用接口的请求头 - * + * * @param stepName * @param requestName * @param headerName @@ -155,7 +157,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step里调用接口的请求头 - * + * * @param stepName * @param requestName * @param headerName @@ -178,7 +180,7 @@ public class StepContext extends ConcurrentHashMap { /** * 设置Step里调用接口的请求body - * + * * @param stepName * @param requestName * @param fieldName @@ -204,7 +206,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step里调用接口的请求body - * + * * @param stepName * @param requestName * @param fieldName 字段名 @@ -228,7 +230,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step里调用接口的请求body - * + * * @param stepName * @param requestName */ @@ -244,7 +246,7 @@ public class StepContext extends ConcurrentHashMap { } return req.get("body"); } - + /** * 获取Step里调用的接口的URL参数 * @param stepName 步骤名【必填】 @@ -263,7 +265,7 @@ public class StepContext extends ConcurrentHashMap { } return req.get("params"); } - + /** * 获取Step里调用的接口的URL参数 * @param stepName 步骤名【必填】 @@ -277,7 +279,7 @@ public class StepContext extends ConcurrentHashMap { /** * 设置Step里调用接口响应头 - * + * * @param stepName * @param requestName * @param headerName @@ -303,7 +305,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step里调用接口响应头 - * + * * @param stepName * @param requestName * @param headerName @@ -326,7 +328,7 @@ public class StepContext extends ConcurrentHashMap { /** * 设置Step里调用接口的响应body - * + * * @param stepName * @param requestName * @param key @@ -352,7 +354,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step里调用接口的响应body - * + * * @param stepName * @param requestName * @param key @@ -375,7 +377,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step里调用接口的响应body - * + * * @param stepName * @param requestName */ @@ -393,7 +395,7 @@ public class StepContext extends ConcurrentHashMap { /** * 设置Step的结果 - * + * * @param stepName * @param key * @param value @@ -413,7 +415,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step的结果 - * + * * @param stepName * @param key */ @@ -431,7 +433,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取Step的结果 - * + * * @param stepName */ public Object getStepResult(String stepName) { @@ -444,7 +446,7 @@ public class StepContext extends ConcurrentHashMap { /** * 设置聚合接口的响应头 - * + * * @param headerName * @param headerValue */ @@ -467,7 +469,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取聚合接口的响应头 - * + * * @param headerName */ public Object getInputRespHeader(String headerName) { @@ -488,7 +490,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取聚合接口的请求头 - * + * * @param headerName */ public Object getInputReqHeader(String headerName) { @@ -509,7 +511,7 @@ public class StepContext extends ConcurrentHashMap { /** * 设置聚合接口的响应body - * + * * @param fieldName * @param value */ @@ -533,7 +535,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取聚合接口的响应body - * + * * @param fieldName */ public Object getInputRespBody(String fieldName) { @@ -554,7 +556,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取聚合接口的响应body - * + * */ public Object getInputRespBody() { Map input = (Map) this.get("input"); @@ -570,7 +572,7 @@ public class StepContext extends ConcurrentHashMap { /** * 获取聚合接口的请求body - * + * * @param fieldName */ public Object getInputReqBody(String fieldName) { @@ -583,19 +585,19 @@ public class StepContext extends ConcurrentHashMap { /** * 获取聚合接口的请求body - * + * */ public Object getInputReqBody() { return getInputReqAttr("body"); } - + /** * 获取客户端URL请求参数(query string) */ public Object getInputReqParam() { return this.getInputReqAttr("params"); } - + /** * 获取客户端URL请求参数(query string) * @param paramName URL参数名 @@ -604,11 +606,11 @@ public class StepContext extends ConcurrentHashMap { Map params = (Map) this.getInputReqAttr("params"); return params == null ? null : paramName == null ? params : params.get(paramName); } - + /** * 获取聚合接口请求属性
* 可选属性:path,method,headers,params,body - * + * */ public Object getInputReqAttr(String key) { Map input = (Map) this.get("input"); @@ -622,4 +624,11 @@ public class StepContext extends ConcurrentHashMap { return request.get(key); } + public ConfigurableApplicationContext getApplicationContext(){ + return this.applicationContext; + } + + public void setApplicationContext(ConfigurableApplicationContext applicationContext) { + this.applicationContext = applicationContext; + } } diff --git a/src/main/java/we/fizz/exception/FizzException.java b/src/main/java/we/fizz/exception/FizzException.java new file mode 100644 index 0000000..92d8710 --- /dev/null +++ b/src/main/java/we/fizz/exception/FizzException.java @@ -0,0 +1,12 @@ +package we.fizz.exception; + +public class FizzException extends Exception { + public FizzException(Throwable exception) { + super(exception); + } + public FizzException(String message) { + super(message); + } + + +} diff --git a/src/main/java/we/fizz/exception/FizzRuntimeException.java b/src/main/java/we/fizz/exception/FizzRuntimeException.java new file mode 100644 index 0000000..9205f0b --- /dev/null +++ b/src/main/java/we/fizz/exception/FizzRuntimeException.java @@ -0,0 +1,7 @@ +package we.fizz.exception; + +public class FizzRuntimeException extends RuntimeException { + public FizzRuntimeException (String message){ + super(message); + } +} diff --git a/src/main/java/we/fizz/input/ClientInputConfig.java b/src/main/java/we/fizz/input/ClientInputConfig.java index 68f5b87..c2746ea 100644 --- a/src/main/java/we/fizz/input/ClientInputConfig.java +++ b/src/main/java/we/fizz/input/ClientInputConfig.java @@ -41,6 +41,7 @@ public class ClientInputConfig extends InputConfig { @SuppressWarnings("unchecked") public ClientInputConfig(Map configBody) { + super(configBody); if(configBody.get("debug") != null) { this.debug = (boolean) configBody.get("debug"); } @@ -73,10 +74,11 @@ public class ClientInputConfig extends InputConfig { validateResponse = ((Map) configBody.get("validateResponse")); } } - + public ClientInputConfig() { - - } + super(null); + } + public boolean isDebug() { return debug; diff --git a/src/main/java/we/fizz/input/IInput.java b/src/main/java/we/fizz/input/IInput.java new file mode 100644 index 0000000..5709c0f --- /dev/null +++ b/src/main/java/we/fizz/input/IInput.java @@ -0,0 +1,29 @@ +package we.fizz.input; + +import org.springframework.context.ConfigurableApplicationContext; +import reactor.core.publisher.Mono; +import we.fizz.Step; +import we.fizz.StepContext; +import we.fizz.StepResponse; + +import java.lang.ref.SoftReference; +import java.util.Map; + +public interface IInput { + public static final InputType TYPE = null; + public static Class inputConfigClass() { + return null; + } + public String getName() ; + public boolean needRun(StepContext stepContext); + public void beforeRun(InputContext context); + public Mono run(); + + public StepResponse getStepResponse() ; + public void setStepResponse(StepResponse stepResponse); + public SoftReference getWeakStep(); + public void setWeakStep(SoftReference weakStep); + + public ConfigurableApplicationContext getCurrentApplicationContext(); + +} diff --git a/src/main/java/we/fizz/input/Input.java b/src/main/java/we/fizz/input/Input.java index 0904b68..bd8d38c 100644 --- a/src/main/java/we/fizz/input/Input.java +++ b/src/main/java/we/fizz/input/Input.java @@ -16,10 +16,12 @@ */ package we.fizz.input; -import java.util.HashMap; +import java.lang.ref.SoftReference; import java.util.Map; +import org.springframework.context.ConfigurableApplicationContext; import reactor.core.publisher.Mono; +import we.fizz.Step; import we.fizz.StepContext; import we.fizz.StepResponse; @@ -34,7 +36,7 @@ public class Input { protected InputContext inputContext; protected StepResponse lastStepResponse = null; protected StepResponse stepResponse; - + private SoftReference weakStep; public void setConfig(InputConfig inputConfig) { config = inputConfig; } @@ -67,7 +69,7 @@ public class Input { } public void setName(String configName) { this.name = configName; - + } public StepResponse getStepResponse() { @@ -76,7 +78,20 @@ public class Input { public void setStepResponse(StepResponse stepResponse) { this.stepResponse = stepResponse; } - - - + + public SoftReference getWeakStep() { + return weakStep; + } + + public void setWeakStep(SoftReference weakStep) { + this.weakStep = weakStep; + } + + public ConfigurableApplicationContext getCurrentApplicationContext(){ + return this.getWeakStep() != null ? this.getWeakStep().get().getCurrentApplicationContext() : null; + } + + public static Class inputConfigClass (){ + return InputConfig.class; + } } diff --git a/src/main/java/we/fizz/input/InputConfig.java b/src/main/java/we/fizz/input/InputConfig.java index 897efee..5df57fb 100644 --- a/src/main/java/we/fizz/input/InputConfig.java +++ b/src/main/java/we/fizz/input/InputConfig.java @@ -17,6 +17,7 @@ package we.fizz.input; +import java.util.HashMap; import java.util.Map; /** @@ -28,6 +29,10 @@ public class InputConfig { private InputType type; protected Map dataMapping; + protected Map configMap; + public InputConfig(Map aConfigMap) { + configMap = aConfigMap; + } public InputType getType() { return type; @@ -45,4 +50,17 @@ public class InputConfig { this.dataMapping = dataMapping; } + private Map fallback = new HashMap(); + + public Map getFallback() { + return fallback; + } + + public void setFallback(Map fallback) { + this.fallback = fallback; + } + + public void parse(){ + + } } diff --git a/src/main/java/we/fizz/input/InputFactory.java b/src/main/java/we/fizz/input/InputFactory.java index 733a6ca..5a335b7 100644 --- a/src/main/java/we/fizz/input/InputFactory.java +++ b/src/main/java/we/fizz/input/InputFactory.java @@ -17,6 +17,11 @@ package we.fizz.input; +import we.fizz.exception.FizzRuntimeException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.HashMap; import java.util.Map; /** @@ -25,38 +30,54 @@ import java.util.Map; * */ public class InputFactory { - public static InputConfig createInputConfig(Map config) { + public static Map inputClasses = new HashMap(); + public static void registerInput(InputType type, Class inputClass){ + inputClasses.put(type, inputClass); + } + public static void unregisterInput(InputType type){ + inputClasses.remove(type); + } + public static InputConfig createInputConfig(Map config) { String type = (String) config.get("type"); InputType typeEnum = InputType.valueOf(type.toUpperCase()); InputConfig inputConfig = null; - switch(typeEnum) { - case REQUEST: - inputConfig = new RequestInputConfig(config); - - break; - case MYSQL: - inputConfig = new MySQLInputConfig(config); - break; + if (inputClasses.containsKey(typeEnum)){ + Class InputClass = inputClasses.get(typeEnum); + + try { + Method inputConfigClassMethod = InputClass.getMethod("inputConfigClass"); + Class InputConfigClass = (Class) inputConfigClassMethod.invoke(null); + Constructor constructor = null; + constructor = InputConfigClass.getDeclaredConstructor(Map.class); + constructor.setAccessible(true); + inputConfig = (InputConfig) constructor.newInstance(config); + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new FizzRuntimeException(e.getMessage()); + } + inputConfig.setType(typeEnum); + inputConfig.setDataMapping((Map) config.get("dataMapping")); + inputConfig.parse(); + return inputConfig; } - inputConfig.setType(typeEnum); - inputConfig.setDataMapping((Map) config.get("dataMapping")); - - return inputConfig; + return null; } public static Input createInput(String type) { InputType typeEnum = InputType.valueOf(type.toUpperCase()); Input input = null; - switch(typeEnum) { - case REQUEST: - input = new RequestInput(); - break; - case MYSQL: - input = new MySQLInput(); - break; + if (inputClasses.containsKey(typeEnum)) { + Class InputClass = inputClasses.get(typeEnum); + Constructor constructor = null; + try { + constructor = InputClass.getDeclaredConstructor(); + constructor.setAccessible(true); + input = (Input) constructor.newInstance(); + return input; + } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new FizzRuntimeException(e.getMessage()); + } } - - return input; + return null; } } diff --git a/src/main/java/we/fizz/input/InputType.java b/src/main/java/we/fizz/input/InputType.java index 752e8fe..b15b7e5 100644 --- a/src/main/java/we/fizz/input/InputType.java +++ b/src/main/java/we/fizz/input/InputType.java @@ -17,16 +17,28 @@ package we.fizz.input; +import java.util.HashMap; +import java.util.Map; + /** * * @author linwaiwai * */ -public enum InputType { - REQUEST("REQUEST"), - MYSQL("MYSQL"); - private final String type; - private InputType(String aType) { +public class InputType { + + private final String type; + static private Map inputs = new HashMap(); + public InputType(String aType) { this.type = aType; + inputs.put(aType, this); } + + public static InputType valueOf(String string) { + return inputs.get(string); + } + public String toString(){ + return type; + } + } \ No newline at end of file diff --git a/src/main/java/we/fizz/input/RPCInput.java b/src/main/java/we/fizz/input/RPCInput.java new file mode 100644 index 0000000..63a8256 --- /dev/null +++ b/src/main/java/we/fizz/input/RPCInput.java @@ -0,0 +1,129 @@ +package we.fizz.input; +import org.noear.snack.ONode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.CollectionUtils; +import reactor.core.publisher.Mono; +import we.exception.ExecuteScriptException; +import we.fizz.StepContext; +import we.fizz.input.extension.request.RequestInputConfig; +import we.flume.clients.log4j2appender.LogService; +import we.util.JacksonUtils; + +import javax.script.ScriptException; +import java.util.HashMap; +import java.util.Map; + +public class RPCInput extends Input { + protected static final Logger LOGGER = LoggerFactory.getLogger(RPCInput.class.getName()); + protected static final String FALLBACK_MODE_STOP = "stop"; + protected static final String FALLBACK_MODE_CONTINUE = "continue"; + + protected Map request = new HashMap<>(); + protected Map response = new HashMap<>(); + + protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) { + + } + + protected void doOnResponseSuccess(RPCResponse cr, long elapsedMillis) { + + } + protected Mono bodyToMono(RPCResponse cr){ + return cr.getBodyMono(); + } + + protected void doOnBodyError(Throwable ex, long elapsedMillis) { + } + + protected void doOnBodySuccess(String resp, long elapsedMillis) { + } + + protected void doResponseMapping(InputConfig aConfig, InputContext inputContext, String responseBody) { + } + + @Override + @SuppressWarnings("unchecked") + public boolean needRun(StepContext stepContext) { + Map condition = ((RequestInputConfig) config).getCondition(); + if (CollectionUtils.isEmpty(condition)) { + // 没有配置condition,直接运行 + return Boolean.TRUE; + } + + ONode ctxNode = PathMapping.toONode(stepContext); + try { + Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class); + return needRun != null ? needRun : Boolean.TRUE; + } catch (ScriptException e) { + LogService.setBizId(inputContext.getStepContext().getTraceId()); + LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e); + throw new ExecuteScriptException(e, stepContext, condition); + } + } + + private String prefix; + + @Override + public Mono run() { + long t1 = System.currentTimeMillis(); + this.doRequestMapping(config, inputContext); + inputContext.getStepContext().addElapsedTime(stepResponse.getStepName() + "-" + this.name + "-RequestMapping", + System.currentTimeMillis() - t1); + + prefix = stepResponse.getStepName() + "-" + "调用接口"; + long start = System.currentTimeMillis(); + Mono rpcResponse = this.getClientSpecFromContext(config, inputContext); + Mono body = rpcResponse.flatMap(cr->{ + return Mono.just(cr).doOnError(throwable -> cleanup(cr)); + }).doOnSuccess(cr -> { + long elapsedMillis = System.currentTimeMillis() - start; + this.doOnResponseSuccess(cr, elapsedMillis); + + }).flatMap(cr -> { return this.bodyToMono(cr); }).doOnSuccess(resp -> { + long elapsedMillis = System.currentTimeMillis() - start; + this.doOnBodySuccess(resp, elapsedMillis); + }).doOnError(ex -> { + long elapsedMillis = System.currentTimeMillis() - start; + this.doOnBodyError(ex, elapsedMillis); + }); + + // fallback handler + InputConfig reqConfig = (InputConfig) config; + if (reqConfig.getFallback() != null) { + Map fallback = reqConfig.getFallback(); + String mode = fallback.get("mode"); + if (FALLBACK_MODE_STOP.equals(mode)) { + body = body.onErrorStop(); + } else if (FALLBACK_MODE_CONTINUE.equals(mode)) { + body = body.onErrorResume(ex -> { + return Mono.just(fallback.get("defaultResult")); + }); + } else { + body = body.onErrorStop(); + } + } + + return body.flatMap(item -> { + Map result = new HashMap(); + result.put("data", item); + result.put("request", this); + + long t3 = System.currentTimeMillis(); + this.doResponseMapping(config, inputContext, item); + inputContext.getStepContext().addElapsedTime( + stepResponse.getStepName() + "-" + this.name + "-ResponseMapping", System.currentTimeMillis() - t3); + + return Mono.just(result); + }); + } + + private void cleanup(RPCResponse clientResponse) { + + } + + protected Mono getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) { + return null; + } + +} diff --git a/src/main/java/we/fizz/input/RPCResponse.java b/src/main/java/we/fizz/input/RPCResponse.java new file mode 100644 index 0000000..5db0f2b --- /dev/null +++ b/src/main/java/we/fizz/input/RPCResponse.java @@ -0,0 +1,32 @@ +package we.fizz.input; + +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.util.MultiValueMap; +import reactor.core.publisher.Mono; + +/** + * + * @author linwaiwai + * + */ +public class RPCResponse { + private MultiValueMap headers; + private Mono bodyMono; + + public MultiValueMap getHeaders() { + return headers; + } + + public void setHeaders(MultiValueMap headers) { + this.headers = headers; + } + + public Mono getBodyMono() { + return bodyMono; + } + + public void setBodyMono(Mono bodyMono) { + this.bodyMono = bodyMono; + } +} diff --git a/src/main/java/we/fizz/input/MySQLInput.java b/src/main/java/we/fizz/input/extension/mysql/MySQLInput.java similarity index 69% rename from src/main/java/we/fizz/input/MySQLInput.java rename to src/main/java/we/fizz/input/extension/mysql/MySQLInput.java index f482d0c..9776c93 100644 --- a/src/main/java/we/fizz/input/MySQLInput.java +++ b/src/main/java/we/fizz/input/extension/mysql/MySQLInput.java @@ -15,13 +15,20 @@ * along with this program. If not, see . */ -package we.fizz.input; +package we.fizz.input.extension.mysql; + +import we.fizz.input.IInput; +import we.fizz.input.Input; +import we.fizz.input.InputType; /** * * @author linwaiwai * */ -public class MySQLInput extends Input { - +public class MySQLInput extends Input implements IInput { + static public InputType TYPE = new InputType("MYSQL"); + public static Class inputConfigClass (){ + return MySQLInputConfig.class; + } } diff --git a/src/main/java/we/fizz/input/MySQLInputConfig.java b/src/main/java/we/fizz/input/extension/mysql/MySQLInputConfig.java similarity index 90% rename from src/main/java/we/fizz/input/MySQLInputConfig.java rename to src/main/java/we/fizz/input/extension/mysql/MySQLInputConfig.java index 2ba2319..129a925 100644 --- a/src/main/java/we/fizz/input/MySQLInputConfig.java +++ b/src/main/java/we/fizz/input/extension/mysql/MySQLInputConfig.java @@ -15,7 +15,9 @@ * along with this program. If not, see . */ -package we.fizz.input; +package we.fizz.input.extension.mysql; + +import we.fizz.input.InputConfig; import java.util.Map; @@ -27,7 +29,7 @@ import java.util.Map; public class MySQLInputConfig extends InputConfig { public MySQLInputConfig(Map configBody) { - // TODO Auto-generated constructor stub + super(configBody); } } diff --git a/src/main/java/we/fizz/input/RequestInput.java b/src/main/java/we/fizz/input/extension/request/RequestInput.java similarity index 64% rename from src/main/java/we/fizz/input/RequestInput.java rename to src/main/java/we/fizz/input/extension/request/RequestInput.java index 7ca89cd..c115855 100644 --- a/src/main/java/we/fizz/input/RequestInput.java +++ b/src/main/java/we/fizz/input/extension/request/RequestInput.java @@ -15,19 +15,19 @@ * along with this program. If not, see . */ -package we.fizz.input; +package we.fizz.input.extension.request; import java.util.HashMap; import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.script.ScriptException; import org.noear.snack.ONode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; -import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.util.UriComponents; @@ -36,11 +36,11 @@ import org.springframework.web.util.UriComponentsBuilder; import com.alibaba.fastjson.JSON; import reactor.core.publisher.Mono; -import we.FizzAppContext; import we.constants.CommonConstants; import we.exception.ExecuteScriptException; import we.fizz.StepContext; import we.fizz.StepResponse; +import we.fizz.input.*; import we.flume.clients.log4j2appender.LogService; import we.proxy.FizzWebClient; import we.util.JacksonUtils; @@ -53,9 +53,9 @@ import we.util.MapUtil; * */ @SuppressWarnings("unchecked") -public class RequestInput extends Input { - +public class RequestInput extends RPCInput implements IInput{ private static final Logger LOGGER = LoggerFactory.getLogger(RequestInput.class); + static public InputType TYPE = new InputType("REQUEST"); private InputType type; protected Map dataMapping; protected Map request = new HashMap<>(); @@ -63,15 +63,17 @@ public class RequestInput extends Input { private static final String FALLBACK_MODE_STOP = "stop"; private static final String FALLBACK_MODE_CONTINUE = "continue"; - - private static final String CONTENT_TYPE_JSON = "application/json"; - private static final String CONTENT_TYPE_XML = "application/xml"; - private static final String CONTENT_TYPE_JS = "application/javascript"; - private static final String CONTENT_TYPE_HTML = "text/html"; - private static final String CONTENT_TYPE_TEXT = "text/plain"; - - private static final String CONTENT_TYPE = "content-type"; - + + private static final String CONTENT_TYPE_JSON = "application/json"; + private static final String CONTENT_TYPE_XML = "application/xml"; + private static final String CONTENT_TYPE_JS = "application/javascript"; + private static final String CONTENT_TYPE_HTML = "text/html"; + private static final String CONTENT_TYPE_TEXT = "text/plain"; + + private static final String CONTENT_TYPE = "content-type"; + + + public InputType getType() { return type; } @@ -88,7 +90,7 @@ public class RequestInput extends Input { this.dataMapping = dataMapping; } - private void doRequestMapping(InputConfig aConfig, InputContext inputContext) { + protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) { RequestInputConfig config = (RequestInputConfig) aConfig; // 把请求信息放入stepContext @@ -112,7 +114,7 @@ public class RequestInput extends Input { Map requestMapping = (Map) dataMapping.get("request"); if (requestMapping != null && !StringUtils.isEmpty(requestMapping)) { ONode ctxNode = PathMapping.toONode(stepContext); - + // headers request.put("headers", PathMapping.transform(ctxNode, stepContext, @@ -155,9 +157,11 @@ public class RequestInput extends Input { request.put("url", uriComponents.toUriString()); } - private void doResponseMapping(InputConfig aConfig, InputContext inputContext, Object responseBody) { + protected void doResponseMapping(InputConfig aConfig, InputContext inputContext, Object responseBody) { + RequestInputConfig config = (RequestInputConfig) aConfig; response.put("body", responseBody); + // 数据转换 if (inputContext != null && inputContext.getStepContext() != null) { StepContext stepContext = inputContext.getStepContext(); @@ -210,7 +214,8 @@ public class RequestInput extends Input { } } - private Mono getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) { + @Override + protected Mono getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) { RequestInputConfig config = (RequestInputConfig) aConfig; int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout(); @@ -222,6 +227,7 @@ public class RequestInput extends Input { if (headers == null) { headers = new HashMap<>(); } + if (!headers.containsKey("Content-Type")) { // defalut content-type headers.put("Content-Type", "application/json; charset=UTF-8"); @@ -232,9 +238,19 @@ public class RequestInput extends Input { String aggrPath = (String)inputContext.getStepContext().getInputReqAttr("path"); String aggrService = aggrPath.split("\\/")[2]; - FizzWebClient client = FizzAppContext.appContext.getBean(FizzWebClient.class); - return client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url, +// FizzWebClient client = FizzAppContext.appContext.getBean(FizzWebClient.class); + FizzWebClient client = this.getCurrentApplicationContext().getBean(FizzWebClient.class); + Mono clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url, MapUtil.toHttpHeaders(headers), request.get("body"), (long)timeout); + return clientResponse.flatMap(cr->{ + RequestRPCResponse response = new RequestRPCResponse(); + response.setHeaders(cr.headers().asHttpHeaders()); + response.setBodyMono(cr.bodyToMono(String.class)); + response.setStatus(cr.statusCode()); + return Mono.just(response); + }); + + } private Map getResponses(Map stepContext2) { @@ -242,103 +258,32 @@ public class RequestInput extends Input { return null; } - @Override - @SuppressWarnings("unchecked") - public boolean needRun(StepContext stepContext) { - Map condition = ((RequestInputConfig) config).getCondition(); - if (CollectionUtils.isEmpty(condition)) { - // 没有配置condition,直接运行 - return Boolean.TRUE; - } - - ONode ctxNode = PathMapping.toONode(stepContext); - try { - Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class); - return needRun != null ? needRun : Boolean.TRUE; - } catch (ScriptException e) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); - LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e); - throw new ExecuteScriptException(e, stepContext, condition); - } - } - - @Override - public Mono run() { - long t1 = System.currentTimeMillis(); - this.doRequestMapping(config, inputContext); - inputContext.getStepContext().addElapsedTime(stepResponse.getStepName() + "-" + this.name + "-RequestMapping", - System.currentTimeMillis() - t1); - - Map tmpMap = new HashMap<>(); - - String prefix = stepResponse.getStepName() + "-" + "调用接口"; - long start = System.currentTimeMillis(); - Mono clientResponse = this.getClientSpecFromContext(config, inputContext); - Mono body = clientResponse.flatMap(cr->{ - return Mono.just(cr).doOnError(throwable -> cleanup(cr)); - }).flatMap(cr -> { - long elapsedMillis = System.currentTimeMillis() - start; - HttpHeaders httpHeaders = cr.headers().asHttpHeaders(); - Map headers = new HashMap<>(); - httpHeaders.forEach((key, value) -> { - if (value.size() > 1) { - headers.put(key, value); - } else { - headers.put(key, httpHeaders.getFirst(key)); - } - }); - tmpMap.put(CONTENT_TYPE, httpHeaders.getFirst(CONTENT_TYPE)); - headers.put("elapsedTime", elapsedMillis + "ms"); - this.response.put("headers", headers); - inputContext.getStepContext().addElapsedTime(prefix + request.get("url"), elapsedMillis); - - return cr.bodyToMono(String.class); - }).doOnSuccess(resp -> { - long elapsedMillis = System.currentTimeMillis() - start; - if(inputContext.getStepContext().isDebug()) { - LogService.setBizId(inputContext.getStepContext().getTraceId()); - LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"), - JSON.toJSONString(this.request.get("headers")), - JSON.toJSONString(this.request.get("body")), resp); - } - }).doOnError(ex -> { - LogService.setBizId(inputContext.getStepContext().getTraceId()); - LOGGER.warn("failed to call {}", request.get("url"), ex); - long elapsedMillis = System.currentTimeMillis() - start; - inputContext.getStepContext().addElapsedTime( - stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis); - }); - - // fallback handler - RequestInputConfig reqConfig = (RequestInputConfig) config; - if (reqConfig.getFallback() != null) { - Map fallback = reqConfig.getFallback(); - String mode = fallback.get("mode"); - if (FALLBACK_MODE_STOP.equals(mode)) { - body = body.onErrorStop(); - } else if (FALLBACK_MODE_CONTINUE.equals(mode)) { - body = body.onErrorResume(ex -> { - return Mono.just(fallback.get("defaultResult")); - }); + protected void doOnResponseSuccess(ClientResponse cr, long elapsedMillis) { + HttpHeaders httpHeaders = cr.headers().asHttpHeaders(); + Map headers = new HashMap<>(); + httpHeaders.forEach((key, value) -> { + if (value.size() > 1) { + headers.put(key, value); } else { - body = body.onErrorStop(); + headers.put(key, httpHeaders.getFirst(key)); } - } - - return body.flatMap(item -> { - Map result = new HashMap(); - result.put("data", item); - result.put("request", this); - - long t3 = System.currentTimeMillis(); - this.doResponseMapping(config, inputContext, parseBody((String) tmpMap.get(CONTENT_TYPE), item)); - inputContext.getStepContext().addElapsedTime( - stepResponse.getStepName() + "-" + this.name + "-ResponseMapping", System.currentTimeMillis() - t3); - - return Mono.just(result); }); + headers.put("elapsedTime", elapsedMillis + "ms"); + this.response.put("headers", headers); + inputContext.getStepContext().addElapsedTime(prefix + request.get("url"), + elapsedMillis); } - + protected Mono bodyToMono(ClientResponse cr){ + return cr.bodyToMono(String.class); + } + + protected void doOnBodyError(Throwable ex, long elapsedMillis) { + LogService.setBizId(inputContext.getStepContext().getTraceId()); + LOGGER.warn("failed to call {}", request.get("url"), ex); + inputContext.getStepContext().addElapsedTime( + stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis); + } + // Parse response body according to content-type header public Object parseBody(String contentType, String responseBody) { String[] cts = contentType.split(";"); @@ -346,32 +291,32 @@ public class RequestInput extends Input { for (int i = 0; i < cts.length; i++) { String ct = cts[i].toLowerCase(); switch (ct) { - case CONTENT_TYPE_JSON: - body = JSON.parse(responseBody); - break; - case CONTENT_TYPE_TEXT: - // parse text as json if start with "{" and end with "}" or start with "[" and - // end with "]" - if ((responseBody.startsWith("{") && responseBody.endsWith("}")) - || (responseBody.startsWith("[") && responseBody.endsWith("]"))) { - try { - body = JSON.parse(responseBody); - } catch (Exception e) { + case CONTENT_TYPE_JSON: + body = JSON.parse(responseBody); + break; + case CONTENT_TYPE_TEXT: + // parse text as json if start with "{" and end with "}" or start with "[" and + // end with "]" + if ((responseBody.startsWith("{") && responseBody.endsWith("}")) + || (responseBody.startsWith("[") && responseBody.endsWith("]"))) { + try { + body = JSON.parse(responseBody); + } catch (Exception e) { + body = responseBody; + } + } else { body = responseBody; } - } else { + break; + case CONTENT_TYPE_XML: body = responseBody; - } - break; - case CONTENT_TYPE_XML: - body = responseBody; - break; - case CONTENT_TYPE_HTML: - body = responseBody; - break; - case CONTENT_TYPE_JS: - body = responseBody; - break; + break; + case CONTENT_TYPE_HTML: + body = responseBody; + break; + case CONTENT_TYPE_JS: + body = responseBody; + break; } if (body != null) { break; @@ -382,11 +327,28 @@ public class RequestInput extends Input { } return body; } - + + protected void doOnBodySuccess(String resp, long elapsedMillis) { + if(inputContext.getStepContext().isDebug()) { + LogService.setBizId(inputContext.getStepContext().getTraceId()); + LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"), + JSON.toJSONString(this.request.get("headers")), + JSON.toJSONString(this.request.get("body")), resp); + } + } + + private String prefix; + + private void cleanup(ClientResponse clientResponse) { if (clientResponse != null) { clientResponse.bodyToMono(Void.class).subscribe(); } } + public static Class inputConfigClass (){ + return RequestInputConfig.class; + } + + } diff --git a/src/main/java/we/fizz/input/RequestInputConfig.java b/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java similarity index 88% rename from src/main/java/we/fizz/input/RequestInputConfig.java rename to src/main/java/we/fizz/input/extension/request/RequestInputConfig.java index a473599..029cbdc 100644 --- a/src/main/java/we/fizz/input/RequestInputConfig.java +++ b/src/main/java/we/fizz/input/extension/request/RequestInputConfig.java @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package we.fizz.input; +package we.fizz.input.extension.request; import java.net.MalformedURLException; import java.net.URL; @@ -25,6 +25,8 @@ import java.util.Map; import org.springframework.util.MultiValueMap; import org.springframework.util.StringUtils; import org.springframework.web.util.UriComponentsBuilder; +import we.fizz.input.InputConfig; +import we.fizz.input.InputType; /** * @@ -32,14 +34,16 @@ import org.springframework.web.util.UriComponentsBuilder; * @author francis * */ -public class RequestInputConfig extends InputConfig{ +public class RequestInputConfig extends InputConfig { + private URL url ; private String method ; private int timeout = 3; - private Map fallback = new HashMap(); + private Map condition; public RequestInputConfig(Map configBody) { + super(configBody); String url = (String) configBody.get("url"); if(StringUtils.isEmpty(url)) { throw new RuntimeException("Request URL can not be blank"); @@ -54,7 +58,8 @@ public class RequestInputConfig extends InputConfig{ timeout = Integer.valueOf(configBody.get("timeout").toString()); } if (configBody.get("fallback") != null) { - fallback = (Map)configBody.get("fallback"); + Map fallback = (Map)configBody.get("fallback"); + setFallback(fallback); } if (configBody.get("condition") != null) { setCondition((Map)configBody.get("condition")); @@ -110,13 +115,6 @@ public class RequestInputConfig extends InputConfig{ this.timeout = timeout; } - public Map getFallback() { - return fallback; - } - - public void setFallback(Map fallback) { - this.fallback = fallback; - } public Map getCondition() { return condition; diff --git a/src/main/java/we/fizz/input/extension/request/RequestRPCResponse.java b/src/main/java/we/fizz/input/extension/request/RequestRPCResponse.java new file mode 100644 index 0000000..1376cab --- /dev/null +++ b/src/main/java/we/fizz/input/extension/request/RequestRPCResponse.java @@ -0,0 +1,15 @@ +package we.fizz.input.extension.request; + +import org.springframework.http.HttpStatus; +import we.fizz.input.RPCResponse; + +public class RequestRPCResponse extends RPCResponse { + private HttpStatus statusCode; + public void setStatus(HttpStatus statusCode) { + this.statusCode = statusCode; + } + + public HttpStatus getStatusCode() { + return statusCode; + } +} diff --git a/src/test/java/we/fizz/input/RequestInputTests.java b/src/test/java/we/fizz/input/RequestInputTests.java index 96ffdc9..3f0271f 100644 --- a/src/test/java/we/fizz/input/RequestInputTests.java +++ b/src/test/java/we/fizz/input/RequestInputTests.java @@ -20,6 +20,7 @@ package we.fizz.input; import static org.junit.jupiter.api.Assertions.assertEquals; import org.junit.jupiter.api.Test; +import we.fizz.input.extension.request.RequestInput; /** *