Merge pull request #339 from wehotel/develop
This commit is contained in:
@@ -17,9 +17,9 @@
|
||||
<java.version>1.8</java.version>
|
||||
<spring-framework.version>5.2.13.RELEASE</spring-framework.version>
|
||||
<spring-session-bom.version>Dragonfruit-SR3</spring-session-bom.version>
|
||||
<reactor-bom.version>Dysprosium-SR23</reactor-bom.version>
|
||||
<reactor-bom.version>Dysprosium-SR24</reactor-bom.version>
|
||||
<lettuce.version>5.3.7.RELEASE</lettuce.version>
|
||||
<netty.version>4.1.68.Final</netty.version>
|
||||
<netty.version>4.1.69.Final</netty.version>
|
||||
<httpcore.version>4.4.14</httpcore.version>
|
||||
<log4j2.version>2.14.1</log4j2.version>
|
||||
<slf4j.version>1.7.32</slf4j.version>
|
||||
|
||||
@@ -48,9 +48,9 @@ import java.util.Set;
|
||||
|
||||
public class FizzServerWebExchangeDecorator extends ServerWebExchangeDecorator {
|
||||
|
||||
private static final MultiValueMap<String, String> EMPTY_FORM_DATA = CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<String, String>(0));
|
||||
public static final MultiValueMap<String, String> EMPTY_FORM_DATA = CollectionUtils.unmodifiableMultiValueMap(new LinkedMultiValueMap<String, String>(0));
|
||||
|
||||
private static final Mono<MultiValueMap<String, String>> EMPTY_FORM_DATA_MONO = Mono.just(EMPTY_FORM_DATA).cache();
|
||||
public static final Mono<MultiValueMap<String, String>> EMPTY_FORM_DATA_MONO = Mono.just(EMPTY_FORM_DATA).cache();
|
||||
|
||||
public FizzServerWebExchangeDecorator(ServerWebExchange delegate) {
|
||||
super(delegate);
|
||||
|
||||
@@ -22,7 +22,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.dict.DictService;
|
||||
import we.global_resource.GlobalResourceService;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.ApiConifg2appsService;
|
||||
import we.plugin.auth.AppService;
|
||||
@@ -56,7 +56,7 @@ public class CacheCheckController {
|
||||
private ApiConifg2appsService apiConifg2appsService;
|
||||
|
||||
@Resource
|
||||
private DictService dictService;
|
||||
private GlobalResourceService globalResourceService;
|
||||
|
||||
@GetMapping("/gatewayGroups")
|
||||
public Mono<String> gatewayGroups(ServerWebExchange exchange) {
|
||||
@@ -88,8 +88,8 @@ public class CacheCheckController {
|
||||
return Mono.just(JacksonUtils.writeValueAsString(apiConifg2appsService.getApiConfig2appsMap()));
|
||||
}
|
||||
|
||||
@GetMapping("/dicts")
|
||||
@GetMapping("/globalResources")
|
||||
public Mono<String> dicts(ServerWebExchange exchange) {
|
||||
return Mono.just(JacksonUtils.writeValueAsString(dictService.getDictMap()));
|
||||
return Mono.just(JacksonUtils.writeValueAsString(globalResourceService.getResourceMap()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,152 +0,0 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.dict;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.FizzAppContext;
|
||||
import we.config.AggregateRedisConfig;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ReactiveResult;
|
||||
import we.util.Result;
|
||||
import we.util.Utils;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
@Service
|
||||
public class DictService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(DictService.class);
|
||||
|
||||
private Map<String, Dict> dictMap = new HashMap<>(64);
|
||||
|
||||
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
||||
private ReactiveStringRedisTemplate rt;
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
initDict().subscribe(
|
||||
r -> {
|
||||
if (r.code == ReactiveResult.SUCC) {
|
||||
lsnInitChange().subscribe(
|
||||
res -> {
|
||||
if (res.code == ReactiveResult.FAIL) {
|
||||
log.error(res.toString());
|
||||
if (res.t == null) {
|
||||
throw Utils.runtimeExceptionWithoutStack("lsn dict error");
|
||||
}
|
||||
throw new RuntimeException(res.t);
|
||||
}
|
||||
}
|
||||
);
|
||||
} else {
|
||||
log.error(r.toString());
|
||||
if (r.t == null) {
|
||||
throw Utils.runtimeExceptionWithoutStack("init dict error");
|
||||
}
|
||||
throw new RuntimeException(r.t);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private Mono<Result<?>> initDict() {
|
||||
Flux<Map.Entry<Object, Object>> dicts = rt.opsForHash().entries("fizz_dict");
|
||||
dicts.collectList()
|
||||
.defaultIfEmpty(Collections.emptyList())
|
||||
.flatMap(
|
||||
es -> {
|
||||
if (FizzAppContext.appContext != null) {
|
||||
for (Map.Entry<Object, Object> e : es) {
|
||||
String json = (String) e.getValue();
|
||||
Dict dict = JacksonUtils.readValue(json, Dict.class);
|
||||
dictMap.put(dict.key, dict);
|
||||
log.info("init dict: {}", dict);
|
||||
}
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
t -> {
|
||||
log.error("init dict", t);
|
||||
}
|
||||
)
|
||||
.block();
|
||||
return Mono.just(Result.succ());
|
||||
}
|
||||
|
||||
private Mono<Result<?>> lsnInitChange() {
|
||||
Result<?> result = Result.succ();
|
||||
String channel = "fizz_dict_channel";
|
||||
rt.listenToChannel(channel)
|
||||
.doOnError(
|
||||
t -> {
|
||||
result.code = ReactiveResult.FAIL;
|
||||
result.t = t;
|
||||
log.error("lsn {}", channel, t);
|
||||
}
|
||||
)
|
||||
.doOnSubscribe(
|
||||
s -> {
|
||||
log.info("success to lsn on {}", channel);
|
||||
}
|
||||
)
|
||||
.doOnNext(
|
||||
msg -> {
|
||||
if (FizzAppContext.appContext != null) {
|
||||
String message = msg.getMessage();
|
||||
try {
|
||||
Dict dict = JacksonUtils.readValue(message, Dict.class);
|
||||
if (dict.isDeleted == Dict.DELETED) {
|
||||
dictMap.remove(dict.key);
|
||||
log.info("remove dict {}", dict.key);
|
||||
} else {
|
||||
Dict put = dictMap.put(dict.key, dict);
|
||||
log.info("update dict {} with {}", put, dict);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
log.error("message: {}", message, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
.subscribe();
|
||||
return Mono.just(result);
|
||||
}
|
||||
|
||||
public Map<String, Dict> getDictMap() {
|
||||
return dictMap;
|
||||
}
|
||||
|
||||
public Dict get(String key) {
|
||||
return dictMap.get(key);
|
||||
}
|
||||
}
|
||||
@@ -15,7 +15,7 @@
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.dict;
|
||||
package we.global_resource;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
@@ -26,10 +26,11 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* just a dict.
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public class Dict {
|
||||
public class GlobalResource {
|
||||
|
||||
public static final int BOOLEAN = 1;
|
||||
public static final int STRING = 2;
|
||||
@@ -46,7 +47,9 @@ public class Dict {
|
||||
|
||||
public int type;
|
||||
|
||||
public String value;
|
||||
public String val;
|
||||
|
||||
public Object originalVal; /** for aggregate use mainly */
|
||||
|
||||
public boolean booleanVal;
|
||||
|
||||
@@ -73,7 +76,7 @@ public class Dict {
|
||||
public long update;
|
||||
|
||||
@JsonCreator
|
||||
public Dict(
|
||||
public GlobalResource(
|
||||
@JsonProperty("isDeleted") int isDeleted,
|
||||
@JsonProperty("id") int id,
|
||||
@JsonProperty("key") String key,
|
||||
@@ -87,29 +90,38 @@ public class Dict {
|
||||
this.id = id;
|
||||
this.key = key;
|
||||
this.type = type;
|
||||
this.value = value;
|
||||
this.val = value;
|
||||
this.create = create;
|
||||
this.update = update;
|
||||
|
||||
if (type == BOOLEAN) {
|
||||
booleanVal = Boolean.parseBoolean(value);
|
||||
booleanVal = Boolean.parseBoolean(value);
|
||||
originalVal = booleanVal;
|
||||
|
||||
} else if (type == STRING) {
|
||||
stringVal = value;
|
||||
stringVal = value;
|
||||
originalVal = stringVal;
|
||||
|
||||
} else if (type == NUMBER) {
|
||||
numberVal = new BigDecimal(value);
|
||||
if (value.indexOf('.') == -1) {
|
||||
intVal = numberVal.intValue();
|
||||
longVal = numberVal.longValue();
|
||||
intVal = numberVal.intValue();
|
||||
longVal = numberVal.longValue();
|
||||
originalVal = longVal;
|
||||
} else {
|
||||
floatVal = numberVal.floatValue();
|
||||
doubleVal = numberVal.doubleValue();
|
||||
floatVal = numberVal.floatValue();
|
||||
doubleVal = numberVal.doubleValue();
|
||||
originalVal = doubleVal;
|
||||
}
|
||||
|
||||
} else { // JSON
|
||||
jsonVal = value;
|
||||
if (value.startsWith("{")) {
|
||||
valMap = JacksonUtils.readValue(jsonVal, Map.class);
|
||||
valMap = JacksonUtils.readValue(jsonVal, Map.class);
|
||||
originalVal = valMap;
|
||||
} else {
|
||||
valList = JacksonUtils.readValue(jsonVal, List.class);
|
||||
valList = JacksonUtils.readValue(jsonVal, List.class);
|
||||
originalVal = valList;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,168 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.global_resource;
|
||||
|
||||
import org.noear.snack.ONode;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.Fizz;
|
||||
import we.config.AggregateRedisConfig;
|
||||
import we.fizz.input.PathMapping;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.ReactiveResult;
|
||||
import we.util.Result;
|
||||
import we.util.Utils;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
@Service
|
||||
public class GlobalResourceService {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(GlobalResourceService.class);
|
||||
|
||||
public static ONode resNode;
|
||||
|
||||
private Map<String, GlobalResource> resourceMap = new HashMap<>(64);
|
||||
|
||||
private Map<String, Object> objectMap = new HashMap<>(64);
|
||||
|
||||
@Resource(name = AggregateRedisConfig.AGGREGATE_REACTIVE_REDIS_TEMPLATE)
|
||||
private ReactiveStringRedisTemplate rt;
|
||||
|
||||
@PostConstruct
|
||||
public void init() throws Throwable {
|
||||
initGlobalResource().subscribe(
|
||||
r -> {
|
||||
if (r.code == ReactiveResult.SUCC) {
|
||||
lsnGlobalResourceChange().subscribe(
|
||||
res -> {
|
||||
if (res.code == ReactiveResult.FAIL) {
|
||||
log.error(res.toString());
|
||||
if (res.t == null) {
|
||||
throw Utils.runtimeExceptionWithoutStack("lsn global resource error");
|
||||
}
|
||||
throw new RuntimeException(res.t);
|
||||
}
|
||||
updateResNode();
|
||||
}
|
||||
);
|
||||
} else {
|
||||
log.error(r.toString());
|
||||
if (r.t == null) {
|
||||
throw Utils.runtimeExceptionWithoutStack("init global resource error");
|
||||
}
|
||||
throw new RuntimeException(r.t);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
private void updateResNode() {
|
||||
resNode = PathMapping.toONode(objectMap);
|
||||
log.info("new object map: {}", JacksonUtils.writeValueAsString(objectMap));
|
||||
}
|
||||
|
||||
private Mono<Result<?>> initGlobalResource() {
|
||||
Flux<Map.Entry<Object, Object>> dicts = rt.opsForHash().entries("fizz_global_resource");
|
||||
dicts.collectList()
|
||||
.defaultIfEmpty(Collections.emptyList())
|
||||
.flatMap(
|
||||
es -> {
|
||||
if (Fizz.context != null) {
|
||||
for (Map.Entry<Object, Object> e : es) {
|
||||
String json = (String) e.getValue();
|
||||
GlobalResource r = JacksonUtils.readValue(json, GlobalResource.class);
|
||||
resourceMap.put(r.key, r);
|
||||
objectMap.put(r.key, r.originalVal);
|
||||
log.info("init global resource: {}", r);
|
||||
}
|
||||
}
|
||||
return Mono.empty();
|
||||
}
|
||||
)
|
||||
.doOnError(
|
||||
t -> {
|
||||
log.error("init global resource", t);
|
||||
}
|
||||
)
|
||||
.block();
|
||||
return Mono.just(Result.succ());
|
||||
}
|
||||
|
||||
private Mono<Result<?>> lsnGlobalResourceChange() {
|
||||
Result<?> result = Result.succ();
|
||||
String channel = "fizz_global_resource_channel";
|
||||
rt.listenToChannel(channel)
|
||||
.doOnError(
|
||||
t -> {
|
||||
result.code = ReactiveResult.FAIL;
|
||||
result.t = t;
|
||||
log.error("lsn {}", channel, t);
|
||||
}
|
||||
)
|
||||
.doOnSubscribe(
|
||||
s -> {
|
||||
log.info("success to lsn on {}", channel);
|
||||
}
|
||||
)
|
||||
.doOnNext(
|
||||
msg -> {
|
||||
if (Fizz.context != null) {
|
||||
String message = msg.getMessage();
|
||||
try {
|
||||
GlobalResource r = JacksonUtils.readValue(message, GlobalResource.class);
|
||||
if (r.isDeleted == GlobalResource.DELETED) {
|
||||
resourceMap.remove(r.key);
|
||||
objectMap.remove(r.key);
|
||||
log.info("remove global resource {}", r.key);
|
||||
} else {
|
||||
GlobalResource put = resourceMap.put(r.key, r);
|
||||
objectMap.put(r.key, r);
|
||||
log.info("update global resource {} with {}", put, r);
|
||||
}
|
||||
updateResNode();
|
||||
} catch (Throwable t) {
|
||||
log.error("message: {}", message, t);
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
.subscribe();
|
||||
return Mono.just(result);
|
||||
}
|
||||
|
||||
public Map<String, GlobalResource> getResourceMap() {
|
||||
return resourceMap;
|
||||
}
|
||||
|
||||
public GlobalResource get(String key) {
|
||||
return resourceMap.get(key);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,170 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.incubator;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.core.NestedExceptionUtils;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.codec.ServerCodecConfigurer;
|
||||
import org.springframework.http.server.reactive.HttpHandler;
|
||||
import org.springframework.http.server.reactive.ServerHttpRequest;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.util.MultiValueMap;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import org.springframework.web.server.adapter.DefaultServerWebExchange;
|
||||
import org.springframework.web.server.adapter.ForwardedHeaderTransformer;
|
||||
import org.springframework.web.server.i18n.LocaleContextResolver;
|
||||
import org.springframework.web.server.session.WebSessionManager;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.Fizz;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.spring.web.server.ext.FizzServerWebExchangeDecorator;
|
||||
import we.util.JacksonUtils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
public class FizzApiDispatchHttpHandler implements HttpHandler {
|
||||
|
||||
private static final String disconnected_client_log_category = "DisconnectedClient";
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(FizzApiDispatchHttpHandler.class);
|
||||
|
||||
private static final Logger lostClientLog = LoggerFactory.getLogger(disconnected_client_log_category);
|
||||
|
||||
private static final Set<String> disconnected_client_exceptions = new HashSet<>(Arrays.asList("AbortedException", "ClientAbortException", "EOFException", "EofException"));
|
||||
|
||||
private WebSessionManager sessionManager;
|
||||
private ServerCodecConfigurer serverCodecConfigurer;
|
||||
private LocaleContextResolver localeContextResolver;
|
||||
private ForwardedHeaderTransformer forwardedHeaderTransformer;
|
||||
private boolean enableLoggingRequestDetails = false;
|
||||
|
||||
public FizzApiDispatchHttpHandler(WebSessionManager sessionManager, ServerCodecConfigurer codecConfigurer,
|
||||
LocaleContextResolver localeContextResolver, ForwardedHeaderTransformer forwardedHeaderTransformer) {
|
||||
this.sessionManager = sessionManager;
|
||||
this.serverCodecConfigurer = codecConfigurer;
|
||||
this.localeContextResolver = localeContextResolver;
|
||||
this.forwardedHeaderTransformer = forwardedHeaderTransformer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {
|
||||
if (forwardedHeaderTransformer != null) {
|
||||
try {
|
||||
request = forwardedHeaderTransformer.apply(request);
|
||||
} catch (Throwable t) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Failed to apply forwarded headers to {}", formatRequest(request), t);
|
||||
}
|
||||
response.setStatusCode(HttpStatus.BAD_REQUEST);
|
||||
return response.setComplete();
|
||||
}
|
||||
}
|
||||
|
||||
DefaultServerWebExchange exchange = new DefaultServerWebExchange(request, response, sessionManager, serverCodecConfigurer, localeContextResolver);
|
||||
|
||||
// XXX
|
||||
String clientReqPath = WebUtils.getClientReqPath(exchange);
|
||||
log.info("client request path: {}", clientReqPath);
|
||||
Mono<MultiValueMap<String, String>> formData = exchange.getFormData().defaultIfEmpty(FizzServerWebExchangeDecorator.EMPTY_FORM_DATA).flatMap(
|
||||
dat -> {
|
||||
log.info("form data: {}", JacksonUtils.writeValueAsString(dat));
|
||||
return Mono.just(dat);
|
||||
}
|
||||
);
|
||||
try {
|
||||
ApiConfigService apiConfigService = Fizz.context.getBean(ApiConfigService.class);
|
||||
String apiConfigs = JacksonUtils.writeValueAsString(apiConfigService.serviceConfigMap);
|
||||
return
|
||||
formData.then(
|
||||
response.writeWith(Mono.just(response.bufferFactory().wrap(apiConfigs.getBytes())))
|
||||
)
|
||||
.doOnSuccess ( v -> logResponse(exchange) )
|
||||
.onErrorResume( t -> handleUnresolvedError(exchange, t) )
|
||||
.then ( Mono.defer(response::setComplete) );
|
||||
} catch (Throwable t) {
|
||||
// throw t;
|
||||
log.error(exchange.getLogPrefix() + " 500 Server Error for " + formatRequest(request), t);
|
||||
response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
|
||||
return response.setComplete();
|
||||
}
|
||||
}
|
||||
|
||||
private void logResponse(ServerWebExchange exchange) {
|
||||
WebUtils.traceDebug(log, traceOn -> {
|
||||
HttpStatus status = exchange.getResponse().getStatusCode();
|
||||
return exchange.getLogPrefix() + " Completed " + (status != null ? status : "200 OK") + (traceOn ? ", headers=" + formatHeaders(exchange.getResponse().getHeaders()) : "");
|
||||
});
|
||||
}
|
||||
|
||||
private String formatHeaders(HttpHeaders responseHeaders) {
|
||||
return this.enableLoggingRequestDetails ?
|
||||
responseHeaders.toString() : responseHeaders.isEmpty() ? "{}" : "{masked}";
|
||||
}
|
||||
|
||||
private String formatRequest(ServerHttpRequest request) {
|
||||
String rawQuery = request.getURI().getRawQuery();
|
||||
String query = StringUtils.hasText(rawQuery) ? "?" + rawQuery : "";
|
||||
return "HTTP " + request.getMethod() + " \"" + request.getPath() + query + "\"";
|
||||
}
|
||||
|
||||
private Mono<Void> handleUnresolvedError(ServerWebExchange exchange, Throwable t) {
|
||||
ServerHttpRequest request = exchange.getRequest();
|
||||
ServerHttpResponse response = exchange.getResponse();
|
||||
String logPrefix = exchange.getLogPrefix();
|
||||
|
||||
if (response.setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR)) {
|
||||
log.error(logPrefix + " 500 Server Error for " + formatRequest(request), t);
|
||||
return Mono.empty();
|
||||
|
||||
} else if (isDisconnectedClientError(t)) {
|
||||
if (lostClientLog.isTraceEnabled()) {
|
||||
lostClientLog.trace(logPrefix + " Client went away", t);
|
||||
} else if (lostClientLog.isDebugEnabled()) {
|
||||
lostClientLog.debug(logPrefix + " Client went away: " + t + " (stacktrace at TRACE level for '" + disconnected_client_log_category + "')");
|
||||
}
|
||||
return Mono.empty();
|
||||
|
||||
} else {
|
||||
// After the response is committed, propagate errors to the server...
|
||||
log.error(logPrefix + " Error [" + t + "] for " + formatRequest(request) + ", but ServerHttpResponse already committed (" + response.getStatusCode() + ")");
|
||||
return Mono.error(t);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDisconnectedClientError(Throwable t) {
|
||||
String message = NestedExceptionUtils.getMostSpecificCause(t).getMessage();
|
||||
if (message != null) {
|
||||
String text = message.toLowerCase();
|
||||
if (text.contains("broken pipe") || text.contains("connection reset by peer")) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return disconnected_client_exceptions.contains(t.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,69 @@
|
||||
/*
|
||||
* Copyright (C) 2020 the original author or authors.
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package we.incubator;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory;
|
||||
import org.springframework.boot.web.server.WebServer;
|
||||
import org.springframework.http.server.reactive.HttpHandler;
|
||||
import org.springframework.web.server.adapter.HttpWebHandlerAdapter;
|
||||
import org.springframework.web.server.session.DefaultWebSessionManager;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.PreDestroy;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* @author hongqiaowei
|
||||
*/
|
||||
|
||||
//@Configuration
|
||||
//@AutoConfigureAfter({HttpHandlerAutoConfiguration.class})
|
||||
public class FizzApiDispatchWebServer {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(FizzApiDispatchWebServer.class);
|
||||
|
||||
@Resource
|
||||
private HttpHandler httpHandler;
|
||||
|
||||
private WebServer server;
|
||||
|
||||
private int port = 8601;
|
||||
|
||||
@PostConstruct
|
||||
public void start() {
|
||||
HttpWebHandlerAdapter adapter = (HttpWebHandlerAdapter) httpHandler;
|
||||
NettyReactiveWebServerFactory factory = new NettyReactiveWebServerFactory(port);
|
||||
server = factory.getWebServer(
|
||||
new FizzApiDispatchHttpHandler(
|
||||
new DefaultWebSessionManager(),
|
||||
adapter.getCodecConfigurer(),
|
||||
adapter.getLocaleContextResolver(),
|
||||
adapter.getForwardedHeaderTransformer()
|
||||
)
|
||||
);
|
||||
server.start();
|
||||
log.info("fizz api dispatch web server listen on {}", port);
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void stop() {
|
||||
server.stop();
|
||||
}
|
||||
}
|
||||
@@ -22,6 +22,7 @@ import org.springframework.web.server.WebFilterChain;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.Fizz;
|
||||
import we.FizzAppContext;
|
||||
import we.proxy.Route;
|
||||
import we.util.ReactorUtils;
|
||||
import we.util.WebUtils;
|
||||
|
||||
@@ -44,11 +45,13 @@ public final class FizzPluginFilterChain {
|
||||
|
||||
public static Mono<Void> next(ServerWebExchange exchange) {
|
||||
Iterator<PluginConfig> it = exchange.getAttribute(pluginConfigsIt);
|
||||
if (it == null) {
|
||||
List<PluginConfig> pcs = WebUtils.getRoute(exchange).pluginConfigs;
|
||||
Route route = WebUtils.getRoute(exchange);
|
||||
if (it == null || route.pluginConfigsChange) {
|
||||
List<PluginConfig> pcs = route.pluginConfigs;
|
||||
it = pcs.iterator();
|
||||
Map<String, Object> attris = exchange.getAttributes();
|
||||
attris.put(pluginConfigsIt, it);
|
||||
route.pluginConfigsChange = false;
|
||||
}
|
||||
if (it.hasNext()) {
|
||||
PluginConfig pc = it.next();
|
||||
|
||||
@@ -227,7 +227,7 @@ public class ApiConfig {
|
||||
.backendService(this.backendService)
|
||||
.backendPath( this.backendPath)
|
||||
.query( WebUtils.getClientReqQuery(exchange))
|
||||
.pluginConfigs( this.pluginConfigs)
|
||||
// .pluginConfigs( this.pluginConfigs)
|
||||
.rpcMethod( this.rpcMethod)
|
||||
.rpcParamTypes( this.rpcParamTypes)
|
||||
.rpcGroup( this.rpcGroup)
|
||||
@@ -236,6 +236,7 @@ public class ApiConfig {
|
||||
.retryCount( this.retryCount)
|
||||
.retryInterval( this.retryInterval);
|
||||
|
||||
r.pluginConfigs = this.pluginConfigs;
|
||||
if (this.type == Type.REVERSE_PROXY) {
|
||||
r = r.nextHttpHostPort(getNextHttpHostPort());
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.web.server.ServerWebExchange;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
import we.Fizz;
|
||||
import we.FizzAppContext;
|
||||
import we.config.AggregateRedisConfig;
|
||||
import we.config.SystemConfig;
|
||||
@@ -217,7 +218,7 @@ public class ApiConfigService {
|
||||
.defaultIfEmpty(Collections.emptyList())
|
||||
.flatMap(
|
||||
es -> {
|
||||
if (FizzAppContext.appContext != null) {
|
||||
if (Fizz.context != null) {
|
||||
for (Map.Entry<Object, Object> e : es) {
|
||||
String json = (String) e.getValue();
|
||||
HashMap<?, ?> map = JacksonUtils.readValue(json, HashMap.class);
|
||||
@@ -225,8 +226,8 @@ public class ApiConfigService {
|
||||
String pluginConfig = (String) map.get("fixedConfig");
|
||||
String currentPluginConfig = pluginConfigMap.get(plugin);
|
||||
if (currentPluginConfig == null || !currentPluginConfig.equals(pluginConfig)) {
|
||||
if (FizzAppContext.appContext.containsBean(plugin)) {
|
||||
FizzPluginFilter pluginFilter = (FizzPluginFilter) FizzAppContext.appContext.getBean(plugin);
|
||||
if (Fizz.context.containsBean(plugin)) {
|
||||
FizzPluginFilter pluginFilter = (FizzPluginFilter) Fizz.context.getBean(plugin);
|
||||
pluginFilter.init(pluginConfig);
|
||||
pluginConfigMap.put(plugin, pluginConfig);
|
||||
log.info("init {} with {}", plugin, pluginConfig);
|
||||
@@ -266,7 +267,7 @@ public class ApiConfigService {
|
||||
)
|
||||
.doOnNext(
|
||||
msg -> {
|
||||
if (FizzAppContext.appContext != null) {
|
||||
if (Fizz.context != null) {
|
||||
String message = msg.getMessage();
|
||||
try {
|
||||
HashMap<?, ?> map = JacksonUtils.readValue(message, HashMap.class);
|
||||
@@ -274,8 +275,8 @@ public class ApiConfigService {
|
||||
String pluginConfig = (String) map.get("fixedConfig");
|
||||
String currentPluginConfig = pluginConfigMap.get(plugin);
|
||||
if (currentPluginConfig == null || !currentPluginConfig.equals(pluginConfig)) {
|
||||
if (FizzAppContext.appContext.containsBean(plugin)) {
|
||||
FizzPluginFilter pluginFilter = (FizzPluginFilter) FizzAppContext.appContext.getBean(plugin);
|
||||
if (Fizz.context.containsBean(plugin)) {
|
||||
FizzPluginFilter pluginFilter = (FizzPluginFilter) Fizz.context.getBean(plugin);
|
||||
pluginFilter.init(pluginConfig);
|
||||
pluginConfigMap.put(plugin, pluginConfig);
|
||||
log.info("init {} with {} again", plugin, pluginConfig);
|
||||
@@ -359,7 +360,9 @@ public class ApiConfigService {
|
||||
}
|
||||
List<ApiConfig> apiConfigs = sc.getApiConfigs(gatewayGroups, method, path);
|
||||
if (apiConfigs.isEmpty()) {
|
||||
return Result.fail(service + " don't have api config matching " + gatewayGroups + " group " + method + " method " + path + " path");
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
b.append(service).append(" don't have api config matching ").append(gatewayGroups).append(" group ").append(method).append(" method ").append(path).append(" path");
|
||||
return Result.fail(b.toString());
|
||||
}
|
||||
List<ApiConfig> appCanAccess = ThreadContext.getArrayList(macs);
|
||||
for (int i = 0; i < apiConfigs.size(); i++) {
|
||||
@@ -373,11 +376,13 @@ public class ApiConfigService {
|
||||
}
|
||||
}
|
||||
if (appCanAccess.isEmpty()) {
|
||||
return Result.fail("app " + app + " can't access " + JacksonUtils.writeValueAsString(apiConfigs));
|
||||
StringBuilder b = ThreadContext.getStringBuilder();
|
||||
b.append("app ").append(app).append(" can't access ").append(JacksonUtils.writeValueAsString(apiConfigs));
|
||||
return Result.fail(b.toString());
|
||||
}
|
||||
ApiConfig bestOne = appCanAccess.get(0);
|
||||
if (appCanAccess.size() != 1) {
|
||||
appCanAccess.sort(new ApiConfigPathPatternComparator(path));
|
||||
appCanAccess.sort(new ApiConfigPathPatternComparator(path)); // singleton ?
|
||||
ApiConfig ac0 = appCanAccess.get(0);
|
||||
bestOne = ac0;
|
||||
ApiConfig ac1 = appCanAccess.get(1);
|
||||
|
||||
@@ -44,6 +44,8 @@ public class Route {
|
||||
|
||||
public List<PluginConfig> pluginConfigs;
|
||||
|
||||
public boolean pluginConfigsChange = false;
|
||||
|
||||
public String rpcMethod;
|
||||
|
||||
public String rpcParamTypes;
|
||||
@@ -52,14 +54,14 @@ public class Route {
|
||||
|
||||
public String rpcGroup;
|
||||
|
||||
public long timeout = 0;
|
||||
public long timeout = 0;
|
||||
|
||||
public int retryCount = 0;
|
||||
public int retryCount = 0;
|
||||
|
||||
public long retryInterval = 0;
|
||||
public long retryInterval = 0;
|
||||
|
||||
public Route type(byte t) {
|
||||
type = t;
|
||||
public Route type(int t) {
|
||||
type = (byte) t;
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -84,7 +86,8 @@ public class Route {
|
||||
}
|
||||
|
||||
public Route pluginConfigs(List<PluginConfig> pcs) {
|
||||
pluginConfigs = pcs;
|
||||
pluginConfigs = pcs;
|
||||
pluginConfigsChange = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@@ -587,6 +588,18 @@ public abstract class WebUtils {
|
||||
return id;
|
||||
}
|
||||
|
||||
public static void traceDebug(Logger log, Function<Boolean, String> messageFactory) {
|
||||
if (log.isDebugEnabled()) {
|
||||
boolean traceEnabled = log.isTraceEnabled();
|
||||
String logMessage = messageFactory.apply(traceEnabled);
|
||||
if (traceEnabled) {
|
||||
log.trace(logMessage);
|
||||
} else {
|
||||
log.debug(logMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static final String s0 = "{\"";
|
||||
private static final String s1 = "\":";
|
||||
private static final String s2 = ",\"";
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package we.dict;
|
||||
package we.global_resource;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
@@ -8,9 +8,8 @@ import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.test.context.TestPropertySource;
|
||||
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
|
||||
import we.Fizz;
|
||||
import we.FizzAppContext;
|
||||
import we.plugin.auth.ApiConfigService;
|
||||
import we.plugin.auth.ApiConfigServiceProperties;
|
||||
import we.redis.RedisProperties;
|
||||
import we.redis.RedisServerConfiguration;
|
||||
import we.redis.RedisTemplateConfiguration;
|
||||
@@ -27,7 +26,7 @@ import java.util.Map;
|
||||
|
||||
@TestPropertySource("/application.properties")
|
||||
@SpringJUnitConfig(classes = {RedisProperties.class, RedisTemplateConfiguration.class, RedisServerConfiguration.class})
|
||||
public class DictTests {
|
||||
public class GlobalResourceTests {
|
||||
|
||||
@Resource
|
||||
StringRedisTemplate stringRedisTemplate;
|
||||
@@ -35,34 +34,34 @@ public class DictTests {
|
||||
@Resource
|
||||
ReactiveStringRedisTemplate reactiveStringRedisTemplate;
|
||||
|
||||
DictService dictService;
|
||||
GlobalResourceService globalResourceService;
|
||||
|
||||
@BeforeEach
|
||||
void beforeEach() throws NoSuchFieldException {
|
||||
dictService = new DictService();
|
||||
ReflectionUtils.set(dictService, "rt", reactiveStringRedisTemplate);
|
||||
globalResourceService = new GlobalResourceService();
|
||||
ReflectionUtils.set(globalResourceService, "rt", reactiveStringRedisTemplate);
|
||||
}
|
||||
|
||||
@Test
|
||||
void constructTest() throws JsonProcessingException {
|
||||
String json = "{\"id\":1,\"key\":\"key\",\"type\":4,\"value\":\"{\\\"a0\\\":\\\"v0\\\",\\\"a1\\\":66}\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}";
|
||||
Dict dict = JacksonUtils.readValue(json, Dict.class);
|
||||
// assertEquals(96.12347, dict.numberVal.doubleValue());
|
||||
// assertEquals("96.12347", dict.numberVal.toPlainString());
|
||||
// System.err.println(dict.toString());
|
||||
GlobalResource globalResource = JacksonUtils.readValue(json, GlobalResource.class);
|
||||
// assertEquals(96.12347, globalResource.numberVal.doubleValue());
|
||||
// assertEquals("96.12347", globalResource.numberVal.toPlainString());
|
||||
// System.err.println(globalResource.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void initTest() throws Throwable {
|
||||
|
||||
FizzAppContext.appContext = new GenericApplicationContext();
|
||||
FizzAppContext.appContext.refresh();
|
||||
Fizz.context = new GenericApplicationContext();
|
||||
Fizz.context.refresh();
|
||||
|
||||
Map<String, String> dictsMap = new HashMap<>();
|
||||
dictsMap.put("key0", "{\"id\":1,\"key\":\"key0\",\"type\":2,\"value\":\"val0\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}");
|
||||
dictsMap.put("key1", "{\"id\":1,\"key\":\"key1\",\"type\":2,\"value\":\"val1\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}");
|
||||
stringRedisTemplate.opsForHash().putAll("fizz_dict", dictsMap);
|
||||
Map<String, String> resourceMap = new HashMap<>();
|
||||
resourceMap.put("key0", "{\"id\":1,\"key\":\"key0\",\"type\":2,\"value\":\"val0\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}");
|
||||
resourceMap.put("key1", "{\"id\":1,\"key\":\"key1\",\"type\":2,\"value\":\"val1\",\"create\":1633756859538,\"update\":1633756859538,\"isDeleted\":1}");
|
||||
stringRedisTemplate.opsForHash().putAll("fizz_global_resource", resourceMap);
|
||||
|
||||
dictService.init();
|
||||
globalResourceService.init();
|
||||
}
|
||||
}
|
||||
6
pom.xml
6
pom.xml
@@ -7,10 +7,10 @@
|
||||
<!--<java.version>1.8</java.version>-->
|
||||
<spring-boot.version>2.2.13.RELEASE</spring-boot.version>
|
||||
<spring-framework.version>5.2.13.RELEASE</spring-framework.version>
|
||||
<reactor-bom.version>Dysprosium-SR23</reactor-bom.version>
|
||||
<reactor-bom.version>Dysprosium-SR24</reactor-bom.version>
|
||||
<lettuce.version>5.3.7.RELEASE</lettuce.version>
|
||||
<nacos.cloud.version>2.2.6.RELEASE</nacos.cloud.version>
|
||||
<netty.version>4.1.68.Final</netty.version>
|
||||
<netty.version>4.1.69.Final</netty.version>
|
||||
<httpcore.version>4.4.14</httpcore.version>
|
||||
<log4j2.version>2.14.1</log4j2.version>
|
||||
<slf4j.version>1.7.32</slf4j.version>
|
||||
@@ -134,7 +134,7 @@
|
||||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-test</artifactId>
|
||||
<version>3.3.20.RELEASE</version>
|
||||
<version>3.3.21.RELEASE</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
|
||||
Reference in New Issue
Block a user