Support redis cluster

This commit is contained in:
hongqiaowei
2022-07-01 17:17:27 +08:00
parent e567214dff
commit ee5b406026
9 changed files with 459 additions and 55 deletions

View File

@@ -37,6 +37,7 @@
<netty-tcnative.version>2.0.53.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.30</snakeyaml.version>
<spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version>
</properties>
<dependencies>

View File

@@ -18,32 +18,47 @@
package we.config;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.resource.DefaultClientResources;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import java.time.Duration;
/**
* @author hongqiaowei
*/
public abstract class RedisReactiveConfig {
protected static final Logger log = LoggerFactory.getLogger(RedisReactiveConfig.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(RedisReactiveConfig.class);
// this should not be changed unless there is a truely good reason to do so
// this should not be changed unless there is a good reason to do so
private static final int ps = Runtime.getRuntime().availableProcessors();
/**
* @deprecated and renamed to CLIENT_RESOURCES
*/
@Deprecated
public static final ClientResources clientResources = DefaultClientResources.builder()
.ioThreadPoolSize(ps)
.computationThreadPoolSize(ps)
.build();
public static final ClientResources CLIENT_RESOURCES = clientResources;
private RedisReactiveProperties redisReactiveProperties;
public RedisReactiveConfig(RedisReactiveProperties properties) {
@@ -56,23 +71,91 @@ public abstract class RedisReactiveConfig {
public ReactiveRedisConnectionFactory lettuceConnectionFactory() {
log.info("connect to {}", redisReactiveProperties);
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory;
RedisStandaloneConfiguration rcs = new RedisStandaloneConfiguration(redisReactiveProperties.getHost(), redisReactiveProperties.getPort());
if (redisReactiveProperties.getType() == RedisReactiveProperties.STANDALONE) {
RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(redisReactiveProperties.getHost(), redisReactiveProperties.getPort());
String password = redisReactiveProperties.getPassword();
if (password != null) {
rcs.setPassword(password);
redisStandaloneConfiguration.setPassword(password);
}
rcs.setDatabase(redisReactiveProperties.getDatabase());
redisStandaloneConfiguration.setDatabase(redisReactiveProperties.getDatabase());
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2);
LettucePoolingClientConfiguration ccs = LettucePoolingClientConfiguration.builder()
LettucePoolingClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder()
.clientResources(clientResources)
.clientOptions(ClientOptions.builder().publishOnScheduler(true).build())
.poolConfig(poolConfig)
.build();
return new LettuceConnectionFactory(rcs, ccs);
reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfiguration);
} else {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
String password = redisReactiveProperties.getPassword();
if (password != null) {
redisClusterConfiguration.setPassword(password);
}
redisClusterConfiguration.setClusterNodes(redisReactiveProperties.getClusterNodes());
int maxRedirects = redisReactiveProperties.getMaxRedirects();
if (maxRedirects > 0) {
redisClusterConfiguration.setMaxRedirects(maxRedirects);
}
ClusterTopologyRefreshOptions.Builder builder = ClusterTopologyRefreshOptions.builder();
int clusterRefreshPeriod = redisReactiveProperties.getClusterRefreshPeriod();
builder = builder.enablePeriodicRefresh(Duration.ofSeconds(clusterRefreshPeriod));
boolean enableAllAdaptiveRefreshTriggers = redisReactiveProperties.isEnableAllAdaptiveRefreshTriggers();
if (enableAllAdaptiveRefreshTriggers) {
builder = builder.enableAllAdaptiveRefreshTriggers();
}
ClusterTopologyRefreshOptions topologyRefreshOptions = builder.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(clusterRefreshPeriod)))
.topologyRefreshOptions(topologyRefreshOptions)
.publishOnScheduler(true)
.build();
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
int minIdle = redisReactiveProperties.getMinIdle();
if (minIdle > 0) {
poolConfig.setMinIdle(minIdle);
}
int maxIdle = redisReactiveProperties.getMaxIdle();
if (maxIdle > 0) {
poolConfig.setMaxIdle(maxIdle);
}
int maxTotal = redisReactiveProperties.getMaxTotal();
if (maxTotal > 0) {
poolConfig.setMaxTotal(maxTotal);
} else {
poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2);
}
Duration maxWait = redisReactiveProperties.getMaxWait();
if (maxWait != null) {
poolConfig.setMaxWait(maxWait);
}
Duration timeBetweenEvictionRuns = redisReactiveProperties.getTimeBetweenEvictionRuns();
if (timeBetweenEvictionRuns != null) {
poolConfig.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns);
}
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.clientResources(clientResources)
// .commandTimeout(Duration.ofSeconds(60))
.poolConfig(poolConfig)
.readFrom(redisReactiveProperties.getReadFrom())
.clientOptions(clusterClientOptions)
.build();
reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisClusterConfiguration, clientConfig);
}
LOGGER.info("build reactive redis connection factory for {}", redisReactiveProperties);
return reactiveRedisConnectionFactory;
}
}

