# 作为websocket服务端

返回:netty

# NettyServer

package com.ht.server.type;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * Boots the Netty server that accepts WebSocket clients.
 *
 * <p>Channel pipelines for accepted connections are configured by
 * {@code NettyChannelInitializer}.
 *
 * @author : htring
 * @packageName : com.ht.server.type
 * @date : 2021/8/12 16:12
 */
@Component
public class NettyServer {

    /** Port used by {@link #startServer()} when the caller does not supply one. */
    public static final int DEFAULT_PORT = 9356;

    /**
     * Starts the server on {@link #DEFAULT_PORT} and blocks until the server channel closes.
     *
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void startServer() throws Exception {
        startServer(DEFAULT_PORT);
    }

    /**
     * Starts the server on the given port and blocks the calling thread until the
     * server channel is closed. Both event loop groups are shut down before returning,
     * whether the method completes normally or with an exception.
     *
     * @param port TCP port to listen on
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public void startServer(int port) throws Exception {
        // Standard Netty setup: one group is passed as the parent (accepting) group,
        // the other as the child group that serves accepted connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // Netty's server bootstrap helper.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // Netty's built-in logging handler, attached to the server channel.
                    .handler(new LoggingHandler())
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.SO_BACKLOG, 1024 * 1024 * 10)
                    .childHandler(new NettyChannelInitializer());

            // Wait for the bind to complete, then park here until the server channel closes.
            ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(port)).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

# NettyChannelInitializer

package com.ht.server.type;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Builds the channel pipeline for every accepted connection: HTTP codec, chunked
 * writer, HTTP message aggregation, the WebSocket protocol handler and finally the
 * application handler.
 *
 * @packageName : com.ht.server.type
 * @author : htring
 * @date : 2021/8/12 17:10
 */
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {

    /** URI path on which WebSocket upgrade requests are accepted. */
    private static final String WEBSOCKET_PATH = "/hello";

    /**
     * Upper bound, in bytes, for an aggregated HTTP message. Only the WebSocket
     * upgrade request needs to pass through the aggregator, so a small limit is
     * enough and keeps a single connection from buffering a huge request body.
     */
    private static final int MAX_HTTP_CONTENT_LENGTH = 64 * 1024;

    /** Maximum WebSocket frame payload size, in bytes. */
    private static final int MAX_FRAME_PAYLOAD_LENGTH = 65536;

    /**
     * Installs the handlers on a newly accepted channel.
     *
     * @param socketChannel the channel whose pipeline is being configured
     * @throws Exception declared by the overridden method; nothing here throws a checked exception
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        // WebSocket starts as an HTTP exchange, so an HTTP codec is required first.
        pipeline.addLast(new HttpServerCodec());
        // Handler for writing data in chunks (large streams without holding them fully in memory).
        pipeline.addLast(new ChunkedWriteHandler());
        // Merges the parts of an HTTP message into a single full message;
        // the argument is the maximum aggregated content length in bytes.
        pipeline.addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH));
        // Netty's stock WebSocket handler: path, no sub-protocols, extensions allowed, max frame size.
        pipeline.addLast(new WebSocketServerProtocolHandler(
                WEBSOCKET_PATH,
                null,
                true,
                MAX_FRAME_PAYLOAD_LENGTH));
        // Application handler that processes the incoming WebSocket frames.
        pipeline.addLast(new NettySocketHandler());
    }
}

# NettySocketHandler

package com.ht.server.type;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Application handler at the end of the pipeline. It tracks connected channels,
 * logs incoming WebSocket frames to standard output and answers them.
 *
 * <p>Channels are tracked in two static structures shared by all handler instances:
 * {@link #channelGroup} holds every active channel (used for broadcasts) and
 * {@code CHANNEL_MAP} maps a client IP address to the channel that most recently
 * sent a text frame from that address (used for the direct reply).
 *
 * @author : htring
 * @packageName : com.ht.server.type
 * @date : 2021/8/12 16:25
 */
public class NettySocketHandler extends SimpleChannelInboundHandler<Object> {

    /** Every currently active client channel. */
    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Client IP address -> channel that most recently sent a text frame from that address. */
    private static final ConcurrentMap<String, Channel> CHANNEL_MAP = new ConcurrentHashMap<>();

    /** Registers the new channel for broadcasts. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接成功");
        channelGroup.add(ctx.channel());
    }

    /** Forgets the channel so no later message is routed to a closed connection. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端关闭");
        channelGroup.remove(ctx.channel());
        // Drop the address mapping only if it still points at this very channel.
        CHANNEL_MAP.values().remove(ctx.channel());
    }

    /** Reports the failure and closes the affected connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("连接异常");
        // Surface the cause instead of discarding it.
        System.err.println(cause);
        ctx.close();
    }

    /**
     * Dispatches an inbound message by frame type. Messages of any other type are ignored.
     *
     * @param channelHandlerContext context of the channel the message arrived on
     * @param o                     the inbound message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
        if (o instanceof TextWebSocketFrame) {
            handleText(channelHandlerContext, (TextWebSocketFrame) o);
        } else if (o instanceof BinaryWebSocketFrame) {
            BinaryWebSocketFrame binaryWebSocketFrame = (BinaryWebSocketFrame) o;
            System.out.println("收到二进制消息:" + binaryWebSocketFrame.content().readableBytes());
            // Encode explicitly as UTF-8 so the reply does not depend on the platform charset.
            byte[] payload = "你好".getBytes(java.nio.charset.StandardCharsets.UTF_8);
            BinaryWebSocketFrame sendFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes(payload));
            channelHandlerContext.channel().writeAndFlush(sendFrame);
        } else if (o instanceof PongWebSocketFrame) {
            System.out.println("客户端ping成功");
        } else if (o instanceof CloseWebSocketFrame) {
            System.out.println("客户端关闭");
            // Do not wait on the close future here: this method runs on the channel's
            // event loop, and blocking that thread on its own I/O operation is unsafe.
            channelHandlerContext.channel().close();
        }
    }

    /** Logs that the channel has been unregistered. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("等待连接");
    }

    /**
     * Sends a fixed direct message to the channel registered for {@code address}
     * (when there is one and it is still active) and then broadcasts {@code text}
     * to every connected channel.
     *
     * @param address client IP address used as the lookup key
     * @param text    text to broadcast to all channels
     */
    public void sendMessage(String address, String text) {
        Channel channel = CHANNEL_MAP.get(address);
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(new TextWebSocketFrame("你好,这是专发消息"));
        }
        channelGroup.writeAndFlush(new TextWebSocketFrame(text));
    }

    /** Broadcasts a fixed text message to every connected channel. */
    public void sendAll() {
        channelGroup.writeAndFlush(new TextWebSocketFrame("这是群发消息"));
    }

    /**
     * Handles a text frame: logs it, records the sender under its IP address and
     * either starts the close handshake (on {@code "bye"}) or replies and broadcasts.
     */
    private void handleText(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        InetSocketAddress ipSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String address = ipSocket.getAddress().getHostAddress();
        String text = frame.text();
        System.out.println("客户端连接地址为:" + address);
        System.out.println("收到客户端发来的消息:" + text);
        // Always point the address at the channel that just spoke, so the direct
        // reply reaches the actual sender even when several clients share an IP
        // or a client reconnected.
        CHANNEL_MAP.put(address, ctx.channel());
        if ("bye".equals(text)) {
            ctx.channel().writeAndFlush(new CloseWebSocketFrame());
        } else {
            sendMessage(address, text);
        }
    }
}

# 启动类 MyServerApplication

package com.ht.server;

import com.ht.server.type.NettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point. Once the application context is up, the
 * {@link CommandLineRunner} callback launches the Netty server.
 *
 * @author htring
 */
@SpringBootApplication
public class MyServerApplication implements CommandLineRunner {

    /** Server bean, supplied by Spring through {@link #setNettyServer(NettyServer)}. */
    private NettyServer nettyServer;

    /**
     * Setter injection point for the server bean.
     *
     * @param nettyServer the server to launch from {@link #run(String...)}
     */
    @Autowired
    public void setNettyServer(NettyServer nettyServer) {
        this.nettyServer = nettyServer;
    }

    /**
     * Launches the Spring application.
     *
     * @param args command-line arguments, handed to Spring unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(MyServerApplication.class, args);
    }

    /**
     * Starts the Netty server; returns only when {@code startServer()} returns.
     *
     * @param args command-line arguments (not used)
     * @throws Exception whatever {@code startServer()} throws
     */
    @Override
    public void run(String... args) throws Exception {
        nettyServer.startServer();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30