利用 redis Pus/Sub 来解决集群模式下 websocket 的推送问题

This commit is contained in:
b2baccline
2021-01-12 19:06:49 +08:00
parent 3b7224b57a
commit 4abfa33e75
12 changed files with 255 additions and 45 deletions

View File

@@ -1,5 +1,6 @@
package com.hccake.ballcat.admin.modules.notify.push;
import cn.hutool.core.util.StrUtil;
import com.hccake.ballcat.admin.constants.NotifyChannel;
import com.hccake.ballcat.admin.modules.notify.model.domain.NotifyInfo;
import com.hccake.ballcat.admin.modules.sys.model.entity.SysUser;
@@ -32,7 +33,7 @@ public class MailNotifyPusher implements NotifyPusher {
@Override
public void push(NotifyInfo notifyInfo, List<SysUser> userList) {
String[] emails = userList.stream().map(SysUser::getEmail).toArray(String[]::new);
String[] emails = userList.stream().map(SysUser::getEmail).filter(StrUtil::isNotBlank).toArray(String[]::new);
// 密送群发,不展示其他收件人
MailDetails mailDetails = new MailDetails();

View File

@@ -1,5 +1,6 @@
package com.hccake.ballcat.admin.modules.notify.push;
import cn.hutool.core.util.StrUtil;
import com.hccake.ballcat.admin.constants.NotifyChannel;
import com.hccake.ballcat.admin.modules.notify.model.domain.NotifyInfo;
import com.hccake.ballcat.admin.modules.sys.model.entity.SysUser;
@@ -30,7 +31,8 @@ public class SmsNotifyPusher implements NotifyPusher {
@Override
public void push(NotifyInfo notifyInfo, List<SysUser> userList) {
List<String> phoneList = userList.stream().map(SysUser::getPhone).collect(Collectors.toList());
List<String> phoneList = userList.stream().map(SysUser::getPhone).filter(StrUtil::isNotBlank)
.collect(Collectors.toList());
// 短信文本去除 html 标签
String content = HtmlUtil.toText(notifyInfo.getContent());
// TODO 对接短信发送平台

View File

@@ -0,0 +1,63 @@
package com.hccake.ballcat.admin.websocket;
import com.hccake.ballcat.admin.websocket.distribute.MessageDistributor;
import com.hccake.ballcat.admin.websocket.distribute.RedisMessageDistributor;
import com.hccake.ballcat.admin.websocket.distribute.RedisWebsocketMessageListener;
import com.hccake.ballcat.admin.websocket.user.UserAttributeHandshakeInterceptor;
import com.hccake.ballcat.admin.websocket.user.UserSessionKeyGenerator;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* @author Hccake 2021/1/5
* @version 1.0
*/
@Configuration
@RequiredArgsConstructor
public class AdminWebSocketConfiguration {
@Bean
@ConditionalOnMissingBean(UserAttributeHandshakeInterceptor.class)
public HandshakeInterceptor authenticationHandshakeInterceptor() {
return new UserAttributeHandshakeInterceptor();
}
@Bean
@ConditionalOnMissingBean(UserSessionKeyGenerator.class)
public UserSessionKeyGenerator userSessionKeyGenerator() {
return new UserSessionKeyGenerator();
}
@Bean
@ConditionalOnMissingBean(MessageDistributor.class)
public RedisMessageDistributor messageDistributor(StringRedisTemplate stringRedisTemplate) {
return new RedisMessageDistributor(stringRedisTemplate);
}
@Bean
@ConditionalOnBean(RedisMessageDistributor.class)
public RedisWebsocketMessageListener redisWebsocketMessageDelegate(StringRedisTemplate stringRedisTemplate) {
return new RedisWebsocketMessageListener(stringRedisTemplate);
}
@Bean
@ConditionalOnBean(RedisMessageDistributor.class)
@ConditionalOnMissingBean(RedisMessageListenerContainer.class)
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,
RedisWebsocketMessageListener redisWebsocketMessageListener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(redisWebsocketMessageListener,
new PatternTopic(RedisWebsocketMessageListener.CHANNEL));
return container;
}
}

View File

@@ -1,31 +0,0 @@
package com.hccake.ballcat.admin.websocket.config;
import com.hccake.ballcat.admin.websocket.user.UserAttributeHandshakeInterceptor;
import com.hccake.ballcat.admin.websocket.user.UserSessionKeyGenerator;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.HandshakeInterceptor;
/**
* @author Hccake 2021/1/5
* @version 1.0
*/
@Configuration
@RequiredArgsConstructor
public class AdminWebSocketConfiguration {
@Bean
@ConditionalOnMissingBean(UserAttributeHandshakeInterceptor.class)
public HandshakeInterceptor authenticationHandshakeInterceptor() {
return new UserAttributeHandshakeInterceptor();
}
@Bean
@ConditionalOnMissingBean(UserSessionKeyGenerator.class)
public UserSessionKeyGenerator userSessionKeyGenerator() {
return new UserSessionKeyGenerator();
}
}

View File

@@ -0,0 +1,20 @@
package com.hccake.ballcat.admin.websocket.distribute;
/**
* 本地消息分发,直接进行发送
*
* @author Hccake 2021/1/12
* @version 1.0
*/
public class LocalMessageDistributor implements MessageDistributor, MessageSender {
/**
* 消息分发
* @param messageDO 发送的消息
*/
@Override
public void distribute(MessageDO messageDO) {
doSend(messageDO);
}
}

View File

@@ -0,0 +1,31 @@
package com.hccake.ballcat.admin.websocket.distribute;
import lombok.Data;
import lombok.experimental.Accessors;
import java.util.List;
/**
* @author Hccake 2021/1/12
* @version 1.0
*/
@Data
@Accessors(chain = true)
public class MessageDO {
/**
* 是否广播
*/
private Boolean needBroadcast;
/**
* sessionKeys
*/
private List<Object> sessionKeys;
/**
* 需要发送的消息文本
*/
private String messageText;
}

View File

@@ -0,0 +1,17 @@
package com.hccake.ballcat.admin.websocket.distribute;
/**
* 消息分发器
*
* @author Hccake 2021/1/12
* @version 1.0
*/
public interface MessageDistributor {
/**
* 消息分发
* @param messageDO 发送的消息
*/
void distribute(MessageDO messageDO);
}

View File

@@ -0,0 +1,34 @@
package com.hccake.ballcat.admin.websocket.distribute;
import cn.hutool.core.collection.CollectionUtil;
import com.hccake.ballcat.common.websocket.WebSocketMessageSender;
import java.util.List;
/**
* @author Hccake 2021/1/12
* @version 1.0
*/
public interface MessageSender {
/**
* 发送消息
* @param messageDO 发送的消息
*/
default void doSend(MessageDO messageDO) {
Boolean needBroadcast = messageDO.getNeedBroadcast();
String messageText = messageDO.getMessageText();
List<Object> sessionKeys = messageDO.getSessionKeys();
if (needBroadcast != null && needBroadcast) {
// 广播信息
WebSocketMessageSender.broadcast(messageText);
}
else if (CollectionUtil.isNotEmpty(sessionKeys)) {
// 指定用户发送
for (Object sessionKey : sessionKeys) {
WebSocketMessageSender.send(sessionKey, messageText);
}
}
}
}

View File

@@ -1,4 +1,4 @@
package com.hccake.ballcat.admin.websocket;
package com.hccake.ballcat.admin.websocket.distribute;
import com.hccake.ballcat.admin.modules.notify.event.AnnouncementCloseEvent;
import com.hccake.ballcat.admin.modules.notify.event.StationNotifyPushEvent;
@@ -12,7 +12,6 @@ import com.hccake.ballcat.admin.websocket.message.AnnouncementCloseMessage;
import com.hccake.ballcat.admin.websocket.message.AnnouncementPushMessage;
import com.hccake.ballcat.admin.websocket.message.DictChangeMessage;
import com.hccake.ballcat.common.core.util.JacksonUtils;
import com.hccake.ballcat.common.websocket.WebSocketMessageSender;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
@@ -33,9 +32,11 @@ public class PushEventListener {
private final UserAnnouncementService userAnnouncementService;
private final MessageDistributor messageDistributor;
/**
* 字典修改事件监听
* @param event the DictChangeEvent
* @param event the `DictChangeEvent`
*/
@Async
@EventListener(DictChangeEvent.class)
@@ -46,7 +47,8 @@ public class PushEventListener {
String msg = JacksonUtils.toJson(dictChangeMessage);
// 广播修改信息
WebSocketMessageSender.broadcast(msg);
MessageDO messageDO = new MessageDO().setMessageText(msg).setNeedBroadcast(true);
messageDistributor.distribute(messageDO);
}
/**
@@ -62,7 +64,8 @@ public class PushEventListener {
String msg = JacksonUtils.toJson(message);
// 广播修改信息
WebSocketMessageSender.broadcast(msg);
MessageDO messageDO = new MessageDO().setMessageText(msg).setNeedBroadcast(true);
messageDistributor.distribute(messageDO);
}
/**
@@ -88,16 +91,20 @@ public class PushEventListener {
String msg = JacksonUtils.toJson(message);
List<UserAnnouncement> userAnnouncements = new ArrayList<>();
List<Object> sessionKeys = new ArrayList<>();
// 向指定用户推送
for (SysUser sysUser : userList) {
Integer userId = sysUser.getUserId();
boolean send = WebSocketMessageSender.send(userId, msg);
if (send) {
UserAnnouncement userAnnouncement = userAnnouncementService.prodUserAnnouncement(userId,
announcementNotifyInfo.getId());
userAnnouncements.add(userAnnouncement);
}
sessionKeys.add(userId);
UserAnnouncement userAnnouncement = userAnnouncementService.prodUserAnnouncement(userId,
announcementNotifyInfo.getId());
userAnnouncements.add(userAnnouncement);
}
MessageDO messageDO = new MessageDO().setMessageText(msg).setSessionKeys(sessionKeys)
.setNeedBroadcast(false);
messageDistributor.distribute(messageDO);
userAnnouncementService.saveBatch(userAnnouncements);
}
}

View File

@@ -0,0 +1,28 @@
package com.hccake.ballcat.admin.websocket.distribute;
import com.hccake.ballcat.common.core.util.JacksonUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* 消息分发器
*
* @author Hccake 2021/1/12
* @version 1.0
*/
@RequiredArgsConstructor
public class RedisMessageDistributor implements MessageDistributor {
private final StringRedisTemplate stringRedisTemplate;
/**
* 消息分发
* @param messageDO 发送的消息
*/
@Override
public void distribute(MessageDO messageDO) {
String str = JacksonUtils.toJson(messageDO);
stringRedisTemplate.convertAndSend(RedisWebsocketMessageListener.CHANNEL, str);
}
}

View File

@@ -0,0 +1,38 @@
package com.hccake.ballcat.admin.websocket.distribute;
import com.hccake.ballcat.common.core.util.JacksonUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.RedisSerializer;
/**
* redis订阅 websocket 发送消息,接收到消息时进行推送
*
* @author Hccake 2021/1/12
* @version 1.0
*/
@RequiredArgsConstructor
public class RedisWebsocketMessageListener implements MessageListener, MessageSender {
public final static String CHANNEL = "websocket-send";
private final StringRedisTemplate stringRedisTemplate;
@Override
public void onMessage(Message message, byte[] bytes) {
byte[] channelBytes = message.getChannel();
RedisSerializer<String> stringSerializer = stringRedisTemplate.getStringSerializer();
String channel = stringSerializer.deserialize(channelBytes);
// 这里没有使用通配符所以一定是true
if (CHANNEL.equals(channel)) {
byte[] bodyBytes = message.getBody();
String body = stringSerializer.deserialize(bodyBytes);
MessageDO messageDO = JacksonUtils.toObj(body, MessageDO.class);
doSend(messageDO);
}
}
}