View File

@@ -17,21 +17,69 @@
package we.config;
import we.util.Constants;
import io.lettuce.core.ReadFrom;
import org.springframework.data.redis.connection.RedisNode;
import we.util.Consts;
import we.util.StringUtils;
import we.util.Utils;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
/**
* @author hongqiaowei
*/
public abstract class RedisReactiveProperties {
public static final String STANDALONE = "standalone";
public static final String CLUSTER = "cluster";
private String type = STANDALONE;
private String host = "127.0.0.1";
private int port = 6379;
private String password;
private int database = 0;
private List<RedisNode> clusterNodes;
private int maxRedirects = 0;
private int clusterRefreshPeriod = 60;
private boolean clusterRefreshAdaptive = true;
private boolean enableAllAdaptiveRefreshTriggers = true;
private ReadFrom readFrom = ReadFrom.REPLICA_PREFERRED;
private int minIdle = 0;
private int maxIdle = 0;
private int maxTotal = 0;
private Duration maxWait;
private Duration timeBetweenEvictionRuns;
public String getType() {
return type;
}
public void setType(String type) {
if (type.equals(STANDALONE)) {
this.type = STANDALONE;
} else {
this.type = CLUSTER;
}
}
public String getHost() {
return host;
}
@@ -64,19 +112,127 @@ public abstract class RedisReactiveProperties {
this.database = database;
}
public List<RedisNode> getClusterNodes() {
return clusterNodes;
}
public void setClusterNodes(String clusterNodes) {
String[] nodeArr = StringUtils.split(clusterNodes, ',');
this.clusterNodes = new ArrayList<>(nodeArr.length);
for (String n : nodeArr) {
String[] ipAndPort = StringUtils.split(n.trim(), ':');
RedisNode redisNode = new RedisNode(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
this.clusterNodes.add(redisNode);
}
}
public int getMaxRedirects() {
return maxRedirects;
}
public void setMaxRedirects(int maxRedirects) {
this.maxRedirects = maxRedirects;
}
public int getClusterRefreshPeriod() {
return clusterRefreshPeriod;
}
public void setClusterRefreshPeriod(int clusterRefreshPeriod) {
this.clusterRefreshPeriod = clusterRefreshPeriod;
}
public boolean isClusterRefreshAdaptive() {
return clusterRefreshAdaptive;
}
public void setClusterRefreshAdaptive(boolean clusterRefreshAdaptive) {
this.clusterRefreshAdaptive = clusterRefreshAdaptive;
}
public boolean isEnableAllAdaptiveRefreshTriggers() {
return enableAllAdaptiveRefreshTriggers;
}
public void setEnableAllAdaptiveRefreshTriggers(boolean enableAllAdaptiveRefreshTriggers) {
this.enableAllAdaptiveRefreshTriggers = enableAllAdaptiveRefreshTriggers;
}
public ReadFrom getReadFrom() {
return readFrom;
}
public void setReadFrom(String readFrom) {
this.readFrom = ReadFrom.valueOf(readFrom);
}
public int getMinIdle() {
return minIdle;
}
public void setMinIdle(int minIdle) {
this.minIdle = minIdle;
}
public int getMaxIdle() {
return maxIdle;
}
public void setMaxIdle(int maxIdle) {
this.maxIdle = maxIdle;
}
public int getMaxTotal() {
return maxTotal;
}
public void setMaxTotal(int maxTotal) {
this.maxTotal = maxTotal;
}
public Duration getMaxWait() {
return maxWait;
}
public void setMaxWait(long maxWait) {
this.maxWait = Duration.ofMillis(maxWait);
}
public Duration getTimeBetweenEvictionRuns() {
return timeBetweenEvictionRuns;
}
public void setTimeBetweenEvictionRuns(long timeBetweenEvictionRuns) {
this.timeBetweenEvictionRuns = Duration.ofMillis(timeBetweenEvictionRuns);
}
@Override
public String toString() {
StringBuilder b = new StringBuilder(48);
StringBuilder b = new StringBuilder(256);
appendTo(b);
return b.toString();
}
public void appendTo(StringBuilder b) {
b.append(Consts.S.LEFT_BRACE);
if (type == STANDALONE) {
Utils.addTo(b, "host", Consts.S.EQUAL, host, Consts.S.SPACE_STR);
Utils.addTo(b, "port", Consts.S.EQUAL, port, Consts.S.SPACE_STR);
// Utils.addTo(b, "password", Consts.S.EQUAL, password, Consts.S.SPACE_STR);
Utils.addTo(b, "database", Consts.S.EQUAL, database, Consts.S.EMPTY);
Utils.addTo(b, "database", Consts.S.EQUAL, database, Consts.S.SPACE_STR);
} else {
Utils.addTo(b, "clusterNodes", Consts.S.EQUAL, clusterNodes, Consts.S.SPACE_STR);
Utils.addTo(b, "maxRedirects", Consts.S.EQUAL, maxRedirects, Consts.S.SPACE_STR);
Utils.addTo(b, "clusterRefreshPeriod", Consts.S.EQUAL, clusterRefreshPeriod, Consts.S.SPACE_STR);
Utils.addTo(b, "clusterRefreshAdaptive", Consts.S.EQUAL, clusterRefreshAdaptive, Consts.S.SPACE_STR);
Utils.addTo(b, "enableAllAdaptiveRefreshTriggers", Consts.S.EQUAL, enableAllAdaptiveRefreshTriggers, Consts.S.SPACE_STR);
Utils.addTo(b, "readFrom", Consts.S.EQUAL, readFrom, Consts.S.SPACE_STR);
}
Utils.addTo(b, "minIdle", Consts.S.EQUAL, minIdle, Consts.S.SPACE_STR);
Utils.addTo(b, "maxIdle", Consts.S.EQUAL, maxIdle, Consts.S.SPACE_STR);
Utils.addTo(b, "maxWait", Consts.S.EQUAL, maxWait, Consts.S.SPACE_STR);
Utils.addTo(b, "maxTotal", Consts.S.EQUAL, maxTotal, Consts.S.SPACE_STR);
Utils.addTo(b, "timeBetweenEvictionRuns", Consts.S.EQUAL, timeBetweenEvictionRuns, Consts.S.EMPTY);
b.append(Consts.S.RIGHT_BRACE);
}
}

View File

@@ -18,13 +18,21 @@
package we.util;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.TimeoutOptions;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisClusterConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import we.config.RedisReactiveConfig;
import we.config.RedisReactiveProperties;
import java.time.Duration;
/**
* @apiNote just helper, RedisReactiveConfig is best practice
@@ -38,26 +46,114 @@ public abstract class ReactiveRedisHelper {
private ReactiveRedisHelper() {
}
public static ReactiveRedisConnectionFactory getConnectionFactory(String host, int port, String password, int database) {
RedisStandaloneConfiguration rcs = new RedisStandaloneConfiguration(host, port);
if (password != null) {
rcs.setPassword(password);
public static ReactiveRedisConnectionFactory getConnectionFactory(RedisReactiveProperties redisReactiveProperties) {
if (redisReactiveProperties.getType() == RedisReactiveProperties.STANDALONE) {
return getConnectionFactory(redisReactiveProperties.getHost(), redisReactiveProperties.getPort(), redisReactiveProperties.getPassword(), redisReactiveProperties.getDatabase());
} else {
return getClusterConnectionFactory(redisReactiveProperties);
}
rcs.setDatabase(database);
}
public static ReactiveStringRedisTemplate getStringRedisTemplate(RedisReactiveProperties redisReactiveProperties) {
ReactiveRedisConnectionFactory connectionFactory = getConnectionFactory(redisReactiveProperties);
return new ReactiveStringRedisTemplate(connectionFactory);
}
/**
* For standalone redis.
*/
public static ReactiveRedisConnectionFactory getConnectionFactory(String host, int port, String password, int database) {
RedisStandaloneConfiguration rsc = new RedisStandaloneConfiguration(host, port);
if (password != null) {
rsc.setPassword(password);
}
rsc.setDatabase(database);
LettucePoolingClientConfiguration ccs = LettucePoolingClientConfiguration.builder()
.clientResources(RedisReactiveConfig.clientResources)
.clientResources(RedisReactiveConfig.CLIENT_RESOURCES)
.clientOptions(ClientOptions.builder().publishOnScheduler(true).build())
.poolConfig(new GenericObjectPoolConfig<>())
.build();
LettuceConnectionFactory factory = new LettuceConnectionFactory(rcs, ccs);
LettuceConnectionFactory factory = new LettuceConnectionFactory(rsc, ccs);
factory.afterPropertiesSet();
return factory;
}
/**
* For standalone redis.
*/
public static ReactiveStringRedisTemplate getStringRedisTemplate(String host, int port, String password, int database) {
ReactiveRedisConnectionFactory connectionFactory = getConnectionFactory(host, port, password, database);
return new ReactiveStringRedisTemplate(connectionFactory);
}
public static ReactiveRedisConnectionFactory getClusterConnectionFactory(RedisReactiveProperties redisReactiveProperties) {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
String password = redisReactiveProperties.getPassword();
if (password != null) {
redisClusterConfiguration.setPassword(password);
}
redisClusterConfiguration.setClusterNodes(redisReactiveProperties.getClusterNodes());
int maxRedirects = redisReactiveProperties.getMaxRedirects();
if (maxRedirects > 0) {
redisClusterConfiguration.setMaxRedirects(maxRedirects);
}
ClusterTopologyRefreshOptions.Builder builder = ClusterTopologyRefreshOptions.builder();
int clusterRefreshPeriod = redisReactiveProperties.getClusterRefreshPeriod();
builder = builder.enablePeriodicRefresh(Duration.ofSeconds(clusterRefreshPeriod));
boolean enableAllAdaptiveRefreshTriggers = redisReactiveProperties.isEnableAllAdaptiveRefreshTriggers();
if (enableAllAdaptiveRefreshTriggers) {
builder = builder.enableAllAdaptiveRefreshTriggers();
}
ClusterTopologyRefreshOptions topologyRefreshOptions = builder.build();
ClusterClientOptions clusterClientOptions = ClusterClientOptions.builder()
.timeoutOptions(TimeoutOptions.enabled(Duration.ofSeconds(clusterRefreshPeriod)))
.topologyRefreshOptions(topologyRefreshOptions)
.publishOnScheduler(true)
.build();
GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
int minIdle = redisReactiveProperties.getMinIdle();
if (minIdle > 0) {
poolConfig.setMinIdle(minIdle);
}
int maxIdle = redisReactiveProperties.getMaxIdle();
if (maxIdle > 0) {
poolConfig.setMaxIdle(maxIdle);
}
int maxTotal = redisReactiveProperties.getMaxTotal();
if (maxTotal > 0) {
poolConfig.setMaxTotal(maxTotal);
} else {
poolConfig.setMaxTotal(poolConfig.getMaxTotal() * 2);
}
Duration maxWait = redisReactiveProperties.getMaxWait();
if (maxWait != null) {
poolConfig.setMaxWait(maxWait);
}
Duration timeBetweenEvictionRuns = redisReactiveProperties.getTimeBetweenEvictionRuns();
if (timeBetweenEvictionRuns != null) {
poolConfig.setTimeBetweenEvictionRuns(timeBetweenEvictionRuns);
}
LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.clientResources(RedisReactiveConfig.CLIENT_RESOURCES)
// .commandTimeout(Duration.ofSeconds(60))
.poolConfig(poolConfig)
.readFrom(redisReactiveProperties.getReadFrom())
.clientOptions(clusterClientOptions)
.build();
LettuceConnectionFactory reactiveRedisConnectionFactory = new LettuceConnectionFactory(redisClusterConfiguration, clientConfig);
reactiveRedisConnectionFactory.afterPropertiesSet();
return reactiveRedisConnectionFactory;
}
public static ReactiveStringRedisTemplate getClusterStringRedisTemplate(RedisReactiveProperties redisReactiveProperties) {
ReactiveRedisConnectionFactory connectionFactory = getClusterConnectionFactory(redisReactiveProperties);
return new ReactiveStringRedisTemplate(connectionFactory);
}
}

