make fizz input scalable

This commit is contained in:
linwaiwai
2021-02-04 13:44:24 +08:00
parent fc24d5a1aa
commit 9cfa5cac4b
20 changed files with 544 additions and 234 deletions

View File

@@ -22,10 +22,10 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.nacos.api.config.annotation.NacosValue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import we.config.AppConfigProperties;
import we.fizz.input.ClientInputConfig;
import we.fizz.input.Input;
import we.fizz.input.InputType;
import we.fizz.input.*;
import org.apache.commons.io.FileUtils;
import org.noear.snack.ONode;
@@ -36,6 +36,8 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import we.fizz.input.extension.mysql.MySQLInput;
import we.fizz.input.extension.request.RequestInput;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@@ -46,6 +48,7 @@ import static we.util.Constants.Symbol.FORWARD_SLASH;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.ref.SoftReference;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
@@ -61,7 +64,8 @@ import java.util.concurrent.ConcurrentHashMap;
*/
@Component
public class ConfigLoader {
@Autowired
public static ConfigurableApplicationContext appContext;
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigLoader.class);
/**
@@ -115,14 +119,17 @@ public class ConfigLoader {
public Pipeline createPipeline(String configStr) throws IOException {
ONode cfgNode = ONode.loadStr(configStr);
InputFactory.registerInput(RequestInput.TYPE, RequestInput.class);
InputFactory.registerInput(MySQLInput.TYPE, MySQLInput.class);
Pipeline pipeline = new Pipeline();
pipeline.setApplicationContext(appContext);
List<Map<String, Object>> stepConfigs = cfgNode.select("$.stepConfigs").toObject(List.class);
for (Map<String, Object> stepConfig : stepConfigs) {
// set the specified env URL
this.handleRequestURL(stepConfig);
Step step = new Step.Builder().read(stepConfig);
SoftReference<Pipeline> weakPipeline = new SoftReference<Pipeline>(pipeline);
Step step = new Step.Builder().read(stepConfig, weakPipeline);
step.setName((String) stepConfig.get("name"));
if (stepConfig.get("stop") != null) {
step.setStop((Boolean) stepConfig.get("stop"));

View File

@@ -25,6 +25,7 @@ import java.util.Map;
import javax.script.ScriptException;
import org.springframework.context.ConfigurableApplicationContext;
import we.schema.util.I18nUtils;
import org.noear.snack.ONode;
import org.slf4j.Logger;
@@ -55,6 +56,7 @@ import we.util.MapUtil;
*
*/
public class Pipeline {
private ConfigurableApplicationContext applicationContext;
private static final Logger LOGGER = LoggerFactory.getLogger(Pipeline.class);
private LinkedList<Step> steps = new LinkedList<Step>();
private StepContext<String, Object> stepContext = new StepContext<>();
@@ -74,6 +76,7 @@ public class Pipeline {
ClientInputConfig config = (ClientInputConfig)input.getConfig();
this.initialStepContext(clientInput);
this.stepContext.setDebug(config.isDebug());
this.stepContext.setApplicationContext(applicationContext);
if(traceId != null) {
this.stepContext.setTraceId(traceId);
@@ -362,4 +365,12 @@ public class Pipeline {
}
}
}
public void setApplicationContext(ConfigurableApplicationContext appContext) {
this.applicationContext = applicationContext;
}
public ConfigurableApplicationContext getApplicationContext() {
return this.applicationContext;
}
}

View File

