Spring Boot + Netty + WebSocket: A Message Push Example

  • 2021-08-12 02:57:50
  • OfStack

Contents: Preface · 1. Adding the Netty dependency · 2. Usage steps

Preface

WebSocket makes data exchange between the client and the server easier and allows the server to actively push data to the client. With the WebSocket API, the browser and the server only need to complete a single handshake; after that they share a persistent connection and can transmit data in both directions.

Advantages of Netty Framework

1. The API is simple to use, so the barrier to entry is low;
2. Powerful: it ships with a variety of built-in codecs and supports many mainstream protocols;
3. Highly customizable: the communication framework can be flexibly extended through ChannelHandler;
4. High performance: compared with other mainstream NIO frameworks in the industry, Netty has the best overall performance;
5. Mature and stable: Netty has fixed all JDK NIO bugs discovered so far, so application developers no longer need to worry about NIO bugs.

Tip: The following is the main content of this article, and the following cases can be used for reference

1. Adding the Netty dependency


<!-- Netty all-in-one artifact (io.netty:netty-all); presumably the source of the io.netty.* classes imported by the Java snippets below. -->
<dependency>
   <groupId>io.netty</groupId>
   <artifactId>netty-all</artifactId>
   <version>4.1.48.Final</version>
</dependency>

2. Usage steps

1. Define the basic command enum


package com.test.netty;

/**
 * Command codes carried in the {@code cmd} field of messages pushed to clients.
 * Each constant pairs a short string code with a human-readable description.
 */
public enum Cmd {
 /** Code {@code "000"}: connection succeeded. */
 START("000", " Connection succeeded "),
 /** Code {@code "001"}: message alert. */
 WMESSAGE("001", " Message alert "),
 ;

 // Enum constants are shared singletons, so their state is kept immutable.
 private final String cmd;
 private final String desc;

 Cmd(String cmd, String desc) {
  this.cmd = cmd;
  this.desc = desc;
 }

 /**
  * @return the string code of this command
  */
 public String getCmd() {
  return cmd;
 }

 /**
  * @return the human-readable description of this command
  */
 public String getDesc() {
  return desc;
 }
}

2. Netty server startup class


package com.test.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

/**
 * @author test
 * <p>
 *  Service startup listener 
 **/
@Slf4j
@Component
public class NettyServer {

 @Value("${server.netty.port}")
 private int port;

 @Autowired
 private ServerChannelInitializer serverChannelInitializer;

 @Bean
 ApplicationRunner nettyRunner() {
  return args -> {
   //new 1 Main thread groups 
   EventLoopGroup bossGroup = new NioEventLoopGroup(1);
   //new 1 Worker thread groups 
   EventLoopGroup workGroup = new NioEventLoopGroup();
   ServerBootstrap bootstrap = new ServerBootstrap()
     .group(bossGroup, workGroup)
     .channel(NioServerSocketChannel.class)
     .childHandler(serverChannelInitializer)
     // Setting the Queue Size 
     .option(ChannelOption.SO_BACKLOG, 1024)
     //  When there is no data communication within two hours, ,TCP Will be sent automatically 1 Activity detection data messages 
     .childOption(ChannelOption.SO_KEEPALIVE, true);
   // Binding port , Begin to receive incoming connections 
   try {
    ChannelFuture future = bootstrap.bind(port).sync();
    log.info(" Server Startup Start Listening Port : {}", port);
    future.channel().closeFuture().sync();
   } catch (InterruptedException e) {
    e.printStackTrace();
   } finally {
    // Close the main thread group 
    bossGroup.shutdownGracefully();
    // Close the worker thread group 
    workGroup.shutdownGracefully();
   }
  };
 }
}

3. Netty server-side handler


package com.test.netty;

import com.test.common.util.JsonUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.net.URLDecoder;
import java.util.*;

