web项目当遇上分布式,需要考虑各个服务应用管理的session如何让session里面的数据共享。这里可以使用redis作为应用服务的共享中间站。
public class StandardSession implements HttpSession, Session, Serializable
通过查看源代码发现web项目中的session是有实现serializable接口,所以我们可以通过将每个session对象序列化再存储到redis里面,因为各个应用服务都使用相同的redis,所以当请求过来的时候可以通过cookies里面的sessionID去redis里面反序列化回来session。达到共享session的效果。使用技术可以百度spring-session提供的技术来实现。这个是比较常用比较常见的一种session共享解决方案。
但是想象总是美好的。当我们使用websocket做简易聊天功能的时候,通过查看WebSocketSession发现该实现类是没有继承serializable,也就是说按照上面的方案是解决不了共享session的问题。
但是当集群情况出现的时候,每个节点存储的session各个都不相同。我们采用这种方案。
每个服务节点的WebSocketSession保存初次连接的session,当第一次访问在第一台服务器上面,并存储了他的session,在第二次访问nginx服务器被随机访问第二台节点服务器上面,在存储的Map里面是找不到对应的session,做相对于的推送消息,这个时候我们通过消息中间件rocketMQ的发布订阅的模式,将相对应的sessionID分发到各个节点上面,每个节点监听接受消息,找到对应session并将对应的消息推送回客户端。完成这种操作。
代码实现如下:
@Component
@RocketMQMessageListener(
topic = "haoke-im-send-message-topic",
selectorExpression = "SEND_MSG",
messageModel = MessageModel.BROADCASTING,
consumerGroup = "haoke-im-group"
)
public class MessageHandler extends TextWebSocketHandler implements RocketMQListener<String> {
@Autowired
private MessageDAO messageDAO;
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Long uid = (Long) session.getAttributes().get("uid");
// 将当前用户的session放置到map中,后面会使用相应的session通信
SESSIONS.put(uid, session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage
textMessage) throws Exception {
Long uid = (Long) session.getAttributes().get("uid");
JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());
Long toId = jsonNode.get("toId").asLong();
String msg = jsonNode.get("msg").asText();
Message message = Message.builder()
.from(UserData.USER_MAP.get(uid))
.to(UserData.USER_MAP.get(toId))
.msg(msg)
.build();
// 将消息保存到MongoDB
message = this.messageDAO.saveMessage(message);
String msgJson = MAPPER.writeValueAsString(message);
// 判断to用户是否在线
WebSocketSession toSession = SESSIONS.get(toId);
if (toSession != null && toSession.isOpen()) {
//TODO 具体格式需要和前端对接
toSession.sendMessage(new TextMessage(msgJson));
// 更新消息状态为已读
this.messageDAO.updateMessageState(message.getId(), 2);
}else{
// 该用户可能下线,可能在其他的节点中,发送消息到MQ系统
// 需求:添加tag,便于消费者对消息的筛选
this.rocketMQTemplate.convertAndSend("haoke-im-send-message-topic:SEND_MSG", msgJson);
}
}
@Override
public void onMessage(String msg) {
try {
JsonNode jsonNode = MAPPER.readTree(msg);
long toId = jsonNode.get("to").get("id").longValue();
// 判断to用户是否在线
WebSocketSession toSession = SESSIONS.get(toId);
if (toSession != null && toSession.isOpen()) {
//TODO 具体格式需要和前端对接
toSession.sendMessage(new TextMessage(msg));
// 更新消息状态为已读
this.messageDAO.updateMessageState(new ObjectId(jsonNode.get("id").asText()), 2);
}else{
// 不需要做处理
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
|
|