make aggregation step optional

This commit is contained in:
Francis Dong
2020-12-01 15:29:32 +08:00
parent 9114a87624
commit 7f048a88e8

View File

@@ -95,30 +95,38 @@ public class Pipeline {
return Mono.just(aggregateResult);
}
LinkedList<Step> opSteps = (LinkedList<Step>) steps.clone();
Step step1 = opSteps.removeFirst();
step1.beforeRun(stepContext, null);
Mono<List<StepResponse>> result = createStep(step1).expand(response -> {
if (opSteps.isEmpty() || response.isStop()) {
return Mono.empty();
}
Step step = opSteps.pop();
step.beforeRun(stepContext, response);
return createStep(step);
}).flatMap(response -> Flux.just(response)).collectList();
return result.flatMap(clientResponse -> {
// 数据转换
long t3 = System.currentTimeMillis();
AggregateResult aggResult = this.doInputDataMapping(input, null);
this.stepContext.addElapsedTime(input.getName()+"聚合接口响应结果数据转换", System.currentTimeMillis() - t3);
if(this.stepContext.isDebug()) {
LogService.setBizId(this.stepContext.getTraceId());
String jsonString = JSON.toJSONString(aggResult);
LOGGER.info("aggResult {}", jsonString);
LOGGER.info("stepContext {}", JSON.toJSONString(stepContext));
}
return Mono.just(aggResult);
});
if(CollectionUtils.isEmpty(steps)) {
return handleOutput(input);
}else {
LinkedList<Step> opSteps = (LinkedList<Step>) steps.clone();
Step step1 = opSteps.removeFirst();
step1.beforeRun(stepContext, null);
Mono<List<StepResponse>> result = createStep(step1).expand(response -> {
if (opSteps.isEmpty() || response.isStop()) {
return Mono.empty();
}
Step step = opSteps.pop();
step.beforeRun(stepContext, response);
return createStep(step);
}).flatMap(response -> Flux.just(response)).collectList();
return result.flatMap(clientResponse -> {
return handleOutput(input);
});
}
}
private Mono<AggregateResult> handleOutput(Input input){
// 数据转换
long t3 = System.currentTimeMillis();
AggregateResult aggResult = this.doInputDataMapping(input, null);
this.stepContext.addElapsedTime(input.getName()+"聚合接口响应结果数据转换", System.currentTimeMillis() - t3);
if(this.stepContext.isDebug()) {
LogService.setBizId(this.stepContext.getTraceId());
String jsonString = JSON.toJSONString(aggResult);
LOGGER.info("aggResult {}", jsonString);
LOGGER.info("stepContext {}", JSON.toJSONString(stepContext));
}
return Mono.just(aggResult);
}
@SuppressWarnings("unchecked")