@@ -17,11 +17,13 @@
package we.fizz;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import org.springframework.http.ReactiveHttpOutputMessage;
@@ -45,8 +47,8 @@ import we.fizz.input.InputType;
*
*/
public class Step {
private String name;
private SoftReference<Pipeline> weakPipeline;
private String name;
// 是否在执行完当前step就返回
private boolean stop;
@@ -55,9 +57,22 @@ public class Step {
private Map<String, InputConfig> requestConfigs = new HashMap<String, InputConfig>();
public SoftReference<Pipeline> getWeakPipeline() {
return weakPipeline;
}
public void setWeakPipeline(SoftReference<Pipeline> weakPipeline) {
this.weakPipeline = weakPipeline;
}
public ConfigurableApplicationContext getCurrentApplicationContext() {
return this.getWeakPipeline() != null ? this.getWeakPipeline().get().getApplicationContext(): null;
}
public static class Builder {
public Step read(Map<String, Object> config) {
public Step read(Map<String, Object> config, SoftReference<Pipeline> weakPipeline) {
Step step = new Step();
step.setWeakPipeline(weakPipeline);
List<Map> requests= (List<Map>) config.get("requests");
for(Map requestConfig: requests) {
InputConfig inputConfig = InputFactory.createInputConfig(requestConfig);
@@ -68,6 +83,11 @@ public class Step {
}
private StepContext<String, Object> stepContext;
public StepContext<String, Object> getStepContext(){
return this.stepContext;
}
private StepResponse lastStepResponse = null;
private Map<String, Input> inputs = new HashMap<String, Input>();
public void beforeRun(StepContext<String, Object> stepContext2, StepResponse response ) {
@@ -80,6 +100,7 @@ public class Step {
InputConfig inputConfig = configs.get(configName);
InputType type = inputConfig.getType();
Input input = InputFactory.createInput(type.toString());
input.setWeakStep(new SoftReference<Step>(this));
input.setConfig(inputConfig);
input.setName(configName);
input.setStepResponse(stepResponse);

View File

@@ -23,10 +23,11 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.context.ConfigurableApplicationContext;
import we.constants.CommonConstants;
/**
*
*
* @author linwaiwai
* @author francis
*
@@ -35,26 +36,27 @@ import we.constants.CommonConstants;
*/
@SuppressWarnings("unchecked")
public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
private ConfigurableApplicationContext applicationContext;
public static final String ELAPSED_TIMES = "elapsedTimes";
public static final String DEBUG = "debug";
public static final String RETURN_CONTEXT = "returnContext";
// context field in response body
public static final String CONTEXT_FIELD = "_context";
// exception info
public static final String EXCEPTION_MESSAGE = "exceptionMessage";
public static final String EXCEPTION_STACKS = "exceptionStacks";
public static final String EXCEPTION_DATA = "exceptionData";
public void setDebug(Boolean debug) {
this.put((K)DEBUG, (V)debug);
}
public String getTraceId() {
return (String) this.get(CommonConstants.TRACE_ID);
}
public void setTraceId(String traceId) {
this.put((K)CommonConstants.TRACE_ID, (V)traceId);
}
@@ -74,7 +76,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
public boolean returnContext() {
return Boolean.valueOf((String)getInputReqHeader(RETURN_CONTEXT));
}
/**
* set exception information
* @param cause exception
@@ -97,7 +99,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
this.put((K) EXCEPTION_STACKS, (V) arr);
}
}
public synchronized void addElapsedTime(String actionName, Long milliSeconds) {
List<Map<String, Long>> elapsedTimes = (List<Map<String, Long>>) this.get(ELAPSED_TIMES);
if (elapsedTimes == null) {
@@ -129,7 +131,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 设置Step里调用接口的请求头
*
*
* @param stepName
* @param requestName
* @param headerName
@@ -155,7 +157,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step里调用接口的请求头
*
*
* @param stepName
* @param requestName
* @param headerName
@@ -178,7 +180,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 设置Step里调用接口的请求body
*
*
* @param stepName
* @param requestName
* @param fieldName
@@ -204,7 +206,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step里调用接口的请求body
*
*
* @param stepName
* @param requestName
* @param fieldName 字段名
@@ -228,7 +230,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step里调用接口的请求body
*
*
* @param stepName
* @param requestName
*/
@@ -244,7 +246,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
}
return req.get("body");
}
/**
* 获取Step里调用的接口的URL参数
* @param stepName 步骤名【必填】
@@ -263,7 +265,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
}
return req.get("params");
}
/**
* 获取Step里调用的接口的URL参数
* @param stepName 步骤名【必填】
@@ -277,7 +279,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 设置Step里调用接口响应头
*
*
* @param stepName
* @param requestName
* @param headerName
@@ -303,7 +305,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step里调用接口响应头
*
*
* @param stepName
* @param requestName
* @param headerName
@@ -326,7 +328,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 设置Step里调用接口的响应body
*
*
* @param stepName
* @param requestName
* @param key
@@ -352,7 +354,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step里调用接口的响应body
*
*
* @param stepName
* @param requestName
* @param key
@@ -375,7 +377,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step里调用接口的响应body
*
*
* @param stepName
* @param requestName
*/
@@ -393,7 +395,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 设置Step的结果
*
*
* @param stepName
* @param key
* @param value
@@ -413,7 +415,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step的结果
*
*
* @param stepName
* @param key
*/
@@ -431,7 +433,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取Step的结果
*
*
* @param stepName
*/
public Object getStepResult(String stepName) {
@@ -444,7 +446,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 设置聚合接口的响应头
*
*
* @param headerName
* @param headerValue
*/
@@ -467,7 +469,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取聚合接口的响应头
*
*
* @param headerName
*/
public Object getInputRespHeader(String headerName) {
@@ -488,7 +490,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取聚合接口的请求头
*
*
* @param headerName
*/
public Object getInputReqHeader(String headerName) {
@@ -509,7 +511,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 设置聚合接口的响应body
*
*
* @param fieldName
* @param value
*/
@@ -533,7 +535,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取聚合接口的响应body
*
*
* @param fieldName
*/
public Object getInputRespBody(String fieldName) {
@@ -554,7 +556,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取聚合接口的响应body
*
*
*/
public Object getInputRespBody() {
Map<String, Object> input = (Map<String, Object>) this.get("input");
@@ -570,7 +572,7 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取聚合接口的请求body
*
*
* @param fieldName
*/
public Object getInputReqBody(String fieldName) {
@@ -583,19 +585,19 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
/**
* 获取聚合接口的请求body
*
*
*/
public Object getInputReqBody() {
return getInputReqAttr("body");
}
/**
* 获取客户端URL请求参数query string
*/
public Object getInputReqParam() {
return this.getInputReqAttr("params");
}
/**
* 获取客户端URL请求参数query string
* @param paramName URL参数名
@@ -604,11 +606,11 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
Map<String, Object> params = (Map<String, Object>) this.getInputReqAttr("params");
return params == null ? null : paramName == null ? params : params.get(paramName);
}
/**
* 获取聚合接口请求属性<br/>
* 可选属性path,method,headers,params,body
*
*
*/
public Object getInputReqAttr(String key) {
Map<String, Object> input = (Map<String, Object>) this.get("input");
@@ -622,4 +624,11 @@ public class StepContext<K, V> extends ConcurrentHashMap<K, V> {
return request.get(key);
}
public ConfigurableApplicationContext getApplicationContext(){
return this.applicationContext;
}
public void setApplicationContext(ConfigurableApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
}

View File

@@ -0,0 +1,12 @@
package we.fizz.exception;
public class FizzException extends Exception {
public FizzException(Throwable exception) {
super(exception);
}
public FizzException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,7 @@
package we.fizz.exception;
public class FizzRuntimeException extends RuntimeException {
public FizzRuntimeException (String message){
super(message);
}
}

View File

@@ -41,6 +41,7 @@ public class ClientInputConfig extends InputConfig {
@SuppressWarnings("unchecked")
public ClientInputConfig(Map configBody) {
super(configBody);
if(configBody.get("debug") != null) {
this.debug = (boolean) configBody.get("debug");
}
@@ -73,10 +74,11 @@ public class ClientInputConfig extends InputConfig {
validateResponse = ((Map) configBody.get("validateResponse"));
}
}
public ClientInputConfig() {
}
super(null);
}
public boolean isDebug() {
return debug;

View File

@@ -0,0 +1,29 @@
package we.fizz.input;
import org.springframework.context.ConfigurableApplicationContext;
import reactor.core.publisher.Mono;
import we.fizz.Step;
import we.fizz.StepContext;
import we.fizz.StepResponse;
import java.lang.ref.SoftReference;
import java.util.Map;
public interface IInput {
public static final InputType TYPE = null;
public static Class inputConfigClass() {
return null;
}
public String getName() ;
public boolean needRun(StepContext<String, Object> stepContext);
public void beforeRun(InputContext context);
public Mono<Map> run();
public StepResponse getStepResponse() ;
public void setStepResponse(StepResponse stepResponse);
public SoftReference<Step> getWeakStep();
public void setWeakStep(SoftReference<Step> weakStep);
public ConfigurableApplicationContext getCurrentApplicationContext();
}

View File

@@ -16,10 +16,12 @@
*/
package we.fizz.input;
import java.util.HashMap;
import java.lang.ref.SoftReference;
import java.util.Map;
import org.springframework.context.ConfigurableApplicationContext;
import reactor.core.publisher.Mono;
import we.fizz.Step;
import we.fizz.StepContext;
import we.fizz.StepResponse;
@@ -34,7 +36,7 @@ public class Input {
protected InputContext inputContext;
protected StepResponse lastStepResponse = null;
protected StepResponse stepResponse;
private SoftReference<Step> weakStep;
public void setConfig(InputConfig inputConfig) {
config = inputConfig;
}
@@ -67,7 +69,7 @@ public class Input {
}
public void setName(String configName) {
this.name = configName;
}
public StepResponse getStepResponse() {
@@ -76,7 +78,20 @@ public class Input {
public void setStepResponse(StepResponse stepResponse) {
this.stepResponse = stepResponse;
}
public SoftReference<Step> getWeakStep() {
return weakStep;
}
public void setWeakStep(SoftReference<Step> weakStep) {
this.weakStep = weakStep;
}
public ConfigurableApplicationContext getCurrentApplicationContext(){
return this.getWeakStep() != null ? this.getWeakStep().get().getCurrentApplicationContext() : null;
}
public static Class inputConfigClass (){
return InputConfig.class;
}
}

View File

@@ -17,6 +17,7 @@
package we.fizz.input;
import java.util.HashMap;
import java.util.Map;
/**
@@ -28,6 +29,10 @@ public class InputConfig {
private InputType type;
protected Map<String, Object> dataMapping;
protected Map<String, Object> configMap;
public InputConfig(Map aConfigMap) {
configMap = aConfigMap;
}
public InputType getType() {
return type;
@@ -45,4 +50,17 @@ public class InputConfig {
this.dataMapping = dataMapping;
}
private Map<String,String> fallback = new HashMap<String, String>();
public Map<String, String> getFallback() {
return fallback;
}
public void setFallback(Map<String, String> fallback) {
this.fallback = fallback;
}
public void parse(){
}
}

View File

@@ -17,6 +17,11 @@
package we.fizz.input;
import we.fizz.exception.FizzRuntimeException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
/**
@@ -25,38 +30,54 @@ import java.util.Map;
*
*/
public class InputFactory {
public static InputConfig createInputConfig(Map config) {
public static Map<InputType, Class> inputClasses = new HashMap<InputType, Class>();
public static void registerInput(InputType type, Class inputClass){
inputClasses.put(type, inputClass);
}
public static void unregisterInput(InputType type){
inputClasses.remove(type);
}
public static InputConfig createInputConfig(Map config) {
String type = (String) config.get("type");
InputType typeEnum = InputType.valueOf(type.toUpperCase());
InputConfig inputConfig = null;
switch(typeEnum) {
case REQUEST:
inputConfig = new RequestInputConfig(config);
break;
case MYSQL:
inputConfig = new MySQLInputConfig(config);
break;
if (inputClasses.containsKey(typeEnum)){
Class<?> InputClass = inputClasses.get(typeEnum);
try {
Method inputConfigClassMethod = InputClass.getMethod("inputConfigClass");
Class<?> InputConfigClass = (Class<?>) inputConfigClassMethod.invoke(null);
Constructor constructor = null;
constructor = InputConfigClass.getDeclaredConstructor(Map.class);
constructor.setAccessible(true);
inputConfig = (InputConfig) constructor.newInstance(config);
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new FizzRuntimeException(e.getMessage());
}
inputConfig.setType(typeEnum);
inputConfig.setDataMapping((Map<String, Object>) config.get("dataMapping"));
inputConfig.parse();
return inputConfig;
}
inputConfig.setType(typeEnum);
inputConfig.setDataMapping((Map<String, Object>) config.get("dataMapping"));
return inputConfig;
return null;
}
public static Input createInput(String type) {
InputType typeEnum = InputType.valueOf(type.toUpperCase());
Input input = null;
switch(typeEnum) {
case REQUEST:
input = new RequestInput();
break;
case MYSQL:
input = new MySQLInput();
break;
if (inputClasses.containsKey(typeEnum)) {
Class<?> InputClass = inputClasses.get(typeEnum);
Constructor constructor = null;
try {
constructor = InputClass.getDeclaredConstructor();
constructor.setAccessible(true);
input = (Input) constructor.newInstance();
return input;
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new FizzRuntimeException(e.getMessage());
}
}
return input;
return null;
}
}

View File

@@ -17,16 +17,28 @@
package we.fizz.input;
import java.util.HashMap;
import java.util.Map;
/**
*
* @author linwaiwai
*
*/
public enum InputType {
REQUEST("REQUEST"),
MYSQL("MYSQL");
private final String type;
private InputType(String aType) {
public class InputType {
private final String type;
static private Map<String,InputType > inputs = new HashMap<String,InputType >();
public InputType(String aType) {
this.type = aType;
inputs.put(aType, this);
}
public static InputType valueOf(String string) {
return inputs.get(string);
}
public String toString(){
return type;
}
}

View File

@@ -0,0 +1,129 @@
package we.fizz.input;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import reactor.core.publisher.Mono;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.input.extension.request.RequestInputConfig;
import we.flume.clients.log4j2appender.LogService;
import we.util.JacksonUtils;
import javax.script.ScriptException;
import java.util.HashMap;
import java.util.Map;
public class RPCInput extends Input {
protected static final Logger LOGGER = LoggerFactory.getLogger(RPCInput.class.getName());
protected static final String FALLBACK_MODE_STOP = "stop";
protected static final String FALLBACK_MODE_CONTINUE = "continue";
protected Map<String, Object> request = new HashMap<>();
protected Map<String, Object> response = new HashMap<>();
protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) {
}
protected void doOnResponseSuccess(RPCResponse cr, long elapsedMillis) {
}
protected Mono<String> bodyToMono(RPCResponse cr){
return cr.getBodyMono();
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
}
protected void doOnBodySuccess(String resp, long elapsedMillis) {
}
protected void doResponseMapping(InputConfig aConfig, InputContext inputContext, String responseBody) {
}
@Override
@SuppressWarnings("unchecked")
public boolean needRun(StepContext<String, Object> stepContext) {
Map<String, Object> condition = ((RequestInputConfig) config).getCondition();
if (CollectionUtils.isEmpty(condition)) {
// 没有配置condition直接运行
return Boolean.TRUE;
}
ONode ctxNode = PathMapping.toONode(stepContext);
try {
Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class);
return needRun != null ? needRun : Boolean.TRUE;
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e);
throw new ExecuteScriptException(e, stepContext, condition);
}
}
private String prefix;
@Override
public Mono<Map> run() {
long t1 = System.currentTimeMillis();
this.doRequestMapping(config, inputContext);
inputContext.getStepContext().addElapsedTime(stepResponse.getStepName() + "-" + this.name + "-RequestMapping",
System.currentTimeMillis() - t1);
prefix = stepResponse.getStepName() + "-" + "调用接口";
long start = System.currentTimeMillis();
Mono<RPCResponse> rpcResponse = this.getClientSpecFromContext(config, inputContext);
Mono<String> body = rpcResponse.flatMap(cr->{
return Mono.just(cr).doOnError(throwable -> cleanup(cr));
}).doOnSuccess(cr -> {
long elapsedMillis = System.currentTimeMillis() - start;
this.doOnResponseSuccess(cr, elapsedMillis);
}).flatMap(cr -> { return this.bodyToMono(cr); }).doOnSuccess(resp -> {
long elapsedMillis = System.currentTimeMillis() - start;
this.doOnBodySuccess(resp, elapsedMillis);
}).doOnError(ex -> {
long elapsedMillis = System.currentTimeMillis() - start;
this.doOnBodyError(ex, elapsedMillis);
});
// fallback handler
InputConfig reqConfig = (InputConfig) config;
if (reqConfig.getFallback() != null) {
Map<String, String> fallback = reqConfig.getFallback();
String mode = fallback.get("mode");
if (FALLBACK_MODE_STOP.equals(mode)) {
body = body.onErrorStop();
} else if (FALLBACK_MODE_CONTINUE.equals(mode)) {
body = body.onErrorResume(ex -> {
return Mono.just(fallback.get("defaultResult"));
});
} else {
body = body.onErrorStop();
}
}
return body.flatMap(item -> {
Map<String, Object> result = new HashMap<String, Object>();
result.put("data", item);
result.put("request", this);
long t3 = System.currentTimeMillis();
this.doResponseMapping(config, inputContext, item);
inputContext.getStepContext().addElapsedTime(
stepResponse.getStepName() + "-" + this.name + "-ResponseMapping", System.currentTimeMillis() - t3);
return Mono.just(result);
});
}
private void cleanup(RPCResponse clientResponse) {
}
protected Mono<RPCResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) {
return null;
}
}

View File

@@ -0,0 +1,32 @@
package we.fizz.input;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Mono;
/**
*
* @author linwaiwai
*
*/
public class RPCResponse {
private MultiValueMap headers;
private Mono<String> bodyMono;
public MultiValueMap getHeaders() {
return headers;
}
public void setHeaders(MultiValueMap headers) {
this.headers = headers;
}
public Mono<String> getBodyMono() {
return bodyMono;
}
public void setBodyMono(Mono<String> bodyMono) {
this.bodyMono = bodyMono;
}
}

View File

@@ -15,13 +15,20 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.fizz.input;
package we.fizz.input.extension.mysql;
import we.fizz.input.IInput;
import we.fizz.input.Input;
import we.fizz.input.InputType;
/**
*
* @author linwaiwai
*
*/
public class MySQLInput extends Input {
public class MySQLInput extends Input implements IInput {
static public InputType TYPE = new InputType("MYSQL");
public static Class inputConfigClass (){
return MySQLInputConfig.class;
}
}

View File

@@ -15,7 +15,9 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.fizz.input;
package we.fizz.input.extension.mysql;
import we.fizz.input.InputConfig;
import java.util.Map;
@@ -27,7 +29,7 @@ import java.util.Map;
public class MySQLInputConfig extends InputConfig {
public MySQLInputConfig(Map configBody) {
// TODO Auto-generated constructor stub
super(configBody);
}
}

View File

@@ -15,19 +15,19 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.fizz.input;
package we.fizz.input.extension.request;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.script.ScriptException;
import org.noear.snack.ONode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.util.UriComponents;
@@ -36,11 +36,11 @@ import org.springframework.web.util.UriComponentsBuilder;
import com.alibaba.fastjson.JSON;
import reactor.core.publisher.Mono;
import we.FizzAppContext;
import we.constants.CommonConstants;
import we.exception.ExecuteScriptException;
import we.fizz.StepContext;
import we.fizz.StepResponse;
import we.fizz.input.*;
import we.flume.clients.log4j2appender.LogService;
import we.proxy.FizzWebClient;
import we.util.JacksonUtils;
@@ -53,9 +53,9 @@ import we.util.MapUtil;
*
*/
@SuppressWarnings("unchecked")
public class RequestInput extends Input {
public class RequestInput extends RPCInput implements IInput{
private static final Logger LOGGER = LoggerFactory.getLogger(RequestInput.class);
static public InputType TYPE = new InputType("REQUEST");
private InputType type;
protected Map<String, Object> dataMapping;
protected Map<String, Object> request = new HashMap<>();
@@ -63,15 +63,17 @@ public class RequestInput extends Input {
private static final String FALLBACK_MODE_STOP = "stop";
private static final String FALLBACK_MODE_CONTINUE = "continue";
private static final String CONTENT_TYPE_JSON = "application/json";
private static final String CONTENT_TYPE_XML = "application/xml";
private static final String CONTENT_TYPE_JS = "application/javascript";
private static final String CONTENT_TYPE_HTML = "text/html";
private static final String CONTENT_TYPE_TEXT = "text/plain";
private static final String CONTENT_TYPE = "content-type";
private static final String CONTENT_TYPE_JSON = "application/json";
private static final String CONTENT_TYPE_XML = "application/xml";
private static final String CONTENT_TYPE_JS = "application/javascript";
private static final String CONTENT_TYPE_HTML = "text/html";
private static final String CONTENT_TYPE_TEXT = "text/plain";
private static final String CONTENT_TYPE = "content-type";
public InputType getType() {
return type;
}
@@ -88,7 +90,7 @@ public class RequestInput extends Input {
this.dataMapping = dataMapping;
}
private void doRequestMapping(InputConfig aConfig, InputContext inputContext) {
protected void doRequestMapping(InputConfig aConfig, InputContext inputContext) {
RequestInputConfig config = (RequestInputConfig) aConfig;
// 把请求信息放入stepContext
@@ -112,7 +114,7 @@ public class RequestInput extends Input {
Map<String, Object> requestMapping = (Map<String, Object>) dataMapping.get("request");
if (requestMapping != null && !StringUtils.isEmpty(requestMapping)) {
ONode ctxNode = PathMapping.toONode(stepContext);
// headers
request.put("headers",
PathMapping.transform(ctxNode, stepContext,
@@ -155,9 +157,11 @@ public class RequestInput extends Input {
request.put("url", uriComponents.toUriString());
}
private void doResponseMapping(InputConfig aConfig, InputContext inputContext, Object responseBody) {
protected void doResponseMapping(InputConfig aConfig, InputContext inputContext, Object responseBody) {
RequestInputConfig config = (RequestInputConfig) aConfig;
response.put("body", responseBody);
// 数据转换
if (inputContext != null && inputContext.getStepContext() != null) {
StepContext<String, Object> stepContext = inputContext.getStepContext();
@@ -210,7 +214,8 @@ public class RequestInput extends Input {
}
}
private Mono<ClientResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) {
@Override
protected Mono<RPCResponse> getClientSpecFromContext(InputConfig aConfig, InputContext inputContext) {
RequestInputConfig config = (RequestInputConfig) aConfig;
int timeout = config.getTimeout() < 1 ? 3000 : config.getTimeout() > 10000 ? 10000 : config.getTimeout();
@@ -222,6 +227,7 @@ public class RequestInput extends Input {
if (headers == null) {
headers = new HashMap<>();
}
if (!headers.containsKey("Content-Type")) {
// defalut content-type
headers.put("Content-Type", "application/json; charset=UTF-8");
@@ -232,9 +238,19 @@ public class RequestInput extends Input {
String aggrPath = (String)inputContext.getStepContext().getInputReqAttr("path");
String aggrService = aggrPath.split("\\/")[2];
FizzWebClient client = FizzAppContext.appContext.getBean(FizzWebClient.class);
return client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url,
// FizzWebClient client = FizzAppContext.appContext.getBean(FizzWebClient.class);
FizzWebClient client = this.getCurrentApplicationContext().getBean(FizzWebClient.class);
Mono<ClientResponse> clientResponse = client.aggrSend(aggrService, aggrMethod, aggrPath, null, method, url,
MapUtil.toHttpHeaders(headers), request.get("body"), (long)timeout);
return clientResponse.flatMap(cr->{
RequestRPCResponse response = new RequestRPCResponse();
response.setHeaders(cr.headers().asHttpHeaders());
response.setBodyMono(cr.bodyToMono(String.class));
response.setStatus(cr.statusCode());
return Mono.just(response);
});
}
private Map<String, Object> getResponses(Map<String, StepResponse> stepContext2) {
@@ -242,103 +258,32 @@ public class RequestInput extends Input {
return null;
}
@Override
@SuppressWarnings("unchecked")
public boolean needRun(StepContext<String, Object> stepContext) {
Map<String, Object> condition = ((RequestInputConfig) config).getCondition();
if (CollectionUtils.isEmpty(condition)) {
// 没有配置condition直接运行
return Boolean.TRUE;
}
ONode ctxNode = PathMapping.toONode(stepContext);
try {
Boolean needRun = ScriptHelper.execute(condition, ctxNode, stepContext, Boolean.class);
return needRun != null ? needRun : Boolean.TRUE;
} catch (ScriptException e) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
LOGGER.warn("execute script failed, {}", JacksonUtils.writeValueAsString(condition), e);
throw new ExecuteScriptException(e, stepContext, condition);
}
}
@Override
public Mono<Map> run() {
long t1 = System.currentTimeMillis();
this.doRequestMapping(config, inputContext);
inputContext.getStepContext().addElapsedTime(stepResponse.getStepName() + "-" + this.name + "-RequestMapping",
System.currentTimeMillis() - t1);
Map<String, Object> tmpMap = new HashMap<>();
String prefix = stepResponse.getStepName() + "-" + "调用接口";
long start = System.currentTimeMillis();
Mono<ClientResponse> clientResponse = this.getClientSpecFromContext(config, inputContext);
Mono<String> body = clientResponse.flatMap(cr->{
return Mono.just(cr).doOnError(throwable -> cleanup(cr));
}).flatMap(cr -> {
long elapsedMillis = System.currentTimeMillis() - start;
HttpHeaders httpHeaders = cr.headers().asHttpHeaders();
Map<String, Object> headers = new HashMap<>();
httpHeaders.forEach((key, value) -> {
if (value.size() > 1) {
headers.put(key, value);
} else {
headers.put(key, httpHeaders.getFirst(key));
}
});
tmpMap.put(CONTENT_TYPE, httpHeaders.getFirst(CONTENT_TYPE));
headers.put("elapsedTime", elapsedMillis + "ms");
this.response.put("headers", headers);
inputContext.getStepContext().addElapsedTime(prefix + request.get("url"), elapsedMillis);
return cr.bodyToMono(String.class);
}).doOnSuccess(resp -> {
long elapsedMillis = System.currentTimeMillis() - start;
if(inputContext.getStepContext().isDebug()) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"),
JSON.toJSONString(this.request.get("headers")),
JSON.toJSONString(this.request.get("body")), resp);
}
}).doOnError(ex -> {
LogService.setBizId(inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", request.get("url"), ex);
long elapsedMillis = System.currentTimeMillis() - start;
inputContext.getStepContext().addElapsedTime(
stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis);
});
// fallback handler
RequestInputConfig reqConfig = (RequestInputConfig) config;
if (reqConfig.getFallback() != null) {
Map<String, String> fallback = reqConfig.getFallback();
String mode = fallback.get("mode");
if (FALLBACK_MODE_STOP.equals(mode)) {
body = body.onErrorStop();
} else if (FALLBACK_MODE_CONTINUE.equals(mode)) {
body = body.onErrorResume(ex -> {
return Mono.just(fallback.get("defaultResult"));
});
protected void doOnResponseSuccess(ClientResponse cr, long elapsedMillis) {
HttpHeaders httpHeaders = cr.headers().asHttpHeaders();
Map<String, Object> headers = new HashMap<>();
httpHeaders.forEach((key, value) -> {
if (value.size() > 1) {
headers.put(key, value);
} else {
body = body.onErrorStop();
headers.put(key, httpHeaders.getFirst(key));
}
}
return body.flatMap(item -> {
Map<String, Object> result = new HashMap<String, Object>();
result.put("data", item);
result.put("request", this);
long t3 = System.currentTimeMillis();
this.doResponseMapping(config, inputContext, parseBody((String) tmpMap.get(CONTENT_TYPE), item));
inputContext.getStepContext().addElapsedTime(
stepResponse.getStepName() + "-" + this.name + "-ResponseMapping", System.currentTimeMillis() - t3);
return Mono.just(result);
});
headers.put("elapsedTime", elapsedMillis + "ms");
this.response.put("headers", headers);
inputContext.getStepContext().addElapsedTime(prefix + request.get("url"),
elapsedMillis);
}
protected Mono<String> bodyToMono(ClientResponse cr){
return cr.bodyToMono(String.class);
}
protected void doOnBodyError(Throwable ex, long elapsedMillis) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
LOGGER.warn("failed to call {}", request.get("url"), ex);
inputContext.getStepContext().addElapsedTime(
stepResponse.getStepName() + "-" + "调用接口 failed " + request.get("url"), elapsedMillis);
}
// Parse response body according to content-type header
public Object parseBody(String contentType, String responseBody) {
String[] cts = contentType.split(";");
@@ -346,32 +291,32 @@ public class RequestInput extends Input {
for (int i = 0; i < cts.length; i++) {
String ct = cts[i].toLowerCase();
switch (ct) {
case CONTENT_TYPE_JSON:
body = JSON.parse(responseBody);
break;
case CONTENT_TYPE_TEXT:
// parse text as json if start with "{" and end with "}" or start with "[" and
// end with "]"
if ((responseBody.startsWith("{") && responseBody.endsWith("}"))
|| (responseBody.startsWith("[") && responseBody.endsWith("]"))) {
try {
body = JSON.parse(responseBody);
} catch (Exception e) {
case CONTENT_TYPE_JSON:
body = JSON.parse(responseBody);
break;
case CONTENT_TYPE_TEXT:
// parse text as json if start with "{" and end with "}" or start with "[" and
// end with "]"
if ((responseBody.startsWith("{") && responseBody.endsWith("}"))
|| (responseBody.startsWith("[") && responseBody.endsWith("]"))) {
try {
body = JSON.parse(responseBody);
} catch (Exception e) {
body = responseBody;
}
} else {
body = responseBody;
}
} else {
break;
case CONTENT_TYPE_XML:
body = responseBody;
}
break;
case CONTENT_TYPE_XML:
body = responseBody;
break;
case CONTENT_TYPE_HTML:
body = responseBody;
break;
case CONTENT_TYPE_JS:
body = responseBody;
break;
break;
case CONTENT_TYPE_HTML:
body = responseBody;
break;
case CONTENT_TYPE_JS:
body = responseBody;
break;
}
if (body != null) {
break;
@@ -382,11 +327,28 @@ public class RequestInput extends Input {
}
return body;
}
protected void doOnBodySuccess(String resp, long elapsedMillis) {
if(inputContext.getStepContext().isDebug()) {
LogService.setBizId(inputContext.getStepContext().getTraceId());
LOGGER.info("{} 耗时:{}ms URL={}, reqHeader={} req={} resp={}", prefix, elapsedMillis, request.get("url"),
JSON.toJSONString(this.request.get("headers")),
JSON.toJSONString(this.request.get("body")), resp);
}
}
private String prefix;
private void cleanup(ClientResponse clientResponse) {
if (clientResponse != null) {
clientResponse.bodyToMono(Void.class).subscribe();
}
}
public static Class inputConfigClass (){
return RequestInputConfig.class;
}
}

View File

@@ -15,7 +15,7 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package we.fizz.input;
package we.fizz.input.extension.request;
import java.net.MalformedURLException;
import java.net.URL;
@@ -25,6 +25,8 @@ import java.util.Map;
import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.util.UriComponentsBuilder;
import we.fizz.input.InputConfig;
import we.fizz.input.InputType;
/**
*
@@ -32,14 +34,16 @@ import org.springframework.web.util.UriComponentsBuilder;
* @author francis
*
*/
public class RequestInputConfig extends InputConfig{
public class RequestInputConfig extends InputConfig {
private URL url ;
private String method ;
private int timeout = 3;
private Map<String,String> fallback = new HashMap<String, String>();
private Map<String, Object> condition;
public RequestInputConfig(Map configBody) {
super(configBody);
String url = (String) configBody.get("url");
if(StringUtils.isEmpty(url)) {
throw new RuntimeException("Request URL can not be blank");
@@ -54,7 +58,8 @@ public class RequestInputConfig extends InputConfig{
timeout = Integer.valueOf(configBody.get("timeout").toString());
}
if (configBody.get("fallback") != null) {
fallback = (Map<String,String>)configBody.get("fallback");
Map<String,String> fallback = (Map<String,String>)configBody.get("fallback");
setFallback(fallback);
}
if (configBody.get("condition") != null) {
setCondition((Map)configBody.get("condition"));
@@ -110,13 +115,6 @@ public class RequestInputConfig extends InputConfig{
this.timeout = timeout;
}
public Map<String, String> getFallback() {
return fallback;
}
public void setFallback(Map<String, String> fallback) {
this.fallback = fallback;
}
public Map<String, Object> getCondition() {
return condition;

View File

@@ -0,0 +1,15 @@
package we.fizz.input.extension.request;
import org.springframework.http.HttpStatus;
import we.fizz.input.RPCResponse;
public class RequestRPCResponse extends RPCResponse {
private HttpStatus statusCode;
public void setStatus(HttpStatus statusCode) {
this.statusCode = statusCode;
}
public HttpStatus getStatusCode() {
return statusCode;
}
}

View File

@@ -20,6 +20,7 @@ package we.fizz.input;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
import we.fizz.input.extension.request.RequestInput;
/**
*