作为websocket客户端
返回:netty
抽象类AbstractWebSocketClient
package com.fourfaith.dataServer.communicate.socket;
import com.fourfaith.common.tools.exception.BusinessException;
import com.fourfaith.common.tools.utils.StringUtils;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* * .::::.
* * .::::::::.
* * ::::::::::: 码中自有黄金屋
* * ..:::::::::::' 码中自有颜如玉
* * '::::::::::::'
* * .::::::::::
* * '::::::::::::::..
* * ..::::::::::::.
* * ``::::::::::::::::
* * ::::``:::::::::' .:::.
* * ::::' ':::::' .::::::::.
* * .::::' :::: .:::::::'::::.
* * .:::' ::::: .:::::::::' ':::::.
* * .::' :::::.:::::::::' ':::::.
* * .::' ::::::::::::::' ``::::.
* * ...::: ::::::::::::' ``::.
* * ```` ':. ':::::::::' ::::..
* * '.:::::' ':'````..
*
* @author : htring
* @packageName : com.fourfaith.dataServer.communicate.socket
* @description :
* @date : 2021/8/12 10:55
*/
public abstract class AbstractWebSocketClient implements Closeable {
private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketClient.class);
/**
* 接收响应超时时间(单位:s)
*/
private final int connectionTimeout;
/**
* 任务上下文
*/
protected WebSocketContext webSocketContext;
public AbstractWebSocketClient(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
this.webSocketContext = new WebSocketContext(new CountDownLatch(1));
}
/**
* 发送消息
*
* @param message 消息内容
*/
public void write(String message) {
Channel channel = getChannel();
if (Objects.nonNull(channel)) {
channel.writeAndFlush(new TextWebSocketFrame(message));
}
throw new BusinessException("01", "连接已经关闭");
}
public void connect() throws BusinessException {
try {
doOpen();
doConnect();
} catch (Exception e) {
log.error(e.getMessage(), e);
throw new BusinessException("连接失败,原因:" + e.getMessage());
}
}
/**
* 接收消息.<br>
*
* @return:
*/
public String receiveResult() throws BusinessException {
this.receive(this.webSocketContext.getCountDownLatch());
if (StringUtils.isBlank(this.webSocketContext.getResult())) {
throw new BusinessException("未获取到任务结果信息");
}
return this.webSocketContext.getResult();
}
/**
* 接收消息封装.<br>
*
* @param countDownLatch 计数器
* @return:
*/
private void receive(CountDownLatch countDownLatch) throws BusinessException {
boolean waitFlag = false;
try {
waitFlag = countDownLatch.await(connectionTimeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.info("此连接未接收到响应信息");
Thread.currentThread().interrupt();
}
if (!waitFlag) {
log.error("Timeout({}}s) when receiving response message", connectionTimeout);
throw new BusinessException("此连接未接收到响应信息");
}
}
/**
* 获取本次连接的Channel
*
* @return Channel
*/
protected abstract Channel getChannel ();
/**
* 初始化连接
*/
protected abstract void doOpen ();
/**
* 建立连接
*
* @throws BusinessException
*/
protected abstract void doConnect () throws BusinessException;
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*
* @throws IOException if an I/O error occurs
*/
@Override
public abstract void close ();
}
DefaultWebSocketClient
package com.fourfaith.dataServer.communicate.socket;
import com.fourfaith.common.tools.exception.BusinessException;
import com.fourfaith.common.tools.utils.StringUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;
/**
* * .::::.
* * .::::::::.
* * ::::::::::: 码中自有黄金屋
* * ..:::::::::::' 码中自有颜如玉
* * '::::::::::::'
* * .::::::::::
* * '::::::::::::::..
* * ..::::::::::::.
* * ``::::::::::::::::
* * ::::``:::::::::' .:::.
* * ::::' ':::::' .::::::::.
* * .::::' :::: .:::::::'::::.
* * .:::' ::::: .:::::::::' ':::::.
* * .::' :::::.:::::::::' ':::::.
* * .::' ::::::::::::::' ``::::.
* * ...::: ::::::::::::' ``::.
* * ```` ':. ':::::::::' ::::..
* * '.:::::' ':'````..
*
* @author : htring
* @packageName : com.fourfaith.dataServer.communicate.socket
* @description :
* @date : 2021/8/12 11:12
*/
public class DefaultWebSocketClient extends AbstractWebSocketClient {
private static final Logger log = LoggerFactory.getLogger(DefaultWebSocketClient.class);
protected static final String WSS_PREFIX = "wss";
protected static final String WS_PREFIX = "ws";
protected final URI uri;
protected final int port;
private Bootstrap bootstrap;
private DefaultWebSocketClientHandler defaultWebSocketClientHandler;
protected Channel channel;
public DefaultWebSocketClient(String url, int connectionTimeout) throws URISyntaxException, BusinessException {
super(connectionTimeout);
this.uri = new URI(url);
this.port = this.getPort();
}
protected int getPort() throws BusinessException {
int port = this.uri.getPort();
if (port == -1) {
String scheme = uri.getScheme();
if (WSS_PREFIX.equals(scheme)) {
return 443;
} else if (WS_PREFIX.equals(scheme)) {
return 80;
} else {
throw new BusinessException("unknown scheme: " + scheme);
}
}
return port;
}
protected SslContext getSslContext() {
SslContext sslContext = null;
try {
if (WSS_PREFIX.equals(this.uri.getScheme())) {
sslContext = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
}
} catch (SSLException e) {
log.error("ssl连接失败: {}", e.getMessage(), e);
}
return sslContext;
}
/**
* 获取本次连接的Channel
*
* @return Channel
*/
@Override
protected Channel getChannel() {
return this.channel;
}
/**
* 初始化连接
*/
@Override
protected void doOpen() {
HttpHeaders headers = new DefaultHttpHeaders();
// headers.add("authorization", "bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySW5mbyI6eyJzdWJzeXNJZCI6ImY3MWNmZDMyMWE1NTQ4ODc5MDM0ZjUxN2RmODczMjdhIiwic3Vic3lzTmFtZSI6IuWAvOWuiOW5s-WPsCIsInByb2plY3RObyI6IkJKU1gwMDEiLCJwaG9uZSI6IjE4ODg4ODg4ODg4Iiwicm9sZXMiOiJ7fSIsIm5hbWUiOiLpobnnm67nrqHnkIblkZgiLCJhbGlhcyI6IkJKU1gwMDFfMTg4ODg4ODg4ODgiLCJpZCI6IjVlMzVkYzAyZTIyNDQ4ZGNhOWMzNDM2MzI2OTU2ZDQxIiwicHJvamVjdE5hbWUiOiLljJfkuqzlm5vkv6HmtojpmLLlgLzlrojlubPlj7AiLCJwcm9qZWN0SWQiOiIyN2YxZDcyMS0xNzEyLTExZWEtOGZiMi0wMjQyYWMxMTAwMDciLCJ1c2VyR3JvdXAiOiIxIn0sInVzZXJfbmFtZSI6IjVlMzVkYzAyZTIyNDQ4ZGNhOWMzNDM2MzI2OTU2ZDQxIiwic2NvcGUiOlsid3JpdGUiXSwiZXhwIjoxNjQ0MjUzNDA1LCJhdXRob3JpdGllcyI6WyJ7fSJdLCJqdGkiOiIwdlZGNldTSnlXWURhRzN0TDFRSG9RN2lWb0EiLCJjbGllbnRfaWQiOiJmZl9jbGllbnQifQ.l-FB8Gv9SEXP4xlifeXlB096PRmqHFePbPj24y2ylNo");
// headers.add("Content-Type", "application/json");
WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(this.uri,
WebSocketVersion.V13,
null,
true,
headers);
this.defaultWebSocketClientHandler = new DefaultWebSocketClientHandler(webSocketClientHandshaker,
this.webSocketContext,
this.getSslContext());
this.bootstrap = new Bootstrap();
bootstrap.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new DefaultWebSocketChannelInitializer(defaultWebSocketClientHandler, this.uri.getHost(), this.port));
}
/**
* 建立连接
*
* @throws BusinessException
*/
@Override
protected void doConnect() throws BusinessException {
try {
this.channel = this.bootstrap.connect(this.uri.getHost(), this.port).sync().channel();
this.defaultWebSocketClientHandler.getHandshakeFuture().sync();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
while (true) {
String msg = bufferedReader.readLine();
if (StringUtils.isBlank(msg)) {
break;
}
// else if("bye".equalsIgnoreCase(msg)){
// this.channel.writeAndFlush(msg);
// } else if ("ping".equalsIgnoreCase(msg)){
// WebSocketFrame webSocketFrame = new PingWebSocketFrame(Unpooled.wrappedBuffer(new byte[]{8, 1, 8, 1}));
// this.channel.writeAndFlush(webSocketFrame);
// } else {
WebSocketFrame webSocketFrame = new TextWebSocketFrame(msg);
this.channel.writeAndFlush(webSocketFrame);
// }
}
} catch (InterruptedException | IOException e) {
log.error("websocket连接发生异常:{}", e.getMessage(), e);
Thread.currentThread().interrupt();
}
}
/**
* Closes this stream and releases any system resources associated
* with it. If the stream is already closed then invoking this
* method has no effect.
*
* <p> As noted in {@link AutoCloseable#close()}, cases where the
* close may fail require careful attention. It is strongly advised
* to relinquish the underlying resources and to internally
* <em>mark</em> the {@code Closeable} as closed, prior to throwing
* the {@code IOException}.
*/
@Override
public void close() {
if (Objects.nonNull(this.channel)) {
this.channel.close();
}
}
public boolean isOpen() {
return this.channel.isOpen();
}
}
DefaultWebSocketClientHandler
package com.fourfaith.dataServer.communicate.socket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
* * .::::.
* * .::::::::.
* * ::::::::::: 码中自有黄金屋
* * ..:::::::::::' 码中自有颜如玉
* * '::::::::::::'
* * .::::::::::
* * '::::::::::::::..
* * ..::::::::::::.
* * ``::::::::::::::::
* * ::::``:::::::::' .:::.
* * ::::' ':::::' .::::::::.
* * .::::' :::: .:::::::'::::.
* * .:::' ::::: .:::::::::' ':::::.
* * .::' :::::.:::::::::' ':::::.
* * .::' ::::::::::::::' ``::::.
* * ...::: ::::::::::::' ``::.
* * ```` ':. ':::::::::' ::::..
* * '.:::::' ':'````..
*
* @author : htring
* @packageName : com.fourfaith.dataServer.communicate.socket
* @description : 处理
* @date : 2021/8/12 11:17
*/
public class DefaultWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger log = LoggerFactory.getLogger(DefaultWebSocketClientHandler.class);
protected final WebSocketClientHandshaker webSocketClientHandshaker;
private ChannelPromise handshakeFuture;
protected final WebSocketContext webSocketContext;
protected Channel channel;
private final SslContext sslContext;
/**
*
*/
public DefaultWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketContext webSocketContext, SslContext sslContext) {
this.webSocketClientHandshaker = webSocketClientHandshaker;
this.webSocketContext = webSocketContext;
this.sslContext = sslContext;
}
public ChannelPromise getHandshakeFuture() {
return this.handshakeFuture;
}
/**
* ChannelHandler添加到实际上下文中准备处理事件,调用此方法
*
* @param ctx
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
handshakeFuture = ctx.newPromise();
}
/**
* 当客户端主动链接服务端的链接后,调用此方法
*
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
this.channel = ctx.channel();
webSocketClientHandshaker.handshake(channel);
log.info("建立连接");
}
/**
* 链接断开后,调用此方法
*
* @param ctx
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
log.info("连接断开");
}
/**
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
/**
* 接收消息
*
* @param ctx the {@link ChannelHandlerContext} which this {@link SimpleChannelInboundHandler}
* belongs to
* @param msg the message to handle
* @throws Exception is thrown if an error occurred
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!this.webSocketClientHandshaker.isHandshakeComplete()) {
this.handleHttpRequest(msg);
log.info("websocket已经建立连接");
return;
}
if (msg instanceof FullHttpResponse) {
FullHttpResponse fullHttpResponse = (FullHttpResponse) msg;
throw new IllegalStateException("Unexpected FullHttpResponse (getStatus="
+ fullHttpResponse.status()
+ ", content="
+ fullHttpResponse.content().toString(StandardCharsets.UTF_8) + ')');
}
this.handleWebSocketFrame(msg);
}
protected void handleHttpRequest(Object msg) {
webSocketClientHandshaker.finishHandshake(channel, (FullHttpResponse) msg);
handshakeFuture.setSuccess();
}
/**
* 处理文本帧请求.<br>
*
* @param msg:
* @return:
*/
protected void handleWebSocketFrame(Object msg) throws Exception {
WebSocketFrame frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
// ...自定义
// if (textFrame.text().contains("ff")) {
System.out.println(textFrame.text() + "--------------");
this.webSocketContext.setResult(textFrame.text());
this.webSocketContext.getCountDownLatch().countDown();
// }
} else if (frame instanceof PongWebSocketFrame) {
System.out.println("收到ping指令");
} else if (frame instanceof CloseWebSocketFrame) {
log.info("连接收到关闭帧");
this.channel.eventLoop().shutdownGracefully();
this.channel.close().sync();
}
}
/**
* 运行过程中未捕获的异常,调用此方法
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("发生异常->{}", cause.getMessage(), cause);
}
public SslContext getSslContext() {
return this.sslContext;
}
}
DefaultWebSocketChannelInitializer
package com.fourfaith.dataServer.communicate.socket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;
import java.util.Objects;
/**
* * .::::.
* * .::::::::.
* * ::::::::::: 码中自有黄金屋
* * ..:::::::::::' 码中自有颜如玉
* * '::::::::::::'
* * .::::::::::
* * '::::::::::::::..
* * ..::::::::::::.
* * ``::::::::::::::::
* * ::::``:::::::::' .:::.
* * ::::' ':::::' .::::::::.
* * .::::' :::: .:::::::'::::.
* * .:::' ::::: .:::::::::' ':::::.
* * .::' :::::.:::::::::' ':::::.
* * .::' ::::::::::::::' ``::::.
* * ...::: ::::::::::::' ``::.
* * ```` ':. ':::::::::' ::::..
* * '.:::::' ':'````..
*
* @author : htring
* @packageName : com.fourfaith.dataServer.communicate.socket
* @description :
* @date : 2021/8/12 11:51
*/
public class DefaultWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
private final DefaultWebSocketClientHandler handler;
private final String host;
private final int port;
public DefaultWebSocketChannelInitializer(DefaultWebSocketClientHandler handler, String host, int port) {
this.handler = handler;
this.host = host;
this.port = port;
}
/**
*
*/
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline channelPipeline = ch.pipeline();
SslContext sslContext = this.handler.getSslContext();
if (Objects.nonNull(sslContext)) {
channelPipeline.addLast(sslContext.newHandler(ch.alloc(), host, port));
}
channelPipeline.addLast(new HttpClientCodec());
channelPipeline.addLast(new HttpObjectAggregator(1024 * 1024 * 512));
channelPipeline.addLast(WebSocketClientCompressionHandler.INSTANCE);
channelPipeline.addLast(handler);
}
}
WebSocketContext相关上下文可有可无
package com.fourfaith.dataServer.communicate.socket;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;
/**
* * .::::.
* * .::::::::.
* * ::::::::::: 码中自有黄金屋
* * ..:::::::::::' 码中自有颜如玉
* * '::::::::::::'
* * .::::::::::
* * '::::::::::::::..
* * ..::::::::::::.
* * ``::::::::::::::::
* * ::::``:::::::::' .:::.
* * ::::' ':::::' .::::::::.
* * .::::' :::: .:::::::'::::.
* * .:::' ::::: .:::::::::' ':::::.
* * .::' :::::.:::::::::' ':::::.
* * .::' ::::::::::::::' ``::::.
* * ...::: ::::::::::::' ``::.
* * ```` ':. ':::::::::' ::::..
* * '.:::::' ':'````..
*
* @author : htring
* @packageName : com.fourfaith.dataServer.communicate.socket
* @description :
* @date : 2021/8/12 11:07
*/
public class WebSocketContext implements Serializable {
private static final long serialVersionUID = 1L;
private CountDownLatch countDownLatch;
private String result;
public WebSocketContext(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public CountDownLatch getCountDownLatch() {
return countDownLatch;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
启动
DefaultWebSocketClient defaultWebSocketClient = new DefaultWebSocketClient("ws://es.sunshy.cn:8089/?userGuid=20210813095117260_9f46d7f0bdec4c3fb934a0391b4b811c&onlineGuid=fd17b0b268dc45b9b7d6316a1be4bc73&token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJVc2VyR3VpZCI6IjIwMjEwODEzMDk1MTE3MjYwXzlmNDZkN2YwYmRlYzRjM2ZiOTM0YTAzOTFiNGI4MTFjIiwiVXNlck5hbWUiOiLmtYvor5UwODEyIiwiUm9sZUNvZGUiOiI5OSIsIk9ubGluZUd1aWQiOiJmZDE3YjBiMjY4ZGM0NWI5YjdkNjMxNmExYmU0YmM3MyIsIm5iZiI6MTYyODgzODgyNywiZXhwIjoxNjI4ODQyNDI3LCJpc3MiOiJHZWxhbmRlIiwiYXVkIjoiR0VTTVAifQ.QdhcrwgwAozp3H_ZZGJF1yIukQIfHDaLRewZlxCIOyE&subscribe=true",15);
defaultWebSocketClient.connect();