/**
 * Netty server-side handler for WebSocket text frames.
 * <p>
 * After the WebSocket handshake it registers the channel in
 * {@link ServerChannelCache} under the key taken from the request URI
 * (the text after the last {@code test=}), and it offers {@code send}
 * methods for pushing messages to registered channels.
 *
 * @author test
 **/
@Slf4j
@Component
@ChannelHandler.Sharable
public class NettyServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

 @Autowired
 private ServerChannelCache cache;
 /** URI marker; everything after its last occurrence in the request URI is used as the cache key. */
 private static final String dataKey = "test=";

 /** Empty placeholder type; it is not used anywhere in this class. */
 @Data
 public static class ChannelCache {
 }


 /**
  * Logs that a client connection has become active.
  */
 @Override
 public void channelActive(ChannelHandlerContext ctx) throws Exception {
  Channel channel = ctx.channel();
  log.info(" The channel connection is open, ID->{}......", channel.id().asLongText());
 }

 /**
  * On WebSocket handshake completion, registers the channel under the key found
  * in the request URI and sends the {@link Cmd#START} message; when the URI
  * carries no key, the connection is closed instead.
  */
 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
   Channel channel = ctx.channel();
   WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
   String requestUri = URLDecoder.decode(handshakeComplete.requestUri(), "UTF-8");
   log.info("HANDSHAKE_COMPLETE , ID->{} , URI->{}", channel.id().asLongText(), requestUri);
   String socketKey = extractSocketKey(requestUri);
   if (!socketKey.isEmpty()) {
    cache.add(socketKey, channel);
    this.send(channel, Cmd.START, null);
   } else {
    // No key in the URI: the connection cannot be addressed later, so drop it.
    ctx.close();
   }
  }
  super.userEventTriggered(ctx, evt);
 }

 /**
  * Returns the text following the last {@code test=} in the URI, or an empty
  * string when the marker is absent (without this check a missing marker would
  * make {@code substring} return an arbitrary tail of the URI).
  */
 private static String extractSocketKey(String requestUri) {
  int markerIndex = requestUri.lastIndexOf(dataKey);
  if (markerIndex < 0) {
   return "";
  }
  return requestUri.substring(markerIndex + dataKey.length());
 }

 /**
  * Logs the disconnect and removes the channel from the cache.
  */
 @Override
 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  Channel channel = ctx.channel();
  log.info(" The channel connection has been disconnected, ID->{} , users ID->{}......", channel.id().asLongText(), cache.getCacheId(channel));
  cache.remove(channel);
 }

 /**
  * Logs the failure, removes the channel from the cache and closes the connection.
  */
 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  Channel channel = ctx.channel();
  log.error(" An exception occurred in the connection, ID->{} , users ID->{} , anomaly ->{}......", channel.id().asLongText(), cache.getCacheId(channel), cause.getMessage(), cause);
  cache.remove(channel);
  ctx.close();
 }

 /**
  * Answers every text frame from the client with a frame whose payload is the
  * serialized map {@code {"cmd": "100"}}; the content of the incoming frame is
  * not inspected.
  */
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  try {
   ctx.channel().writeAndFlush(new TextWebSocketFrame(JsonUtil.toString(Collections.singletonMap("cmd", "100"))));
  } catch (Exception e) {
   log.error(" Message handling exception: {}", e.getMessage(), e);
  }
 }

 /**
  * Pushes a message to every channel registered under {@code id}.
  * Does nothing when the cache has no entry for {@code id}.
  *
  * @param cmd command whose code goes into the {@code cmd} field
  * @param id  cache key the target channels were registered under
  * @param obj payload placed in the {@code data} field; may be {@code null}
  */
 public void send(Cmd cmd, String id, Object obj) {
  Map<String, Channel> channels = cache.get(id);
  if (channels == null) {
   return;
  }
  String msg = buildMessage(cmd, obj);
  channels.values().forEach(channel -> channel.writeAndFlush(new TextWebSocketFrame(msg)));
 }

 /**
  * Pushes a message to a single channel.
  *
  * @param channel target channel
  * @param cmd     command whose code goes into the {@code cmd} field
  * @param obj     payload placed in the {@code data} field; may be {@code null}
  */
 public void send(Channel channel, Cmd cmd, Object obj) {
  channel.writeAndFlush(new TextWebSocketFrame(buildMessage(cmd, obj)));
 }

 /**
  * Serializes the {@code cmd}/{@code data} pair via {@code JsonUtil} and logs the outgoing text.
  */
 private String buildMessage(Cmd cmd, Object obj) {
  // LinkedHashMap keeps "cmd" before "data" in iteration order.
  Map<String, Object> data = new LinkedHashMap<>();
  data.put("cmd", cmd.getCmd());
  data.put("data", obj);
  String msg = JsonUtil.toString(data);
  log.info(" The server sends a message : {}", msg);
  return msg;
 }

}

