diff --git a/fizz-bootstrap/pom.xml b/fizz-bootstrap/pom.xml index 35b978b..d42d962 100644 --- a/fizz-bootstrap/pom.xml +++ b/fizz-bootstrap/pom.xml @@ -37,6 +37,7 @@ 2.0.53.Final 2.2.9.RELEASE 1.30 + Moore-SR13 diff --git a/fizz-common/src/main/java/we/config/RedisReactiveConfig.java b/fizz-common/src/main/java/we/config/RedisReactiveConfig.java index 36987d1..854a559 100644 --- a/fizz-common/src/main/java/we/config/RedisReactiveConfig.java +++ b/fizz-common/src/main/java/we/config/RedisReactiveConfig.java @@ -18,31 +18,46 @@ package we.config; import io.lettuce.core.ClientOptions; +import io.lettuce.core.ReadFrom; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; +import java.time.Duration; + /** * @author hongqiaowei */ public abstract class RedisReactiveConfig { - protected static final Logger log = LoggerFactory.getLogger(RedisReactiveConfig.class); + protected static final Logger LOGGER = LoggerFactory.getLogger(RedisReactiveConfig.class); - // this should not be changed unless there is a truely good reason to do so - private static final int ps = Runtime.getRuntime().availableProcessors(); - public static final ClientResources clientResources = DefaultClientResources.builder() - .ioThreadPoolSize(ps) - .computationThreadPoolSize(ps) - .build(); + // this should not be changed unless there is a good reason to do so + private static final int ps = Runtime.getRuntime().availableProcessors(); + + /** + * @deprecated and renamed to CLIENT_RESOURCES + */ + @Deprecated + public static final ClientResources clientResources = DefaultClientResources.builder() + .ioThreadPoolSize(ps) + .computationThreadPoolSize(ps) + .build(); + + public static final ClientResources CLIENT_RESOURCES = clientResources; private RedisReactiveProperties redisReactiveProperties; @@ -56,23 +71,91 @@ public abstract class RedisReactiveConfig { public ReactiveRedisConnectionFactory lettuceConnectionFactory() { - log.info("connect to {}", redisReactiveProperties); + ReactiveRedisConnectionFactory reactiveRedisConnectionFactory; - RedisStandaloneConfiguration rcs = new RedisStandaloneConfiguration(redisReactiveProperties.getHost(), redisReactiveProperties.getPort()); - String password = redisReactiveProperties.getPassword(); - if (password != null) { - rcs.setPassword(password); + if (redisReactiveProperties.getType() == RedisReactiveProperties.STANDALONE) { + + RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(redisReactiveProperties.getHost(), redisReactiveProperties.getPort()); + String password = redisReactiveProperties.getPassword(); + if (password != null) { + redisStandaloneConfiguration.setPassword(password); + } + redisStandaloneConfiguration.setDatabase(redisReactiveProperties.getDatabase()); + + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); + poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2); + LettucePoolingClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder() + .clientResources(clientResources) + .clientOptions(ClientOptions.builder().publishOnScheduler(true).build()) + .poolConfig(poolConfig) + .build(); + + reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfiguration); + + } else { + + RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(); + String password = redisReactiveProperties.getPassword(); + if (password != null) { + redisClusterConfiguration.setPassword(password); + } + redisClusterConfiguration.setClusterNodes(redisReactiveProperties.getClusterNodes()); + int maxRedirects = redisReactiveProperties.getMaxRedirects(); + if (maxRedirects > 0) { + redisClusterConfiguration.setMaxRedirects(maxRedirects); + } + + ClusterTopologyRefreshOptions.Builder builder = ClusterTopologyRefreshOptions.builder(); + int clusterRefreshPeriod = redisReactiveProperties.getClusterRefreshPeriod(); + builder = builder.enablePeriodicRefresh(Duration.ofSeconds(clusterRefreshPeriod)); + boolean enableAllAdaptiveRefreshTriggers = redisReactiveProperties.isEnableAllAdaptiveRefreshTriggers(); + if (enableAllAdaptiveRefreshTriggers) { + builder = builder.enableAllAdaptiveRefreshTriggers(); + } + ClusterTopologyRefreshOptions topologyRefreshOptions = builder.build(); + + ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() + .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(clusterRefreshPeriod))) + .topologyRefreshOptions(topologyRefreshOptions) + .publishOnScheduler(true) + .build(); + + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); + int minIdle = redisReactiveProperties.getMinIdle(); + if (minIdle > 0) { + poolConfig.setMinIdle(minIdle); + } + int maxIdle = redisReactiveProperties.getMaxIdle(); + if (maxIdle > 0) { + poolConfig.setMaxIdle(maxIdle); + } + int maxTotal = redisReactiveProperties.getMaxTotal(); + if (maxTotal > 0) { + poolConfig.setMaxTotal(maxTotal); + } else { + poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2); + } + Duration maxWait = redisReactiveProperties.getMaxWait(); + if (maxWait != null) { + poolConfig.setMaxWait(maxWait); + } + Duration timeBetweenEvictionRuns = redisReactiveProperties.getTimeBetweenEvictionRuns(); + if (timeBetweenEvictionRuns != null) { + poolConfig.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns); + } + + LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() + .clientResources(clientResources) + // .commandTimeout(Duration.ofSeconds(60)) + .poolConfig(poolConfig) + .readFrom(redisReactiveProperties.getReadFrom()) + .clientOptions(clusterClientOptions) + .build(); + + reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisClusterConfiguration, clientConfig); } - rcs.setDatabase(redisReactiveProperties.getDatabase()); - GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); - poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2); - LettucePoolingClientConfiguration ccs = LettucePoolingClientConfiguration.builder() - .clientResources(clientResources) - .clientOptions(ClientOptions.builder().publishOnScheduler(true).build()) - .poolConfig(poolConfig) - .build(); - - return new LettuceConnectionFactory(rcs, ccs); + LOGGER.info("build reactive redis connection factory for {}", redisReactiveProperties); + return reactiveRedisConnectionFactory; } } diff --git a/fizz-common/src/main/java/we/config/RedisReactiveProperties.java b/fizz-common/src/main/java/we/config/RedisReactiveProperties.java index 4ae2772..a227aa2 100644 --- a/fizz-common/src/main/java/we/config/RedisReactiveProperties.java +++ b/fizz-common/src/main/java/we/config/RedisReactiveProperties.java @@ -17,20 +17,68 @@ package we.config; -import we.util.Constants; +import io.lettuce.core.ReadFrom; +import org.springframework.data.redis.connection.RedisNode; import we.util.Consts; +import we.util.StringUtils; import we.util.Utils; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + /** * @author hongqiaowei */ public abstract class RedisReactiveProperties { - private String host = "127.0.0.1"; - private int port = 6379; - private String password; - private int database = 0; + public static final String STANDALONE = "standalone"; + public static final String CLUSTER = "cluster"; + + private String type = STANDALONE; + + private String host = "127.0.0.1"; + + private int port = 6379; + + private String password; + + private int database = 0; + + private List clusterNodes; + + private int maxRedirects = 0; + + private int clusterRefreshPeriod = 60; + + private boolean clusterRefreshAdaptive = true; + + private boolean enableAllAdaptiveRefreshTriggers = true; + + private ReadFrom readFrom = ReadFrom.REPLICA_PREFERRED; + + private int minIdle = 0; + + private int maxIdle = 0; + + private int maxTotal = 0; + + private Duration maxWait; + + private Duration timeBetweenEvictionRuns; + + public String getType() { + return type; + } + + public void setType(String type) { + if (type.equals(STANDALONE)) { + this.type = STANDALONE; + } else { + this.type = CLUSTER; + } + } public String getHost() { return host; @@ -64,19 +112,127 @@ public abstract class RedisReactiveProperties { this.database = database; } + public List getClusterNodes() { + return clusterNodes; + } + + public void setClusterNodes(String clusterNodes) { + String[] nodeArr = StringUtils.split(clusterNodes, ','); + this.clusterNodes = new ArrayList<>(nodeArr.length); + for (String n : nodeArr) { + String[] ipAndPort = StringUtils.split(n.trim(), ':'); + RedisNode redisNode = new RedisNode(ipAndPort[0], Integer.parseInt(ipAndPort[1])); + this.clusterNodes.add(redisNode); + } + } + + public int getMaxRedirects() { + return maxRedirects; + } + + public void setMaxRedirects(int maxRedirects) { + this.maxRedirects = maxRedirects; + } + + public int getClusterRefreshPeriod() { + return clusterRefreshPeriod; + } + + public void setClusterRefreshPeriod(int clusterRefreshPeriod) { + this.clusterRefreshPeriod = clusterRefreshPeriod; + } + + public boolean isClusterRefreshAdaptive() { + return clusterRefreshAdaptive; + } + + public void setClusterRefreshAdaptive(boolean clusterRefreshAdaptive) { + this.clusterRefreshAdaptive = clusterRefreshAdaptive; + } + + public boolean isEnableAllAdaptiveRefreshTriggers() { + return enableAllAdaptiveRefreshTriggers; + } + + public void setEnableAllAdaptiveRefreshTriggers(boolean enableAllAdaptiveRefreshTriggers) { + this.enableAllAdaptiveRefreshTriggers = enableAllAdaptiveRefreshTriggers; + } + + public ReadFrom getReadFrom() { + return readFrom; + } + + public void setReadFrom(String readFrom) { + this.readFrom = ReadFrom.valueOf(readFrom); + } + + public int getMinIdle() { + return minIdle; + } + + public void setMinIdle(int minIdle) { + this.minIdle = minIdle; + } + + public int getMaxIdle() { + return maxIdle; + } + + public void setMaxIdle(int maxIdle) { + this.maxIdle = maxIdle; + } + + public int getMaxTotal() { + return maxTotal; + } + + public void setMaxTotal(int maxTotal) { + this.maxTotal = maxTotal; + } + + public Duration getMaxWait() { + return maxWait; + } + + public void setMaxWait(long maxWait) { + this.maxWait = Duration.ofMillis(maxWait); + } + + public Duration getTimeBetweenEvictionRuns() { + return timeBetweenEvictionRuns; + } + + public void setTimeBetweenEvictionRuns(long timeBetweenEvictionRuns) { + this.timeBetweenEvictionRuns = Duration.ofMillis(timeBetweenEvictionRuns); + } + @Override public String toString() { - StringBuilder b = new StringBuilder(48); + StringBuilder b = new StringBuilder(256); appendTo(b); return b.toString(); } public void appendTo(StringBuilder b) { b.append(Consts.S.LEFT_BRACE); - Utils.addTo(b, "host", Consts.S.EQUAL, host, Consts.S.SPACE_STR); - Utils.addTo(b, "port", Consts.S.EQUAL, port, Consts.S.SPACE_STR); -// Utils.addTo(b, "password", Consts.S.EQUAL, password, Consts.S.SPACE_STR); - Utils.addTo(b, "database", Consts.S.EQUAL, database, Consts.S.EMPTY); + if (type == STANDALONE) { + Utils.addTo(b, "host", Consts.S.EQUAL, host, Consts.S.SPACE_STR); + Utils.addTo(b, "port", Consts.S.EQUAL, port, Consts.S.SPACE_STR); + Utils.addTo(b, "database", Consts.S.EQUAL, database, Consts.S.SPACE_STR); + } else { + Utils.addTo(b, "clusterNodes", Consts.S.EQUAL, clusterNodes, Consts.S.SPACE_STR); + Utils.addTo(b, "maxRedirects", Consts.S.EQUAL, maxRedirects, Consts.S.SPACE_STR); + Utils.addTo(b, "clusterRefreshPeriod", Consts.S.EQUAL, clusterRefreshPeriod, Consts.S.SPACE_STR); + Utils.addTo(b, "clusterRefreshAdaptive", Consts.S.EQUAL, clusterRefreshAdaptive, Consts.S.SPACE_STR); + Utils.addTo(b, "enableAllAdaptiveRefreshTriggers", Consts.S.EQUAL, enableAllAdaptiveRefreshTriggers, Consts.S.SPACE_STR); + Utils.addTo(b, "readFrom", Consts.S.EQUAL, readFrom, Consts.S.SPACE_STR); + } + + Utils.addTo(b, "minIdle", Consts.S.EQUAL, minIdle, Consts.S.SPACE_STR); + Utils.addTo(b, "maxIdle", Consts.S.EQUAL, maxIdle, Consts.S.SPACE_STR); + Utils.addTo(b, "maxWait", Consts.S.EQUAL, maxWait, Consts.S.SPACE_STR); + Utils.addTo(b, "maxTotal", Consts.S.EQUAL, maxTotal, Consts.S.SPACE_STR); + Utils.addTo(b, "timeBetweenEvictionRuns", Consts.S.EQUAL, timeBetweenEvictionRuns, Consts.S.EMPTY); b.append(Consts.S.RIGHT_BRACE); } } diff --git a/fizz-common/src/main/java/we/util/ReactiveRedisHelper.java b/fizz-common/src/main/java/we/util/ReactiveRedisHelper.java index 9e1df21..85cd310 100644 --- a/fizz-common/src/main/java/we/util/ReactiveRedisHelper.java +++ b/fizz-common/src/main/java/we/util/ReactiveRedisHelper.java @@ -18,13 +18,21 @@ package we.util; import io.lettuce.core.ClientOptions; +import io.lettuce.core.TimeoutOptions; +import io.lettuce.core.cluster.ClusterClientOptions; +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration; import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import we.config.RedisReactiveConfig; +import we.config.RedisReactiveProperties; + +import java.time.Duration; /** * @apiNote just helper, RedisReactiveConfig is best practice @@ -38,26 +46,114 @@ public abstract class ReactiveRedisHelper { private ReactiveRedisHelper() { } - public static ReactiveRedisConnectionFactory getConnectionFactory(String host, int port, String password, int database) { - RedisStandaloneConfiguration rcs = new RedisStandaloneConfiguration(host, port); - if (password != null) { - rcs.setPassword(password); + public static ReactiveRedisConnectionFactory getConnectionFactory(RedisReactiveProperties redisReactiveProperties) { + if (redisReactiveProperties.getType() == RedisReactiveProperties.STANDALONE) { + return getConnectionFactory(redisReactiveProperties.getHost(), redisReactiveProperties.getPort(), redisReactiveProperties.getPassword(), redisReactiveProperties.getDatabase()); + } else { + return getClusterConnectionFactory(redisReactiveProperties); } - rcs.setDatabase(database); + } + + public static ReactiveStringRedisTemplate getStringRedisTemplate(RedisReactiveProperties redisReactiveProperties) { + ReactiveRedisConnectionFactory connectionFactory = getConnectionFactory(redisReactiveProperties); + return new ReactiveStringRedisTemplate(connectionFactory); + } + + /** + * For standalone redis. + */ + public static ReactiveRedisConnectionFactory getConnectionFactory(String host, int port, String password, int database) { + RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration(host, port); + if (password != null) { + rsc.setPassword(password); + } + rsc.setDatabase(database); LettucePoolingClientConfiguration ccs = LettucePoolingClientConfiguration.builder() - .clientResources(RedisReactiveConfig.clientResources) - .clientOptions(ClientOptions.builder().publishOnScheduler(true).build()) - .poolConfig(new GenericObjectPoolConfig<>()) - .build(); + .clientResources(RedisReactiveConfig.CLIENT_RESOURCES) + .clientOptions(ClientOptions.builder().publishOnScheduler(true).build()) + .poolConfig(new GenericObjectPoolConfig<>()) + .build(); - LettuceConnectionFactory factory = new LettuceConnectionFactory(rcs, ccs); + LettuceConnectionFactory factory = new LettuceConnectionFactory(rsc, ccs); factory.afterPropertiesSet(); return factory; } + /** + * For standalone redis. + */ public static ReactiveStringRedisTemplate getStringRedisTemplate(String host, int port, String password, int database) { ReactiveRedisConnectionFactory connectionFactory = getConnectionFactory(host, port, password, database); return new ReactiveStringRedisTemplate(connectionFactory); } + + public static ReactiveRedisConnectionFactory getClusterConnectionFactory(RedisReactiveProperties redisReactiveProperties) { + RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(); + String password = redisReactiveProperties.getPassword(); + if (password != null) { + redisClusterConfiguration.setPassword(password); + } + redisClusterConfiguration.setClusterNodes(redisReactiveProperties.getClusterNodes()); + int maxRedirects = redisReactiveProperties.getMaxRedirects(); + if (maxRedirects > 0) { + redisClusterConfiguration.setMaxRedirects(maxRedirects); + } + + ClusterTopologyRefreshOptions.Builder builder = ClusterTopologyRefreshOptions.builder(); + int clusterRefreshPeriod = redisReactiveProperties.getClusterRefreshPeriod(); + builder = builder.enablePeriodicRefresh(Duration.ofSeconds(clusterRefreshPeriod)); + boolean enableAllAdaptiveRefreshTriggers = redisReactiveProperties.isEnableAllAdaptiveRefreshTriggers(); + if (enableAllAdaptiveRefreshTriggers) { + builder = builder.enableAllAdaptiveRefreshTriggers(); + } + ClusterTopologyRefreshOptions topologyRefreshOptions = builder.build(); + + ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder() + .timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(clusterRefreshPeriod))) + .topologyRefreshOptions(topologyRefreshOptions) + .publishOnScheduler(true) + .build(); + + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); + int minIdle = redisReactiveProperties.getMinIdle(); + if (minIdle > 0) { + poolConfig.setMinIdle(minIdle); + } + int maxIdle = redisReactiveProperties.getMaxIdle(); + if (maxIdle > 0) { + poolConfig.setMaxIdle(maxIdle); + } + int maxTotal = redisReactiveProperties.getMaxTotal(); + if (maxTotal > 0) { + poolConfig.setMaxTotal(maxTotal); + } else { + poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2); + } + Duration maxWait = redisReactiveProperties.getMaxWait(); + if (maxWait != null) { + poolConfig.setMaxWait(maxWait); + } + Duration timeBetweenEvictionRuns = redisReactiveProperties.getTimeBetweenEvictionRuns(); + if (timeBetweenEvictionRuns != null) { + poolConfig.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns); + } + + LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder() + .clientResources(RedisReactiveConfig.CLIENT_RESOURCES) + // .commandTimeout(Duration.ofSeconds(60)) + .poolConfig(poolConfig) + .readFrom(redisReactiveProperties.getReadFrom()) + .clientOptions(clusterClientOptions) + .build(); + + LettuceConnectionFactory reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisClusterConfiguration, clientConfig); + reactiveRedisConnectionFactory.afterPropertiesSet(); + return reactiveRedisConnectionFactory; + } + + public static ReactiveStringRedisTemplate getClusterStringRedisTemplate(RedisReactiveProperties redisReactiveProperties) { + ReactiveRedisConnectionFactory connectionFactory = getClusterConnectionFactory(redisReactiveProperties); + return new ReactiveStringRedisTemplate(connectionFactory); + } } diff --git a/fizz-core/src/main/java/we/beans/factory/config/FizzBeanFactoryPostProcessor.java b/fizz-core/src/main/java/we/beans/factory/config/FizzBeanFactoryPostProcessor.java index cfa7490..562c36f 100644 --- a/fizz-core/src/main/java/we/beans/factory/config/FizzBeanFactoryPostProcessor.java +++ b/fizz-core/src/main/java/we/beans/factory/config/FizzBeanFactoryPostProcessor.java @@ -18,7 +18,6 @@ package we.beans.factory.config; import com.fasterxml.jackson.core.type.TypeReference; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.BeansException; @@ -31,17 +30,15 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.EnvironmentAware; import org.springframework.core.Ordered; -import org.springframework.core.env.ConfigurableEnvironment; -import org.springframework.core.env.Environment; -import org.springframework.core.env.MapPropertySource; -import org.springframework.core.env.MutablePropertySources; +import org.springframework.core.env.*; import org.springframework.data.redis.core.ReactiveStringRedisTemplate; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import we.config.AggregateRedisConfig; import we.config.FizzConfigConfiguration; +import we.config.RedisReactiveProperties; import we.context.config.annotation.FizzRefreshScope; import we.context.event.FizzRefreshEvent; -import we.global_resource.GlobalResource; import we.util.*; import java.util.*; @@ -58,7 +55,7 @@ public class FizzBeanFactoryPostProcessor implements BeanFactoryPostProcessor, E private ConfigurableEnvironment environment; - private final Map property2beanMap = new HashMap<>(32); + private final Map property2beanMap = new HashMap<>(); private ReactiveStringRedisTemplate reactiveStringRedisTemplate; @@ -73,11 +70,27 @@ public class FizzBeanFactoryPostProcessor implements BeanFactoryPostProcessor, E } private void initReactiveStringRedisTemplate() { - String host = environment.getProperty("aggregate.redis.host"); - String port = environment.getProperty("aggregate.redis.port"); - String password = environment.getProperty("aggregate.redis.password"); - String database = environment.getProperty("aggregate.redis.database"); - reactiveStringRedisTemplate = ReactiveRedisHelper.getStringRedisTemplate(host, Integer.parseInt(port), password, Integer.parseInt(database)); + Properties aggregateRedisProperties = new Properties(); + boolean find = false; + for (PropertySource propertySource : environment.getPropertySources()) { + if (MapPropertySource.class.isAssignableFrom(propertySource.getClass())) { + MapPropertySource mps = (MapPropertySource) propertySource; + String[] propertyNames = mps.getPropertyNames(); + for (String propertyName : propertyNames) { + if (propertyName.startsWith(AggregateRedisConfig.AGGREGATE_REDIS_PREFIX)) { + aggregateRedisProperties.put(propertyName.substring(AggregateRedisConfig.AGGREGATE_REDIS_PREFIX.length() + 1), mps.getProperty(propertyName)); + find = true; + } + } + if (find) { + break; + } + } + } + RedisReactiveProperties redisReactiveProperties = new RedisReactiveProperties() { + }; + PropertiesUtils.setBeanPropertyValue(redisReactiveProperties, aggregateRedisProperties); + reactiveStringRedisTemplate = ReactiveRedisHelper.getStringRedisTemplate(redisReactiveProperties); } private void initFizzPropertySource() { diff --git a/fizz-core/src/main/java/we/config/AggregateRedisConfig.java b/fizz-core/src/main/java/we/config/AggregateRedisConfig.java index 942e3c0..d459e90 100644 --- a/fizz-core/src/main/java/we/config/AggregateRedisConfig.java +++ b/fizz-core/src/main/java/we/config/AggregateRedisConfig.java @@ -51,13 +51,14 @@ public class AggregateRedisConfig extends RedisReactiveConfig { public static final String AGGREGATE_REACTIVE_REDIS_TEMPLATE = "aggregateReactiveRedisTemplate"; public static final String AGGREGATE_REACTIVE_REDIS_MESSAGE_LISTENER_CONTAINER = "aggregateReactiveRedisMessageListenerContainer"; private static final String SEND_LOG_TYPE_REDIS = "redis"; + public static final String AGGREGATE_REDIS_PREFIX = "aggregate.redis"; public static ProxyLettuceConnectionFactory proxyLettuceConnectionFactory; @Resource private AggregateRedisConfigProperties aggregateRedisConfigProperties; - @ConfigurationProperties(prefix = "aggregate.redis") + @ConfigurationProperties(prefix = AGGREGATE_REDIS_PREFIX) @Configuration(AGGREGATE_REACTIVE_REDIS_PROPERTIES) public static class AggregateRedisReactiveProperties extends RedisReactiveProperties { } diff --git a/fizz-core/src/test/java/we/RedisClusterTests.java b/fizz-core/src/test/java/we/RedisClusterTests.java new file mode 100644 index 0000000..6e73fec --- /dev/null +++ b/fizz-core/src/test/java/we/RedisClusterTests.java @@ -0,0 +1,52 @@ +package we; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.junit.jupiter.api.Test; +import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory; +import org.springframework.data.redis.core.ReactiveStringRedisTemplate; +import we.config.RedisReactiveConfig; +import we.config.RedisReactiveProperties; + +public class RedisClusterTests { + + // @Test + void test() throws InterruptedException { + System.setProperty("log4j2.isThreadContextMapInheritable", "true"); + Logger LOGGER = LogManager.getLogger(RedisClusterTests.class); + + RedisReactiveProperties redisReactiveProperties = new RedisReactiveProperties() { + }; + redisReactiveProperties.setType(RedisReactiveProperties.CLUSTER); + redisReactiveProperties.setPassword("123456"); + redisReactiveProperties.setClusterNodes("ip:port"); + + RedisReactiveConfig redisReactiveConfig = new RedisReactiveConfig(redisReactiveProperties) { + }; + LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) redisReactiveConfig.lettuceConnectionFactory(); + lettuceConnectionFactory.afterPropertiesSet(); + ReactiveStringRedisTemplate reactiveStringRedisTemplate = redisReactiveConfig.reactiveStringRedisTemplate(lettuceConnectionFactory); + reactiveStringRedisTemplate.opsForValue().set("hqw", "lancer").block(); + + String channel = "ch1"; + reactiveStringRedisTemplate.listenToChannel(channel) + .doOnError( + t -> { + LOGGER.error("lsn channel {} error", channel, t); + } + ) + .doOnSubscribe( + s -> { + LOGGER.info("success to lsn on {}", channel); + } + ) + .doOnNext( + msg -> { + String message = msg.getMessage(); + LOGGER.info("receive message: {}", message); + } + ) + .subscribe(); + Thread.currentThread().join(); + } +} diff --git a/fizz-core/src/test/resources/log4j2-test.xml b/fizz-core/src/test/resources/log4j2-test.xml index a310415..b3b16f9 100644 --- a/fizz-core/src/test/resources/log4j2-test.xml +++ b/fizz-core/src/test/resources/log4j2-test.xml @@ -29,5 +29,6 @@ + diff --git a/pom.xml b/pom.xml index 78cf3bf..c57abf6 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,7 @@ 2.0.53.Final 2.2.9.RELEASE 1.30 + Moore-SR13