From f055bcb070a19d17bfd0353712537b743f1d5e73 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Tue, 12 Oct 2021 13:23:36 +0800 Subject: [PATCH 1/8] Update netty to 4.1.69.Final --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 133fa4a..8d4ba2a 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ Dysprosium-SR23 5.3.7.RELEASE 2.2.6.RELEASE - 4.1.68.Final + 4.1.69.Final 4.4.14 2.14.1 1.7.32 From 6128c78d867be9f2795426b3131ac18e3cb7e396 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Tue, 12 Oct 2021 13:25:11 +0800 Subject: [PATCH 2/8] Update netty to 4.1.69.Final --- fizz-bootstrap/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml index e4972d7..f033b84 100644 --- a/fizz-bootstrap/pom.xml +++ b/fizz-bootstrap/pom.xml @@ -20,7 +20,7 @@ Dragonfruit-SR3 Dysprosium-SR23 5.3.7.RELEASE - 4.1.68.Final + 4.1.69.Final 4.4.14 2.14.1 1.7.32 From 0190ac2c31f8f4e61a6698485377817dd79fceec Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Tue, 12 Oct 2021 20:04:54 +0800 Subject: [PATCH 3/8] Rename Dict to GlobalResource --- .../we/controller/CacheCheckController.java | 8 +- .../src/main/java/we/dict/DictService.java | 152 ---------------- .../GlobalResource.java} | 38 ++-- .../GlobalResourceService.java | 168 ++++++++++++++++++ .../java/we/plugin/FizzPluginFilterChain.java | 2 +- fizz-core/src/main/java/we/proxy/Route.java | 4 +- .../GlobalResourceTests.java} | 35 ++-- 7 files changed, 217 insertions(+), 190 deletions(-) delete mode 100644 fizz-core/src/main/java/we/dict/DictService.java rename fizz-core/src/main/java/we/{dict/Dict.java => global_resource/GlobalResource.java} (75%) create mode 100644 fizz-core/src/main/java/we/global_resource/GlobalResourceService.java rename fizz-core/src/test/java/we/{dict/DictTests.java => global_resource/GlobalResourceTests.java} (56%) diff --git a/fizz-core/src/main/java/we/controller/CacheCheckController.java b/fizz-core/src/main/java/we/controller/CacheCheckController.java index 874c2dd..d5e7ce9 100644 --- a/fizz-core/src/main/java/we/controller/CacheCheckController.java +++ b/fizz-core/src/main/java/we/controller/CacheCheckController.java @@ -22,7 +22,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Mono; -import we.dict.DictService; +import we.global_resource.GlobalResourceService; import we.plugin.auth.ApiConfigService; import we.plugin.auth.ApiConifg2appsService; import we.plugin.auth.AppService; @@ -56,7 +56,7 @@ public class CacheCheckController { private ApiConifg2appsService apiConifg2appsService; @Resource - private DictService dictService; + private GlobalResourceService globalResourceService; @GetMapping("/gatewayGroups") public Mono gatewayGroups(ServerWebExchange exchange) { @@ -88,8 +88,8 @@ public class CacheCheckController { return Mono.just(JacksonUtils.writeValueAsString(apiConifg2appsService.getApiConfig2appsMap())); } - @GetMapping("/dicts") + @GetMapping("/globalResources") public Mono dicts(ServerWebExchange exchange) { - return Mono.just(JacksonUtils.writeValueAsString(dictService.getDictMap())); + return Mono.just(JacksonUtils.writeValueAsString(globalResourceService.getResourceMap())); } } diff --git a/fizz-core/src/main/java/we/dict/DictService.java b/fizz-core/src/main/java/we/dict/DictService.java deleted file mode 100644 index 9e8018e..0000000 --- a/fizz-core/src/main/java/we/dict/DictService.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright (C) 2020 the original author or authors. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package we.dict; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.data.redis.core.ReactiveStringRedisTemplate; -import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; -import we.FizzAppContext; -import we.config.AggregateRedisConfig; -import we.util.JacksonUtils; -import we.util.ReactiveResult; -import we.util.Result; -import we.util.Utils; - -import javax.annotation.PostConstruct; -import javax.annotation.Resource; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * @author hongqiaowei - */ - -@Service -public class DictService { - - private static final Logger log = LoggerFactory.getLogger(DictService.class); - - private Map dictMap = new HashMap<>(64); - - @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) - private ReactiveStringRedisTemplate rt; - - @PostConstruct - public void init() throws Throwable { - initDict().subscribe( - r -> { - if (r.code == ReactiveResult.SUCC) { - lsnInitChange().subscribe( - res -> { - if (res.code == ReactiveResult.FAIL) { - log.error(res.toString()); - if (res.t == null) { - throw Utils.runtimeExceptionWithoutStack("lsn dict error"); - } - throw new RuntimeException(res.t); - } - } - ); - } else { - log.error(r.toString()); - if (r.t == null) { - throw Utils.runtimeExceptionWithoutStack("init dict error"); - } - throw new RuntimeException(r.t); - } - } - ); - } - - private Mono> initDict() { - Flux> dicts = rt.opsForHash().entries("fizz_dict"); - dicts.collectList() - .defaultIfEmpty(Collections.emptyList()) - .flatMap( - es -> { - if (FizzAppContext.appContext != null) { - for (Map.Entry e : es) { - String json = (String) e.getValue(); - Dict dict = JacksonUtils.readValue(json, Dict.class); - dictMap.put(dict.key, dict); - log.info("init dict: {}", dict); - } - } - return Mono.empty(); - } - ) - .doOnError( - t -> { - log.error("init dict", t); - } - ) - .block(); - return Mono.just(Result.succ()); - } - - private Mono> lsnInitChange() { - Result result = Result.succ(); - String channel = "fizz_dict_channel"; - rt.listenToChannel(channel) - .doOnError( - t -> { - result.code = ReactiveResult.FAIL; - result.t = t; - log.error("lsn {}", channel, t); - } - ) - .doOnSubscribe( - s -> { - log.info("success to lsn on {}", channel); - } - ) - .doOnNext( - msg -> { - if (FizzAppContext.appContext != null) { - String message = msg.getMessage(); - try { - Dict dict = JacksonUtils.readValue(message, Dict.class); - if (dict.isDeleted == Dict.DELETED) { - dictMap.remove(dict.key); - log.info("remove dict {}", dict.key); - } else { - Dict put = dictMap.put(dict.key, dict); - log.info("update dict {} with {}", put, dict); - } - } catch (Throwable t) { - log.error("message: {}", message, t); - } - } - } - ) - .subscribe(); - return Mono.just(result); - } - - public Map getDictMap() { - return dictMap; - } - - public Dict get(String key) { - return dictMap.get(key); - } -} diff --git a/fizz-core/src/main/java/we/dict/Dict.java b/fizz-core/src/main/java/we/global_resource/GlobalResource.java similarity index 75% rename from fizz-core/src/main/java/we/dict/Dict.java rename to fizz-core/src/main/java/we/global_resource/GlobalResource.java index 542d468..644b29d 100644 --- a/fizz-core/src/main/java/we/dict/Dict.java +++ b/fizz-core/src/main/java/we/global_resource/GlobalResource.java @@ -15,7 +15,7 @@ * along with this program. If not, see . */ -package we.dict; +package we.global_resource; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -26,10 +26,11 @@ import java.util.List; import java.util.Map; /** + * just a dict. * @author hongqiaowei */ -public class Dict { +public class GlobalResource { public static final int BOOLEAN = 1; public static final int STRING = 2; @@ -46,7 +47,9 @@ public class Dict { public int type; - public String value; + public String val; + + public Object originalVal; /** for aggregate use mainly */ public boolean booleanVal; @@ -73,7 +76,7 @@ public class Dict { public long update; @JsonCreator - public Dict( + public GlobalResource( @JsonProperty("isDeleted") int isDeleted, @JsonProperty("id") int id, @JsonProperty("key") String key, @@ -87,29 +90,38 @@ public class Dict { this.id = id; this.key = key; this.type = type; - this.value = value; + this.val = value; this.create = create; this.update = update; if (type == BOOLEAN) { - booleanVal = Boolean.parseBoolean(value); + booleanVal = Boolean.parseBoolean(value); + originalVal = booleanVal; + } else if (type == STRING) { - stringVal = value; + stringVal = value; + originalVal = stringVal; + } else if (type == NUMBER) { numberVal = new BigDecimal(value); if (value.indexOf('.') == -1) { - intVal = numberVal.intValue(); - longVal = numberVal.longValue(); + intVal = numberVal.intValue(); + longVal = numberVal.longValue(); + originalVal = longVal; } else { - floatVal = numberVal.floatValue(); - doubleVal = numberVal.doubleValue(); + floatVal = numberVal.floatValue(); + doubleVal = numberVal.doubleValue(); + originalVal = doubleVal; } + } else { // JSON jsonVal = value; if (value.startsWith("{")) { - valMap = JacksonUtils.readValue(jsonVal, Map.class); + valMap = JacksonUtils.readValue(jsonVal, Map.class); + originalVal = valMap; } else { - valList = JacksonUtils.readValue(jsonVal, List.class); + valList = JacksonUtils.readValue(jsonVal, List.class); + originalVal = valList; } } } diff --git a/fizz-core/src/main/java/we/global_resource/GlobalResourceService.java b/fizz-core/src/main/java/we/global_resource/GlobalResourceService.java new file mode 100644 index 0000000..6e2991a --- /dev/null +++ b/fizz-core/src/main/java/we/global_resource/GlobalResourceService.java @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.global_resource; + +import org.noear.snack.ONode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import we.Fizz; +import we.config.AggregateRedisConfig; +import we.fizz.input.PathMapping; +import we.util.JacksonUtils; +import we.util.ReactiveResult; +import we.util.Result; +import we.util.Utils; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * @author hongqiaowei + */ + +@Service +public class GlobalResourceService { + + private static final Logger log = LoggerFactory.getLogger(GlobalResourceService.class); + + public static ONode resNode; + + private Map resourceMap = new HashMap<>(64); + + private Map objectMap = new HashMap<>(64); + + @Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE) + private ReactiveStringRedisTemplate rt; + + @PostConstruct + public void init() throws Throwable { + initGlobalResource().subscribe( + r -> { + if (r.code == ReactiveResult.SUCC) { + lsnGlobalResourceChange().subscribe( + res -> { + if (res.code == ReactiveResult.FAIL) { + log.error(res.toString()); + if (res.t == null) { + throw Utils.runtimeExceptionWithoutStack("lsn global resource error"); + } + throw new RuntimeException(res.t); + } + updateResNode(); + } + ); + } else { + log.error(r.toString()); + if (r.t == null) { + throw Utils.runtimeExceptionWithoutStack("init global resource error"); + } + throw new RuntimeException(r.t); + } + } + ); + } + + private void updateResNode() { + resNode = PathMapping.toONode(objectMap); + log.info("new object map: {}", JacksonUtils.writeValueAsString(objectMap)); + } + + private Mono> initGlobalResource() { + Flux> dicts = rt.opsForHash().entries("fizz_global_resource"); + dicts.collectList() + .defaultIfEmpty(Collections.emptyList()) + .flatMap( + es -> { + if (Fizz.context != null) { + for (Map.Entry e : es) { + String json = (String) e.getValue(); + GlobalResource r = JacksonUtils.readValue(json, GlobalResource.class); + resourceMap.put(r.key, r); + objectMap.put(r.key, r.originalVal); + log.info("init global resource: {}", r); + } + } + return Mono.empty(); + } + ) + .doOnError( + t -> { + log.error("init global resource", t); + } + ) + .block(); + return Mono.just(Result.succ()); + } + + private Mono> lsnGlobalResourceChange() { + Result result = Result.succ(); + String channel = "fizz_global_resource_channel"; + rt.listenToChannel(channel) + .doOnError( + t -> { + result.code = ReactiveResult.FAIL; + result.t = t; + log.error("lsn {}", channel, t); + } + ) + .doOnSubscribe( + s -> { + log.info("success to lsn on {}", channel); + } + ) + .doOnNext( + msg -> { + if (Fizz.context != null) { + String message = msg.getMessage(); + try { + GlobalResource r = JacksonUtils.readValue(message, GlobalResource.class); + if (r.isDeleted == GlobalResource.DELETED) { + resourceMap.remove(r.key); + objectMap.remove(r.key); + log.info("remove global resource {}", r.key); + } else { + GlobalResource put = resourceMap.put(r.key, r); + objectMap.put(r.key, r); + log.info("update global resource {} with {}", put, r); + } + updateResNode(); + } catch (Throwable t) { + log.error("message: {}", message, t); + } + } + } + ) + .subscribe(); + return Mono.just(result); + } + + public Map getResourceMap() { + return resourceMap; + } + + public GlobalResource get(String key) { + return resourceMap.get(key); + } +} diff --git a/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java b/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java index f3fed7d..5716b6b 100644 --- a/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java +++ b/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java @@ -86,7 +86,7 @@ public final class FizzPluginFilterChain { } } - @Deprecated + // @Deprecated public static Mono next(ServerWebExchange exchange, List pcs) { Iterator it = pcs.iterator(); Map attris = exchange.getAttributes(); diff --git a/fizz-core/src/main/java/we/proxy/Route.java b/fizz-core/src/main/java/we/proxy/Route.java index 37f6816..064f918 100644 --- a/fizz-core/src/main/java/we/proxy/Route.java +++ b/fizz-core/src/main/java/we/proxy/Route.java @@ -58,8 +58,8 @@ public class Route { public long retryInterval = 0; - public Route type(byte t) { - type = t; + public Route type(int t) { + type = (byte) t; return this; } diff --git a/fizz-core/src/test/java/we/dict/DictTests.java b/fizz-core/src/test/java/we/global_resource/GlobalResourceTests.java similarity index 56% rename from fizz-core/src/test/java/we/dict/DictTests.java rename to fizz-core/src/test/java/we/global_resource/GlobalResourceTests.java index b17aece..ae3b207 100644 --- a/fizz-core/src/test/java/we/dict/DictTests.java +++ b/fizz-core/src/test/java/we/global_resource/GlobalResourceTests.java @@ -1,4 +1,4 @@ -package we.dict; +package we.global_resource; import com.fasterxml.jackson.core.JsonProcessingException; import org.junit.jupiter.api.BeforeEach; @@ -8,9 +8,8 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import we.Fizz; import we.FizzAppContext; -import we.plugin.auth.ApiConfigService; -import we.plugin.auth.ApiConfigServiceProperties; import we.redis.RedisProperties; import we.redis.RedisServerConfiguration; import we.redis.RedisTemplateConfiguration; @@ -27,7 +26,7 @@ import java.util.Map; @TestPropertySource("/application.properties") @SpringJUnitConfig(classes = {RedisProperties.class, RedisTemplateConfiguration.class, RedisServerConfiguration.class}) -public class DictTests { +public class GlobalResourceTests { @Resource StringRedisTemplate stringRedisTemplate; @@ -35,34 +34,34 @@ public class DictTests { @Resource ReactiveStringRedisTemplate reactiveStringRedisTemplate; - DictService dictService; + GlobalResourceService globalResourceService; @BeforeEach void beforeEach() throws NoSuchFieldException { - dictService = new DictService(); - ReflectionUtils.set(dictService, "rt", reactiveStringRedisTemplate); + globalResourceService = new GlobalResourceService(); + ReflectionUtils.set(globalResourceService, "rt", reactiveStringRedisTemplate); } @Test void constructTest() throws JsonProcessingException { String json = "{\"id\":1,\"key\":\"key\",\"type\":4,\"value\":\"{\\\"a0\\\":\\\"v0\\\",\\\"a1\\\":66}\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}"; - Dict dict = JacksonUtils.readValue(json, Dict.class); -// assertEquals(96.12347, dict.numberVal.doubleValue()); -// assertEquals("96.12347", dict.numberVal.toPlainString()); -// System.err.println(dict.toString()); + GlobalResource globalResource = JacksonUtils.readValue(json, GlobalResource.class); +// assertEquals(96.12347, globalResource.numberVal.doubleValue()); +// assertEquals("96.12347", globalResource.numberVal.toPlainString()); +// System.err.println(globalResource.toString()); } @Test void initTest() throws Throwable { - FizzAppContext.appContext = new GenericApplicationContext(); - FizzAppContext.appContext.refresh(); + Fizz.context = new GenericApplicationContext(); + Fizz.context.refresh(); - Map dictsMap = new HashMap<>(); - dictsMap.put("key0", "{\"id\":1,\"key\":\"key0\",\"type\":2,\"value\":\"val0\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}"); - dictsMap.put("key1", "{\"id\":1,\"key\":\"key1\",\"type\":2,\"value\":\"val1\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}"); - stringRedisTemplate.opsForHash().putAll("fizz_dict", dictsMap); + Map resourceMap = new HashMap<>(); + resourceMap.put("key0", "{\"id\":1,\"key\":\"key0\",\"type\":2,\"value\":\"val0\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}"); + resourceMap.put("key1", "{\"id\":1,\"key\":\"key1\",\"type\":2,\"value\":\"val1\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}"); + stringRedisTemplate.opsForHash().putAll("fizz_global_resource", resourceMap); - dictService.init(); + globalResourceService.init(); } } From 9373370ade4dccdad73eb961b1145cf4060b99f3 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Tue, 12 Oct 2021 20:54:09 +0800 Subject: [PATCH 4/8] Incubate fizz api dispatch web server --- .../ext/FizzServerWebExchangeDecorator.java | 4 +- .../incubator/FizzApiDispatchHttpHandler.java | 168 ++++++++++++++++++ .../incubator/FizzApiDispatchWebServer.java | 69 +++++++ fizz-core/src/main/java/we/util/WebUtils.java | 13 ++ 4 files changed, 252 insertions(+), 2 deletions(-) create mode 100644 fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java create mode 100644 fizz-core/src/main/java/we/incubator/FizzApiDispatchWebServer.java diff --git a/fizz-common/src/main/java/we/spring/web/server/ext/FizzServerWebExchangeDecorator.java b/fizz-common/src/main/java/we/spring/web/server/ext/FizzServerWebExchangeDecorator.java index cdedb0b..8146cca 100644 --- a/fizz-common/src/main/java/we/spring/web/server/ext/FizzServerWebExchangeDecorator.java +++ b/fizz-common/src/main/java/we/spring/web/server/ext/FizzServerWebExchangeDecorator.java @@ -48,9 +48,9 @@ import java.util.Set; public class FizzServerWebExchangeDecorator extends ServerWebExchangeDecorator { - private static final MultiValueMap EMPTY_FORM_DATA = CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap(0)); + public static final MultiValueMap EMPTY_FORM_DATA = CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap(0)); - private static final Mono> EMPTY_FORM_DATA_MONO = Mono.just(EMPTY_FORM_DATA).cache(); + public static final Mono> EMPTY_FORM_DATA_MONO = Mono.just(EMPTY_FORM_DATA).cache(); public FizzServerWebExchangeDecorator(ServerWebExchange delegate) { super(delegate); diff --git a/fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java b/fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java new file mode 100644 index 0000000..6de9749 --- /dev/null +++ b/fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java @@ -0,0 +1,168 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.incubator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.NestedExceptionUtils; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.codec.ServerCodecConfigurer; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ServerHttpRequest; +import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.util.MultiValueMap; +import org.springframework.util.StringUtils; +import org.springframework.web.server.ServerWebExchange; +import org.springframework.web.server.adapter.DefaultServerWebExchange; +import org.springframework.web.server.adapter.ForwardedHeaderTransformer; +import org.springframework.web.server.i18n.LocaleContextResolver; +import org.springframework.web.server.session.WebSessionManager; +import reactor.core.publisher.Mono; +import we.Fizz; +import we.plugin.auth.ApiConfigService; +import we.spring.web.server.ext.FizzServerWebExchangeDecorator; +import we.util.JacksonUtils; +import we.util.WebUtils; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +/** + * @author hongqiaowei + */ + +public class FizzApiDispatchHttpHandler implements HttpHandler { + + private static final String disconnected_client_log_category = "DisconnectedClient"; + + private static final Logger log = LoggerFactory.getLogger(FizzApiDispatchHttpHandler.class); + + private static final Logger lostClientLog = LoggerFactory.getLogger(disconnected_client_log_category); + + private static final Set disconnected_client_exceptions = new HashSet<>(Arrays.asList("AbortedException", "ClientAbortException", "EOFException", "EofException")); + + private WebSessionManager sessionManager; + private ServerCodecConfigurer serverCodecConfigurer; + private LocaleContextResolver localeContextResolver; + private ForwardedHeaderTransformer forwardedHeaderTransformer; + private boolean enableLoggingRequestDetails = false; + + public FizzApiDispatchHttpHandler(WebSessionManager sessionManager, ServerCodecConfigurer codecConfigurer, + LocaleContextResolver localeContextResolver, ForwardedHeaderTransformer forwardedHeaderTransformer) { + this.sessionManager = sessionManager; + this.serverCodecConfigurer = codecConfigurer; + this.localeContextResolver = localeContextResolver; + this.forwardedHeaderTransformer = forwardedHeaderTransformer; + } + + @Override + public Mono handle(ServerHttpRequest request, ServerHttpResponse response) { + if (forwardedHeaderTransformer != null) { + try { + request = forwardedHeaderTransformer.apply(request); + } catch (Throwable t) { + if (log.isDebugEnabled()) { + log.debug("Failed to apply forwarded headers to " + formatRequest(request), t); + } + response.setStatusCode(HttpStatus.BAD_REQUEST); + return response.setComplete(); + } + } + + DefaultServerWebExchange exchange = new DefaultServerWebExchange(request, response, sessionManager, serverCodecConfigurer, localeContextResolver); + + // XXX + String clientReqPath = WebUtils.getClientReqPath(exchange); + log.info("client request path: {}", clientReqPath); + Mono> formData = exchange.getFormData().defaultIfEmpty(FizzServerWebExchangeDecorator.EMPTY_FORM_DATA).flatMap( + dat -> { + log.info("form data: " + JacksonUtils.writeValueAsString(dat)); + return Mono.just(dat); + } + ); + try { + ApiConfigService apiConfigService = Fizz.context.getBean(ApiConfigService.class); + String apiConfigs = JacksonUtils.writeValueAsString(apiConfigService.serviceConfigMap); + return + formData.then( + response.writeWith(Mono.just(response.bufferFactory().wrap(apiConfigs.getBytes()))) + ) + .doOnSuccess (v -> logResponse(exchange)) + .onErrorResume(t -> handleUnresolvedError(exchange, t)) + .then(Mono.defer(response::setComplete)); + } catch (Throwable t) { + throw t; + // TODO: response error + } + } + + private void logResponse(ServerWebExchange exchange) { + WebUtils.traceDebug(log, traceOn -> { + HttpStatus status = exchange.getResponse().getStatusCode(); + return exchange.getLogPrefix() + "Completed " + (status != null ? status : "200 OK") + (traceOn ? ", headers=" + formatHeaders(exchange.getResponse().getHeaders()) : ""); + }); + } + + private String formatHeaders(HttpHeaders responseHeaders) { + return this.enableLoggingRequestDetails ? + responseHeaders.toString() : responseHeaders.isEmpty() ? "{}" : "{masked}"; + } + + private String formatRequest(ServerHttpRequest request) { + String rawQuery = request.getURI().getRawQuery(); + String query = StringUtils.hasText(rawQuery) ? "?" + rawQuery : ""; + return "HTTP " + request.getMethod() + " \"" + request.getPath() + query + "\""; + } + + private Mono handleUnresolvedError(ServerWebExchange exchange, Throwable t) { + ServerHttpRequest request = exchange.getRequest(); + ServerHttpResponse response = exchange.getResponse(); + String logPrefix = exchange.getLogPrefix(); + + if (response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR)) { + log.error(logPrefix + "500 Server Error for " + formatRequest(request), t); + return Mono.empty(); + + } else if (isDisconnectedClientError(t)) { + if (lostClientLog.isTraceEnabled()) { + lostClientLog.trace(logPrefix + "Client went away", t); + } else if (lostClientLog.isDebugEnabled()) { + lostClientLog.debug(logPrefix + "Client went away: " + t + " (stacktrace at TRACE level for '" + disconnected_client_log_category + "')"); + } + return Mono.empty(); + + } else { + // After the response is committed, propagate errors to the server... + log.error(logPrefix + "Error [" + t + "] for " + formatRequest(request) + ", but ServerHttpResponse already committed (" + response.getStatusCode() + ")"); + return Mono.error(t); + } + } + + private boolean isDisconnectedClientError(Throwable t) { + String message = NestedExceptionUtils.getMostSpecificCause(t).getMessage(); + if (message != null) { + String text = message.toLowerCase(); + if (text.contains("broken pipe") || text.contains("connection reset by peer")) { + return true; + } + } + return disconnected_client_exceptions.contains(t.getClass().getSimpleName()); + } +} diff --git a/fizz-core/src/main/java/we/incubator/FizzApiDispatchWebServer.java b/fizz-core/src/main/java/we/incubator/FizzApiDispatchWebServer.java new file mode 100644 index 0000000..e13e313 --- /dev/null +++ b/fizz-core/src/main/java/we/incubator/FizzApiDispatchWebServer.java @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2020 the original author or authors. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package we.incubator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory; +import org.springframework.boot.web.server.WebServer; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.web.server.adapter.HttpWebHandlerAdapter; +import org.springframework.web.server.session.DefaultWebSessionManager; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.annotation.Resource; + +/** + * @author hongqiaowei + */ + +//@Configuration +//@AutoConfigureAfter({HttpHandlerAutoConfiguration.class}) +public class FizzApiDispatchWebServer { + + private static final Logger log = LoggerFactory.getLogger(FizzApiDispatchWebServer.class); + + @Resource + private HttpHandler httpHandler; + + private WebServer server; + + private int port = 8601; + + @PostConstruct + public void start() { + HttpWebHandlerAdapter adapter = (HttpWebHandlerAdapter) httpHandler; + NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory(port); + server = factory.getWebServer( + new FizzApiDispatchHttpHandler( + new DefaultWebSessionManager(), + adapter.getCodecConfigurer(), + adapter.getLocaleContextResolver(), + adapter.getForwardedHeaderTransformer() + ) + ); + server.start(); + log.info("fizz api dispatch web server listen on {}", port); + } + + @PreDestroy + public void stop() { + server.stop(); + } +} diff --git a/fizz-core/src/main/java/we/util/WebUtils.java b/fizz-core/src/main/java/we/util/WebUtils.java index 83a9595..5dc667f 100644 --- a/fizz-core/src/main/java/we/util/WebUtils.java +++ b/fizz-core/src/main/java/we/util/WebUtils.java @@ -43,6 +43,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -587,6 +588,18 @@ public abstract class WebUtils { return id; } + public static void traceDebug(Logger log, Function messageFactory) { + if (log.isDebugEnabled()) { + boolean traceEnabled = log.isTraceEnabled(); + String logMessage = messageFactory.apply(traceEnabled); + if (traceEnabled) { + log.trace(logMessage); + } else { + log.debug(logMessage); + } + } + } + private static final String s0 = "{\""; private static final String s1 = "\":"; private static final String s2 = ",\""; From 7b78a85dd61de25f18ff9fb81f3bd46c32d8d609 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Tue, 12 Oct 2021 21:17:16 +0800 Subject: [PATCH 5/8] Code style --- .../java/we/plugin/auth/ApiConfigService.java | 23 +++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java b/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java index 2178c09..e84fc45 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConfigService.java @@ -31,6 +31,7 @@ import org.springframework.util.CollectionUtils; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import we.Fizz; import we.FizzAppContext; import we.config.AggregateRedisConfig; import we.config.SystemConfig; @@ -217,7 +218,7 @@ public class ApiConfigService { .defaultIfEmpty(Collections.emptyList()) .flatMap( es -> { - if (FizzAppContext.appContext != null) { + if (Fizz.context != null) { for (Map.Entry e : es) { String json = (String) e.getValue(); HashMap map = JacksonUtils.readValue(json, HashMap.class); @@ -225,8 +226,8 @@ public class ApiConfigService { String pluginConfig = (String) map.get("fixedConfig"); String currentPluginConfig = pluginConfigMap.get(plugin); if (currentPluginConfig == null || !currentPluginConfig.equals(pluginConfig)) { - if (FizzAppContext.appContext.containsBean(plugin)) { - FizzPluginFilter pluginFilter = (FizzPluginFilter) FizzAppContext.appContext.getBean(plugin); + if (Fizz.context.containsBean(plugin)) { + FizzPluginFilter pluginFilter = (FizzPluginFilter) Fizz.context.getBean(plugin); pluginFilter.init(pluginConfig); pluginConfigMap.put(plugin, pluginConfig); log.info("init {} with {}", plugin, pluginConfig); @@ -266,7 +267,7 @@ public class ApiConfigService { ) .doOnNext( msg -> { - if (FizzAppContext.appContext != null) { + if (Fizz.context != null) { String message = msg.getMessage(); try { HashMap map = JacksonUtils.readValue(message, HashMap.class); @@ -274,8 +275,8 @@ public class ApiConfigService { String pluginConfig = (String) map.get("fixedConfig"); String currentPluginConfig = pluginConfigMap.get(plugin); if (currentPluginConfig == null || !currentPluginConfig.equals(pluginConfig)) { - if (FizzAppContext.appContext.containsBean(plugin)) { - FizzPluginFilter pluginFilter = (FizzPluginFilter) FizzAppContext.appContext.getBean(plugin); + if (Fizz.context.containsBean(plugin)) { + FizzPluginFilter pluginFilter = (FizzPluginFilter) Fizz.context.getBean(plugin); pluginFilter.init(pluginConfig); pluginConfigMap.put(plugin, pluginConfig); log.info("init {} with {} again", plugin, pluginConfig); @@ -359,7 +360,9 @@ public class ApiConfigService { } List apiConfigs = sc.getApiConfigs(gatewayGroups, method, path); if (apiConfigs.isEmpty()) { - return Result.fail(service + " don't have api config matching " + gatewayGroups + " group " + method + " method " + path + " path"); + StringBuilder b = ThreadContext.getStringBuilder(); + b.append(service).append(" don't have api config matching ").append(gatewayGroups).append(" group ").append(method).append(" method ").append(path).append(" path"); + return Result.fail(b.toString()); } List appCanAccess = ThreadContext.getArrayList(macs); for (int i = 0; i < apiConfigs.size(); i++) { @@ -373,11 +376,13 @@ public class ApiConfigService { } } if (appCanAccess.isEmpty()) { - return Result.fail("app " + app + " can't access " + JacksonUtils.writeValueAsString(apiConfigs)); + StringBuilder b = ThreadContext.getStringBuilder(); + b.append("app ").append(app).append(" can't access ").append(JacksonUtils.writeValueAsString(apiConfigs)); + return Result.fail(b.toString()); } ApiConfig bestOne = appCanAccess.get(0); if (appCanAccess.size() != 1) { - appCanAccess.sort(new ApiConfigPathPatternComparator(path)); + appCanAccess.sort(new ApiConfigPathPatternComparator(path)); // singleton ? ApiConfig ac0 = appCanAccess.get(0); bestOne = ac0; ApiConfig ac1 = appCanAccess.get(1); From bf3358b27efadf34db2f3e1630ebb9d32ad2608f Mon Sep 17 00:00:00 2001 From: "lancer.hong" Date: Tue, 12 Oct 2021 22:46:05 +0800 Subject: [PATCH 6/8] Code snippet optimization --- .../incubator/FizzApiDispatchHttpHandler.java | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java b/fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java index 6de9749..32b9a50 100644 --- a/fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java +++ b/fizz-core/src/main/java/we/incubator/FizzApiDispatchHttpHandler.java @@ -79,7 +79,7 @@ public class FizzApiDispatchHttpHandler implements HttpHandler { request = forwardedHeaderTransformer.apply(request); } catch (Throwable t) { if (log.isDebugEnabled()) { - log.debug("Failed to apply forwarded headers to " + formatRequest(request), t); + log.debug("Failed to apply forwarded headers to {}", formatRequest(request), t); } response.setStatusCode(HttpStatus.BAD_REQUEST); return response.setComplete(); @@ -93,7 +93,7 @@ public class FizzApiDispatchHttpHandler implements HttpHandler { log.info("client request path: {}", clientReqPath); Mono> formData = exchange.getFormData().defaultIfEmpty(FizzServerWebExchangeDecorator.EMPTY_FORM_DATA).flatMap( dat -> { - log.info("form data: " + JacksonUtils.writeValueAsString(dat)); + log.info("form data: {}", JacksonUtils.writeValueAsString(dat)); return Mono.just(dat); } ); @@ -104,19 +104,21 @@ public class FizzApiDispatchHttpHandler implements HttpHandler { formData.then( response.writeWith(Mono.just(response.bufferFactory().wrap(apiConfigs.getBytes()))) ) - .doOnSuccess (v -> logResponse(exchange)) - .onErrorResume(t -> handleUnresolvedError(exchange, t)) - .then(Mono.defer(response::setComplete)); + .doOnSuccess ( v -> logResponse(exchange) ) + .onErrorResume( t -> handleUnresolvedError(exchange, t) ) + .then ( Mono.defer(response::setComplete) ); } catch (Throwable t) { - throw t; - // TODO: response error + // throw t; + log.error(exchange.getLogPrefix() + " 500 Server Error for " + formatRequest(request), t); + response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR); + return response.setComplete(); } } private void logResponse(ServerWebExchange exchange) { WebUtils.traceDebug(log, traceOn -> { HttpStatus status = exchange.getResponse().getStatusCode(); - return exchange.getLogPrefix() + "Completed " + (status != null ? status : "200 OK") + (traceOn ? ", headers=" + formatHeaders(exchange.getResponse().getHeaders()) : ""); + return exchange.getLogPrefix() + " Completed " + (status != null ? status : "200 OK") + (traceOn ? ", headers=" + formatHeaders(exchange.getResponse().getHeaders()) : ""); }); } @@ -137,20 +139,20 @@ public class FizzApiDispatchHttpHandler implements HttpHandler { String logPrefix = exchange.getLogPrefix(); if (response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR)) { - log.error(logPrefix + "500 Server Error for " + formatRequest(request), t); + log.error(logPrefix + " 500 Server Error for " + formatRequest(request), t); return Mono.empty(); } else if (isDisconnectedClientError(t)) { if (lostClientLog.isTraceEnabled()) { - lostClientLog.trace(logPrefix + "Client went away", t); + lostClientLog.trace(logPrefix + " Client went away", t); } else if (lostClientLog.isDebugEnabled()) { - lostClientLog.debug(logPrefix + "Client went away: " + t + " (stacktrace at TRACE level for '" + disconnected_client_log_category + "')"); + lostClientLog.debug(logPrefix + " Client went away: " + t + " (stacktrace at TRACE level for '" + disconnected_client_log_category + "')"); } return Mono.empty(); } else { // After the response is committed, propagate errors to the server... - log.error(logPrefix + "Error [" + t + "] for " + formatRequest(request) + ", but ServerHttpResponse already committed (" + response.getStatusCode() + ")"); + log.error(logPrefix + " Error [" + t + "] for " + formatRequest(request) + ", but ServerHttpResponse already committed (" + response.getStatusCode() + ")"); return Mono.error(t); } } From 396a8f81f73a18f6149702e5780f10f78c57d522 Mon Sep 17 00:00:00 2001 From: "lancer.hong" Date: Wed, 13 Oct 2021 01:18:23 +0800 Subject: [PATCH 7/8] Upgrade reactor to Dysprosium-SR24 --- fizz-bootstrap/pom.xml | 2 +- pom.xml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml index f033b84..7ab2838 100644 --- a/fizz-bootstrap/pom.xml +++ b/fizz-bootstrap/pom.xml @@ -18,7 +18,7 @@ 1.8 5.2.13.RELEASE Dragonfruit-SR3 - Dysprosium-SR23 + Dysprosium-SR24 5.3.7.RELEASE 4.1.69.Final 4.4.14 diff --git a/pom.xml b/pom.xml index 8d4ba2a..6010b9d 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ 2.2.13.RELEASE 5.2.13.RELEASE - Dysprosium-SR23 + Dysprosium-SR24 5.3.7.RELEASE 2.2.6.RELEASE 4.1.69.Final @@ -134,7 +134,7 @@ io.projectreactor reactor-test - 3.3.20.RELEASE + 3.3.21.RELEASE test From 81341bd62e0a3910abc7b90b276255c8185d4993 Mon Sep 17 00:00:00 2001 From: hongqiaowei Date: Wed, 13 Oct 2021 11:14:46 +0800 Subject: [PATCH 8/8] Dynamic change plugins of route --- .../main/java/we/plugin/FizzPluginFilterChain.java | 9 ++++++--- fizz-core/src/main/java/we/plugin/auth/ApiConfig.java | 3 ++- fizz-core/src/main/java/we/proxy/Route.java | 11 +++++++---- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java b/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java index 5716b6b..6700a4b 100644 --- a/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java +++ b/fizz-core/src/main/java/we/plugin/FizzPluginFilterChain.java @@ -22,6 +22,7 @@ import org.springframework.web.server.WebFilterChain; import reactor.core.publisher.Mono; import we.Fizz; import we.FizzAppContext; +import we.proxy.Route; import we.util.ReactorUtils; import we.util.WebUtils; @@ -44,11 +45,13 @@ public final class FizzPluginFilterChain { public static Mono next(ServerWebExchange exchange) { Iterator it = exchange.getAttribute(pluginConfigsIt); - if (it == null) { - List pcs = WebUtils.getRoute(exchange).pluginConfigs; + Route route = WebUtils.getRoute(exchange); + if (it == null || route.pluginConfigsChange) { + List pcs = route.pluginConfigs; it = pcs.iterator(); Map attris = exchange.getAttributes(); attris.put(pluginConfigsIt, it); + route.pluginConfigsChange = false; } if (it.hasNext()) { PluginConfig pc = it.next(); @@ -86,7 +89,7 @@ public final class FizzPluginFilterChain { } } - // @Deprecated + @Deprecated public static Mono next(ServerWebExchange exchange, List pcs) { Iterator it = pcs.iterator(); Map attris = exchange.getAttributes(); diff --git a/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java b/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java index 3582842..a9ab21f 100644 --- a/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java +++ b/fizz-core/src/main/java/we/plugin/auth/ApiConfig.java @@ -227,7 +227,7 @@ public class ApiConfig { .backendService(this.backendService) .backendPath( this.backendPath) .query( WebUtils.getClientReqQuery(exchange)) - .pluginConfigs( this.pluginConfigs) +// .pluginConfigs( this.pluginConfigs) .rpcMethod( this.rpcMethod) .rpcParamTypes( this.rpcParamTypes) .rpcGroup( this.rpcGroup) @@ -236,6 +236,7 @@ public class ApiConfig { .retryCount( this.retryCount) .retryInterval( this.retryInterval); + r.pluginConfigs = this.pluginConfigs; if (this.type == Type.REVERSE_PROXY) { r = r.nextHttpHostPort(getNextHttpHostPort()); } diff --git a/fizz-core/src/main/java/we/proxy/Route.java b/fizz-core/src/main/java/we/proxy/Route.java index 064f918..6168bc6 100644 --- a/fizz-core/src/main/java/we/proxy/Route.java +++ b/fizz-core/src/main/java/we/proxy/Route.java @@ -44,6 +44,8 @@ public class Route { public List pluginConfigs; + public boolean pluginConfigsChange = false; + public String rpcMethod; public String rpcParamTypes; @@ -52,11 +54,11 @@ public class Route { public String rpcGroup; - public long timeout = 0; + public long timeout = 0; - public int retryCount = 0; + public int retryCount = 0; - public long retryInterval = 0; + public long retryInterval = 0; public Route type(int t) { type = (byte) t; @@ -84,7 +86,8 @@ public class Route { } public Route pluginConfigs(List pcs) { - pluginConfigs = pcs; + pluginConfigs = pcs; + pluginConfigsChange = true; return this; }