4. Netty server-side channel cache


package com.test.netty;

import io.netty.channel.Channel;
import io.netty.util.AttributeKey;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Registry of open channels, grouped by a caller-supplied cache id.
 * <p>
 * Each cache id maps to a {@code HashMap} of channels keyed by the channel's
 * short id text. Updates go through the atomic per-key operations of
 * {@link ConcurrentHashMap} and replace the inner map with a modified copy,
 * so a map handed out by {@link #get(String)} is never mutated by this class
 * afterwards.
 */
@Component
public class ServerChannelCache {
 private static final ConcurrentHashMap<String, HashMap<String, Channel>> CACHE_MAP = new ConcurrentHashMap<>();
 /** Channel attribute under which the cache id of a registered channel is stored. */
 private static final AttributeKey<String> CHANNEL_ATTR_KEY = AttributeKey.valueOf("test");

 /**
  * @param channel channel to inspect
  * @return the cache id stored on the channel by {@link #add}, or {@code null} if none was stored
  */
 public String getCacheId(Channel channel) {
  return channel.attr(CHANNEL_ATTR_KEY).get();
 }

 /**
  * Registers {@code channel} under {@code cacheId} and records the id on the channel.
  *
  * @param cacheId key to group the channel under
  * @param channel channel to register
  */
 public void add(String cacheId, Channel channel) {
  channel.attr(CHANNEL_ATTR_KEY).set(cacheId);
  // NOTE(review): the short id text is used as the inner key; confirm it is unique enough here.
  String channelKey = channel.id().asShortText();
  // Atomic per key; a copy is published so maps already handed to readers stay unchanged.
  CACHE_MAP.compute(cacheId, (key, existing) -> {
   HashMap<String, Channel> updated = existing == null ? new HashMap<>() : new HashMap<>(existing);
   updated.put(channelKey, channel);
   return updated;
  });
 }

 /**
  * Looks up the channels registered under {@code cacheId}.
  *
  * @param cacheId key to look up; may be {@code null}
  * @return the channels registered under the key, or {@code null} when the key
  *         is {@code null} or has no registered channels; treat the result as read-only
  */
 public HashMap<String, Channel> get(String cacheId) {
  if (cacheId == null) {
   return null;
  }
  return CACHE_MAP.get(cacheId);
 }

 /**
  * Unregisters {@code channel}. Does nothing when the channel carries no cache id.
  * The entry for a cache id is dropped once its last channel is removed, so
  * ids of disconnected clients do not accumulate.
  *
  * @param channel channel to unregister
  */
 public void remove(Channel channel) {
  String cacheId = getCacheId(channel);
  if (cacheId == null) {
   return;
  }
  String channelKey = channel.id().asShortText();
  CACHE_MAP.computeIfPresent(cacheId, (key, existing) -> {
   HashMap<String, Channel> updated = new HashMap<>(existing);
   updated.remove(channelKey);
   // Returning null removes the mapping for this cache id.
   return updated.isEmpty() ? null : updated;
  });
 }
}

5. Netty channel initializer


package com.test.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Channel initializer that assembles the handler pipeline of each new connection.
 *
 * @author test
 **/
@Component
public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

 /** Shared business handler, appended as the last element of every pipeline. */
 @Autowired
 private NettyServerHandler nettyServerHandler;

 /**
  * Appends, in order: the HTTP codec, the chunked-write handler, an HTTP object
  * aggregator constructed with 8192, the WebSocket protocol handler for
  * {@code "/test.io"}, and the shared business handler.
  *
  * @param socketChannel the newly registered channel
  */
 @Override
 protected void initChannel(SocketChannel socketChannel) throws Exception {
  socketChannel.pipeline()
    .addLast(new HttpServerCodec())
    .addLast(new ChunkedWriteHandler())
    .addLast(new HttpObjectAggregator(8192))
    .addLast(new WebSocketServerProtocolHandler("/test.io", true, 5000))
    .addLast(nettyServerHandler);
 }
}

