From 7f048a88e85a1985ec0366edbbb22f191045abda Mon Sep 17 00:00:00 2001 From: Francis Dong Date: Tue, 1 Dec 2020 15:29:32 +0800 Subject: [PATCH] make aggregation step optional --- src/main/java/we/fizz/Pipeline.java | 56 ++++++++++++++++------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/src/main/java/we/fizz/Pipeline.java b/src/main/java/we/fizz/Pipeline.java index 3fa6564..44f8ea2 100644 --- a/src/main/java/we/fizz/Pipeline.java +++ b/src/main/java/we/fizz/Pipeline.java @@ -95,30 +95,38 @@ public class Pipeline { return Mono.just(aggregateResult); } - LinkedList opSteps = (LinkedList) steps.clone(); - Step step1 = opSteps.removeFirst(); - step1.beforeRun(stepContext, null); - Mono> result = createStep(step1).expand(response -> { - if (opSteps.isEmpty() || response.isStop()) { - return Mono.empty(); - } - Step step = opSteps.pop(); - step.beforeRun(stepContext, response); - return createStep(step); - }).flatMap(response -> Flux.just(response)).collectList(); - return result.flatMap(clientResponse -> { - // 数据转换 - long t3 = System.currentTimeMillis(); - AggregateResult aggResult = this.doInputDataMapping(input, null); - this.stepContext.addElapsedTime(input.getName()+"聚合接口响应结果数据转换", System.currentTimeMillis() - t3); - if(this.stepContext.isDebug()) { - LogService.setBizId(this.stepContext.getTraceId()); - String jsonString = JSON.toJSONString(aggResult); - LOGGER.info("aggResult {}", jsonString); - LOGGER.info("stepContext {}", JSON.toJSONString(stepContext)); - } - return Mono.just(aggResult); - }); + if(CollectionUtils.isEmpty(steps)) { + return handleOutput(input); + }else { + LinkedList opSteps = (LinkedList) steps.clone(); + Step step1 = opSteps.removeFirst(); + step1.beforeRun(stepContext, null); + Mono> result = createStep(step1).expand(response -> { + if (opSteps.isEmpty() || response.isStop()) { + return Mono.empty(); + } + Step step = opSteps.pop(); + step.beforeRun(stepContext, response); + return createStep(step); + }).flatMap(response -> Flux.just(response)).collectList(); + return result.flatMap(clientResponse -> { + return handleOutput(input); + }); + } + } + + private Mono handleOutput(Input input){ + // 数据转换 + long t3 = System.currentTimeMillis(); + AggregateResult aggResult = this.doInputDataMapping(input, null); + this.stepContext.addElapsedTime(input.getName()+"聚合接口响应结果数据转换", System.currentTimeMillis() - t3); + if(this.stepContext.isDebug()) { + LogService.setBizId(this.stepContext.getTraceId()); + String jsonString = JSON.toJSONString(aggResult); + LOGGER.info("aggResult {}", jsonString); + LOGGER.info("stepContext {}", JSON.toJSONString(stepContext)); + } + return Mono.just(aggResult); } @SuppressWarnings("unchecked")