View File

@@ -18,7 +18,6 @@
package we.beans.factory.config;
import com.fasterxml.jackson.core.type.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -31,17 +30,15 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.Ordered;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.core.env.*;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import we.config.AggregateRedisConfig;
import we.config.FizzConfigConfiguration;
import we.config.RedisReactiveProperties;
import we.context.config.annotation.FizzRefreshScope;
import we.context.event.FizzRefreshEvent;
import we.global_resource.GlobalResource;
import we.util.*;
import java.util.*;
@@ -58,7 +55,7 @@ public class FizzBeanFactoryPostProcessor implements BeanFactoryPostProcessor, E
private ConfigurableEnvironment environment;
private final Map<String, String> property2beanMap = new HashMap<>(32);
private final Map<String, String> property2beanMap = new HashMap<>();
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
@@ -73,11 +70,27 @@ public class FizzBeanFactoryPostProcessor implements BeanFactoryPostProcessor, E
}
private void initReactiveStringRedisTemplate() {
String host = environment.getProperty("aggregate.redis.host");
String port = environment.getProperty("aggregate.redis.port");
String password = environment.getProperty("aggregate.redis.password");
String database = environment.getProperty("aggregate.redis.database");
reactiveStringRedisTemplate = ReactiveRedisHelper.getStringRedisTemplate(host, Integer.parseInt(port), password, Integer.parseInt(database));
Properties aggregateRedisProperties = new Properties();
boolean find = false;
for (PropertySource<?> propertySource : environment.getPropertySources()) {
if (MapPropertySource.class.isAssignableFrom(propertySource.getClass())) {
MapPropertySource mps = (MapPropertySource) propertySource;
String[] propertyNames = mps.getPropertyNames();
for (String propertyName : propertyNames) {
if (propertyName.startsWith(AggregateRedisConfig.AGGREGATE_REDIS_PREFIX)) {
aggregateRedisProperties.put(propertyName.substring(AggregateRedisConfig.AGGREGATE_REDIS_PREFIX.length() + 1), mps.getProperty(propertyName));
find = true;
}
}
if (find) {
break;
}
}
}
RedisReactiveProperties redisReactiveProperties = new RedisReactiveProperties() {
};
PropertiesUtils.setBeanPropertyValue(redisReactiveProperties, aggregateRedisProperties);
reactiveStringRedisTemplate = ReactiveRedisHelper.getStringRedisTemplate(redisReactiveProperties);
}
private void initFizzPropertySource() {

View File

@@ -51,13 +51,14 @@ public class AggregateRedisConfig extends RedisReactiveConfig {
public static final String AGGREGATE_REACTIVE_REDIS_TEMPLATE = "aggregateReactiveRedisTemplate";
public static final String AGGREGATE_REACTIVE_REDIS_MESSAGE_LISTENER_CONTAINER = "aggregateReactiveRedisMessageListenerContainer";
private static final String SEND_LOG_TYPE_REDIS = "redis";
public static final String AGGREGATE_REDIS_PREFIX = "aggregate.redis";
public static ProxyLettuceConnectionFactory proxyLettuceConnectionFactory;
@Resource
private AggregateRedisConfigProperties aggregateRedisConfigProperties;
@ConfigurationProperties(prefix = "aggregate.redis")
@ConfigurationProperties(prefix = AGGREGATE_REDIS_PREFIX)
@Configuration(AGGREGATE_REACTIVE_REDIS_PROPERTIES)
public static class AggregateRedisReactiveProperties extends RedisReactiveProperties {
}

View File

@@ -0,0 +1,52 @@
package we;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import we.config.RedisReactiveConfig;
import we.config.RedisReactiveProperties;
public class RedisClusterTests {
// @Test
void test() throws InterruptedException {
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
Logger LOGGER = LogManager.getLogger(RedisClusterTests.class);
RedisReactiveProperties redisReactiveProperties = new RedisReactiveProperties() {
};
redisReactiveProperties.setType(RedisReactiveProperties.CLUSTER);
redisReactiveProperties.setPassword("123456");
redisReactiveProperties.setClusterNodes("ip:port");
RedisReactiveConfig redisReactiveConfig = new RedisReactiveConfig(redisReactiveProperties) {
};
LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) redisReactiveConfig.lettuceConnectionFactory();
lettuceConnectionFactory.afterPropertiesSet();
ReactiveStringRedisTemplate reactiveStringRedisTemplate = redisReactiveConfig.reactiveStringRedisTemplate(lettuceConnectionFactory);
reactiveStringRedisTemplate.opsForValue().set("hqw", "lancer").block();
String channel = "ch1";
reactiveStringRedisTemplate.listenToChannel(channel)
.doOnError(
t -> {
LOGGER.error("lsn channel {} error", channel, t);
}
)
.doOnSubscribe(
s -> {
LOGGER.info("success to lsn on {}", channel);
}
)
.doOnNext(
msg -> {
String message = msg.getMessage();
LOGGER.info("receive message: {}", message);
}
)
.subscribe();
Thread.currentThread().join();
}
}

View File

@@ -29,5 +29,6 @@
<AppenderRef ref="Console"/>
<!--<AppenderRef ref="KafkaAppender4monitor"/>-->
</Logger>
<Logger name="we" level="debug"/>
</Loggers>
</Configuration>

View File

@@ -25,6 +25,7 @@
<netty-tcnative.version>2.0.53.Final</netty-tcnative.version>
<spring-cloud.version>2.2.9.RELEASE</spring-cloud.version>
<snakeyaml.version>1.30</snakeyaml.version>
<spring-data-releasetrain.version>Moore-SR13</spring-data-releasetrain.version>
</properties>
<parent>