6. html test


<!DOCTYPE HTML>
<html>
 <head>
 <meta charset="utf-8">
 <title>test</title>
 
  <script type="text/javascript">
   // Opens a WebSocket to the server, sends one text message on connect and
   // reports each connection event through alert().
   function WebSocketTest()
   {
   if ("WebSocket" in window)
   {
    alert(" Your browser supports  WebSocket!");
    
    // Open a web socket. Replace "port" with the server's Netty port.
    // The text after "test=" is the key the server registers this connection
    // under, so it must be present in the URL.
    var ws = new WebSocket("ws://localhost:port/test.io?test=123456");
    
    ws.onopen = function()
    {
     // Web Socket connected, use the send() method to send data
     ws.send(" Send data ");
     alert(" Data is being sent ...");
    };
    
    ws.onmessage = function (evt) 
    { 
     // evt.data holds the text pushed by the server
     var received_msg = evt.data;
     alert(" Data received ...");
    };
    
    ws.onclose = function()
    { 
     // The websocket has been shut down
     alert(" Connection closed ..."); 
    };
   }
   
   else
   {
    // Browser does not support WebSocket
    alert(" Your browser does not support  WebSocket!");
   }
   }
  </script>
  
 </head>
 <body>
 
  <div id="sse">
   <a href="javascript:WebSocketTest()" rel="external nofollow" > Run  WebSocket</a>
  </div>
  
 </body>
</html>

7. vue test


mounted() {
   this.initWebsocket();
  },
  methods: {
   initWebsocket() {
    let websocket = new WebSocket('ws://localhost:port/test.io?test=123456');
    websocket.onmessage = (event) => {
     let msg = JSON.parse(event.data);
     switch (msg.cmd) {
      case "000":
       this.$message({
        type: 'success',
        message: " Establish real-time connection successfully! ",
        duration: 1000
       })
       setInterval(()=>{websocket.send("heartbeat")},60*1000);
       break;
      case "001":
       this.$message.warning(" Received 1 Please check the new information in time! ")
       break;
     }
    }
    websocket.onclose = () => {
     setTimeout(()=>{
      this.initWebsocket();
     },30*1000);
    }
    websocket.onerror = () => {
     setTimeout(()=>{
      this.initWebsocket();
     },30*1000);
    }
   },
  },
![ Insert picture description here ](https://img-blog.csdnimg.cn/20210107160420568.jpg?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3d1X3Fpbmdfc29uZw==,size_16,color_FFFFFF,t_70#pic_center)

8. Sending messages from the server


// Inject the shared handler into the Spring bean that needs to push messages.
@Autowired
	private NettyServerHandler nettyServerHandler;

// Push a WMESSAGE ("001") command carrying `message` to every channel registered under `id`.
nettyServerHandler.send(Cmd.WMESSAGE, id, message);

Related articles: