# 作为websocket服务端
# NettyServer
package com.ht.server.type;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
/**
* @author : htring
* @packageName : com.ht.server.type
* @description :
* @date : 2021/8/12 16:12
*/
@Component
public class NettyServer {
public void startServer() throws Exception {
// netty基本操作,两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// netty的启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//记录日志的handler,netty自带的
.handler(new LoggingHandler())
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024 * 1024 * 10)
.childHandler(new NettyChannelInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(9356)).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
# NettyChannelInitializer
package com.ht.server.type;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @packageName : com.ht.server.type
* @description :
* @author : htring
* @date : 2021/8/12 17:10
*/
public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// websocket协议本身是基于Http协议的,所以需要Http解码器
pipeline.addLast(new HttpServerCodec());
// 以块的方式来写的处理器
// 加入chunked 主要作用是支持异步发送的码流(大文件传输),但不专用过多的内存,防止java内存溢出
pipeline.addLast(new ChunkedWriteHandler());
// netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
// 加入ObjectAggregator解码器,作用是他会把多个消息转换为单一的FullHttpRequest或者FullHttpResponse
pipeline.addLast(new HttpObjectAggregator(1024 * 1024 * 1024));
// 这个是websocket的handler,是netty提供的,也可以自定义,建议就用默认的
pipeline.addLast(new WebSocketServerProtocolHandler(
"/hello",
null,
true,
65536));
//自定义的handler,处理服务端传来的消息
pipeline.addLast(new NettySocketHandler());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# NettySocketHandler
package com.ht.server.type;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @author : htring
* @packageName : com.ht.server.type
* @description :
* @date : 2021/8/12 16:25
*/
public class NettySocketHandler extends SimpleChannelInboundHandler<Object> {
public static ChannelGroup channelGroup;
static {
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
private static final ConcurrentMap<String, Channel> CHANNEL_MAP = new ConcurrentHashMap<>();
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端连接成功");
channelGroup.add(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端关闭");
channelGroup.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("连接异常");
ctx.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
if (o instanceof TextWebSocketFrame) {
TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) o;
// sendAll();
InetSocketAddress ipSocket = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
String address = ipSocket.getAddress().getHostAddress();
System.out.println("客户端连接地址为:" + address);
System.out.println("收到客户端发来的消息:" + textWebSocketFrame.text());
if (!CHANNEL_MAP.containsKey(address)) {
CHANNEL_MAP.put(address, channelHandlerContext.channel());
}
if ("bye".equals(textWebSocketFrame.text())) {
channelHandlerContext.channel().writeAndFlush(new CloseWebSocketFrame());
} else {
sendMessage(address, textWebSocketFrame.text());
}
}
if (o instanceof BinaryWebSocketFrame) {
BinaryWebSocketFrame binaryWebSocketFrame = (BinaryWebSocketFrame) o;
System.out.println("收到二进制消息:" + binaryWebSocketFrame.content().readableBytes());
BinaryWebSocketFrame sendFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes("你好".getBytes()));
channelHandlerContext.channel().writeAndFlush(sendFrame);
}
if (o instanceof PongWebSocketFrame) {
System.out.println("客户端ping成功");
}
if (o instanceof CloseWebSocketFrame) {
System.out.println("客户端关闭");
channelHandlerContext.channel().close().sync();
}
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("等待连接");
}
public void sendMessage(String address, String text) {
Channel channel = CHANNEL_MAP.get(address);
channel.writeAndFlush(new TextWebSocketFrame("你好,这是专发消息"));
channelGroup.writeAndFlush(new TextWebSocketFrame(text));
}
public void sendAll() {
channelGroup.writeAndFlush(new TextWebSocketFrame("这是群发消息"));
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# 启动类MyServerApplication
package com.ht.server;
import com.ht.server.type.NettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @author htring
*/
@SpringBootApplication
public class MyServerApplication implements CommandLineRunner {
private NettyServer nettyServer;
public static void main(String[] args) {
SpringApplication.run(MyServerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
this.nettyServer.startServer();
}
@Autowired
public void setNettyServer(NettyServer nettyServer) {
this.nettyServer = nettyServer;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30