Netty 作为 WebSocket 客户端

返回:netty

抽象类AbstractWebSocketClient

package com.fourfaith.dataServer.communicate.socket;

import com.fourfaith.common.tools.exception.BusinessException;
import com.fourfaith.common.tools.utils.StringUtils;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Base class for a WebSocket client that sends text frames and waits, with a
 * timeout, for a single text response.
 *
 * <p>Subclasses provide the Netty {@link Channel} and the open/connect steps.
 * The response is handed over through the shared {@link WebSocketContext}: the
 * caller blocks on its {@link CountDownLatch} in {@link #receiveResult()}.
 *
 * @author htring (created 2021/8/12 10:55)
 */
public abstract class AbstractWebSocketClient implements Closeable {

    private static final Logger log = LoggerFactory.getLogger(AbstractWebSocketClient.class);

    /**
     * Maximum time to wait for a response, in seconds.
     */
    private final int connectionTimeout;
    /**
     * Shared context carrying the response latch and the received result.
     */
    protected WebSocketContext webSocketContext;

    /**
     * Creates the client with a fresh context whose latch has a count of one.
     *
     * @param connectionTimeout maximum time, in seconds, that {@link #receiveResult()} waits
     */
    public AbstractWebSocketClient(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
        this.webSocketContext = new WebSocketContext(new CountDownLatch(1));
    }

    /**
     * Sends a text frame over the current channel.
     *
     * @param message text payload to send
     * @throws BusinessException if no channel is available
     */
    public void write(String message) {
        Channel channel = getChannel();
        if (Objects.isNull(channel)) {
            throw new BusinessException("01", "连接已经关闭");
        }
        // Fix: the original fell through to the throw above even after a successful send.
        channel.writeAndFlush(new TextWebSocketFrame(message));
    }

    /**
     * Runs {@link #doOpen()} followed by {@link #doConnect()}.
     *
     * @throws BusinessException if either step throws; the original message is appended
     */
    public void connect() throws BusinessException {
        try {
            doOpen();
            doConnect();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
            throw new BusinessException("连接失败,原因:" + e.getMessage());
        }
    }

    /**
     * Blocks until the context latch is released or the timeout elapses, then
     * returns the result stored in the context.
     *
     * @return the non-blank result text
     * @throws BusinessException on timeout, on interruption, or when the result is blank
     */
    public String receiveResult() throws BusinessException {
        this.receive(this.webSocketContext.getCountDownLatch());
        // Read once so the blank check and the returned value are the same object.
        String result = this.webSocketContext.getResult();
        if (StringUtils.isBlank(result)) {
            throw new BusinessException("未获取到任务结果信息");
        }
        return result;
    }

    /**
     * Waits on the given latch for at most {@code connectionTimeout} seconds.
     *
     * @param countDownLatch latch released when a response has arrived
     * @throws BusinessException if the wait times out or the thread is interrupted
     */
    private void receive(CountDownLatch countDownLatch) throws BusinessException {
        boolean completed;
        try {
            completed = countDownLatch.await(connectionTimeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fail right away; previously this path
            // was additionally (and misleadingly) logged as a timeout.
            Thread.currentThread().interrupt();
            log.info("此连接未接收到响应信息");
            throw new BusinessException("此连接未接收到响应信息");
        }
        if (!completed) {
            // Fix: the placeholder used to be "{}}s", which printed a stray brace.
            log.error("Timeout({}s) when receiving response message", connectionTimeout);
            throw new BusinessException("此连接未接收到响应信息");
        }
    }

    /**
     * Returns the channel used by this connection.
     *
     * @return the channel, or {@code null} when none is available
     */
    protected abstract Channel getChannel();

    /**
     * Prepares everything needed before connecting.
     */
    protected abstract void doOpen();

    /**
     * Establishes the connection.
     *
     * @throws BusinessException if the connection cannot be established
     */
    protected abstract void doConnect() throws BusinessException;

    /**
     * Releases the resources held by this client. This narrows
     * {@link Closeable#close()}: implementations do not throw {@code IOException}.
     */
    @Override
    public abstract void close();
}

DefaultWebSocketClient

package com.fourfaith.dataServer.communicate.socket;

import com.fourfaith.common.tools.exception.BusinessException;
import com.fourfaith.common.tools.utils.StringUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Objects;

/**
 * Netty-based {@link AbstractWebSocketClient} for {@code ws://} and {@code wss://} URLs.
 *
 * <p>{@link #doOpen()} builds the handshaker, handler and bootstrap;
 * {@link #doConnect()} connects, waits for the WebSocket handshake and then
 * forwards console lines to the server until a blank line is read.
 *
 * @author htring (created 2021/8/12 11:12)
 */
public class DefaultWebSocketClient extends AbstractWebSocketClient {
    private static final Logger log = LoggerFactory.getLogger(DefaultWebSocketClient.class);
    protected static final String WSS_PREFIX = "wss";
    protected static final String WS_PREFIX = "ws";

    protected final URI uri;
    protected final int port;
    private Bootstrap bootstrap;
    /** Event loop group owned by this client; kept so it can be shut down in {@link #close()}. */
    private NioEventLoopGroup eventLoopGroup;
    private DefaultWebSocketClientHandler defaultWebSocketClientHandler;
    protected Channel channel;

    /**
     * @param url               WebSocket URL to connect to
     * @param connectionTimeout response timeout in seconds, passed to the base class
     * @throws URISyntaxException if {@code url} is not a valid URI
     * @throws BusinessException  if the URL has no port and its scheme is neither ws nor wss
     */
    public DefaultWebSocketClient(String url, int connectionTimeout) throws URISyntaxException, BusinessException {
        super(connectionTimeout);
        this.uri = new URI(url);
        // NOTE(review): getPort() is overridable and runs before subclass fields are initialized.
        this.port = this.getPort();
    }

    /**
     * Resolves the port: the explicit URI port, else 443 for wss and 80 for ws.
     *
     * @return the port to connect to
     * @throws BusinessException if no port is given and the scheme is unknown
     */
    protected int getPort() throws BusinessException {
        int explicitPort = this.uri.getPort();
        if (explicitPort != -1) {
            return explicitPort;
        }
        String scheme = this.uri.getScheme();
        // URI schemes are case-insensitive, so "WSS://..." must resolve as well.
        if (WSS_PREFIX.equalsIgnoreCase(scheme)) {
            return 443;
        }
        if (WS_PREFIX.equalsIgnoreCase(scheme)) {
            return 80;
        }
        throw new BusinessException("unknown scheme: " + scheme);
    }

    /**
     * Builds the client TLS context for wss URLs.
     *
     * @return the TLS context, or {@code null} for non-wss URLs
     * @throws BusinessException if the TLS context cannot be created
     */
    protected SslContext getSslContext() {
        if (!WSS_PREFIX.equalsIgnoreCase(this.uri.getScheme())) {
            return null;
        }
        try {
            // SECURITY NOTE(review): InsecureTrustManagerFactory accepts any server
            // certificate; replace it with a real trust store outside of testing.
            return SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } catch (SSLException e) {
            log.error("ssl连接失败: {}", e.getMessage(), e);
            // Fix: returning null here made a wss connection continue without TLS.
            throw new BusinessException("ssl连接失败: " + e.getMessage());
        }
    }

    /**
     * Returns the channel of the current connection.
     *
     * @return the channel, or {@code null} before a connection has been made
     */
    @Override
    protected Channel getChannel() {
        return this.channel;
    }

    /**
     * Creates the handshaker, the inbound handler and the bootstrap.
     */
    @Override
    protected void doOpen() {
        // Release threads of a previous doOpen() so repeated connects do not leak them.
        shutdownEventLoopGroup();

        // Extra handshake headers (for example authorization) can be added here;
        // load credentials from configuration instead of hard-coding them.
        HttpHeaders headers = new DefaultHttpHeaders();
        WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(this.uri,
                WebSocketVersion.V13,
                null,
                true,
                headers);
        this.defaultWebSocketClientHandler = new DefaultWebSocketClientHandler(webSocketClientHandshaker,
                this.webSocketContext,
                this.getSslContext());
        this.eventLoopGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(this.eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new DefaultWebSocketChannelInitializer(defaultWebSocketClientHandler, this.uri.getHost(), this.port));
    }

    /**
     * Connects, waits for the WebSocket handshake and then forwards console
     * input to the server. This call blocks the calling thread until a blank
     * line (or end of input) is read from {@code System.in}.
     *
     * @throws BusinessException if the thread is interrupted while connecting
     */
    @Override
    protected void doConnect() throws BusinessException {
        try {
            this.channel = this.bootstrap.connect(this.uri.getHost(), this.port).sync().channel();
            this.defaultWebSocketClientHandler.getHandshakeFuture().sync();
        } catch (InterruptedException e) {
            // Only an actual interruption restores the interrupt flag.
            Thread.currentThread().interrupt();
            log.error("websocket连接发生异常:{}", e.getMessage(), e);
            throw new BusinessException("websocket连接被中断");
        }
        forwardConsoleInput();
    }

    /**
     * Sends every non-blank console line as a text frame; stops on the first blank line.
     */
    private void forwardConsoleInput() {
        // System.in is deliberately not closed: it is owned by the process, not by this client.
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
        try {
            while (true) {
                String msg = bufferedReader.readLine();
                if (StringUtils.isBlank(msg)) {
                    break;
                }
                WebSocketFrame webSocketFrame = new TextWebSocketFrame(msg);
                this.channel.writeAndFlush(webSocketFrame);
            }
        } catch (IOException e) {
            // A console read failure ends forwarding; the connection itself is left open.
            log.error("websocket连接发生异常:{}", e.getMessage(), e);
        }
    }

    /**
     * Closes the channel, if any, and shuts down the event loop group owned by this client.
     */
    @Override
    public void close() {
        if (Objects.nonNull(this.channel)) {
            this.channel.close();
        }
        shutdownEventLoopGroup();
    }

    /**
     * @return {@code true} if a channel exists and is open
     */
    public boolean isOpen() {
        // Fix: the original dereferenced a null channel when called before connect().
        return Objects.nonNull(this.channel) && this.channel.isOpen();
    }

    /**
     * Shuts down the current event loop group, if one exists.
     */
    private void shutdownEventLoopGroup() {
        if (Objects.nonNull(this.eventLoopGroup)) {
            this.eventLoopGroup.shutdownGracefully();
            this.eventLoopGroup = null;
        }
    }

}

DefaultWebSocketClientHandler

package com.fourfaith.dataServer.communicate.socket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.handler.ssl.SslContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;

/**
 * Inbound handler that drives the client-side WebSocket handshake and then
 * processes incoming frames.
 *
 * <p>The first inbound message completes the handshake and the handshake promise.
 * Each text frame is stored in the shared {@link WebSocketContext} and its latch
 * is counted down. A close frame shuts the channel down.
 *
 * @author htring (created 2021/8/12 11:17)
 */
public class DefaultWebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(DefaultWebSocketClientHandler.class);
    protected final WebSocketClientHandshaker webSocketClientHandshaker;
    /** Completed once the handshake succeeds or fails; created in {@link #handlerAdded}. */
    private ChannelPromise handshakeFuture;
    protected final WebSocketContext webSocketContext;
    protected Channel channel;
    private final SslContext sslContext;

    /**
     * @param webSocketClientHandshaker handshaker used to start and finish the handshake
     * @param webSocketContext          context that receives text results and owns the latch
     * @param sslContext                TLS context exposed via {@link #getSslContext()}; may be {@code null}
     */
    public DefaultWebSocketClientHandler(WebSocketClientHandshaker webSocketClientHandshaker, WebSocketContext webSocketContext, SslContext sslContext) {
        this.webSocketClientHandshaker = webSocketClientHandshaker;
        this.webSocketContext = webSocketContext;
        this.sslContext = sslContext;
    }

    /**
     * @return the handshake promise, or {@code null} before the handler is added to a pipeline
     */
    public ChannelPromise getHandshakeFuture() {
        return this.handshakeFuture;
    }

    /**
     * Creates the handshake promise when the handler is added to a pipeline.
     *
     * @param ctx handler context
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        handshakeFuture = ctx.newPromise();
    }

    /**
     * Starts the WebSocket handshake once the channel is active.
     *
     * @param ctx handler context
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channel = ctx.channel();
        webSocketClientHandshaker.handshake(channel);
        log.info("建立连接");
    }

    /**
     * Logs the disconnect and fails a still-pending handshake so that nobody
     * keeps waiting on the handshake promise.
     *
     * @param ctx handler context
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("连接断开");
        if (handshakeFuture != null) {
            // No-op when the handshake already completed.
            handshakeFuture.tryFailure(new IllegalStateException("channel closed before the WebSocket handshake completed"));
        }
    }

    /**
     * Flushes pending writes after a read batch.
     *
     * @param ctx handler context
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Routes an inbound message: handshake response first, WebSocket frames afterwards.
     *
     * @param ctx handler context
     * @param msg inbound message
     * @throws Exception if the message is an unexpected HTTP response or handling fails
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!this.webSocketClientHandshaker.isHandshakeComplete()) {
            this.handleHttpRequest(msg);
            log.info("websocket已经建立连接");
            return;
        }
        if (msg instanceof FullHttpResponse) {
            FullHttpResponse fullHttpResponse = (FullHttpResponse) msg;
            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus="
                    + fullHttpResponse.status()
                    + ", content="
                    + fullHttpResponse.content().toString(StandardCharsets.UTF_8) + ')');
        }
        this.handleWebSocketFrame(msg);
    }

    /**
     * Finishes the handshake with the server's HTTP response and completes the promise.
     * An exception thrown here reaches {@link #exceptionCaught}, which fails the promise.
     *
     * @param msg the handshake response; cast to {@link FullHttpResponse}
     */
    protected void handleHttpRequest(Object msg) {
        webSocketClientHandshaker.finishHandshake(channel, (FullHttpResponse) msg);
        handshakeFuture.setSuccess();
    }


    /**
     * Handles a WebSocket frame received after the handshake.
     *
     * @param msg the frame; cast to {@link WebSocketFrame}
     * @throws Exception if frame handling fails
     */
    protected void handleWebSocketFrame(Object msg) throws Exception {
        WebSocketFrame frame = (WebSocketFrame) msg;
        if (frame instanceof TextWebSocketFrame) {
            String text = ((TextWebSocketFrame) frame).text();
            // Logged instead of printed to stdout so output follows the logging configuration.
            log.info("{}--------------", text);
            // Publish the result before releasing the latch so the waiting thread sees it.
            this.webSocketContext.setResult(text);
            this.webSocketContext.getCountDownLatch().countDown();
        } else if (frame instanceof PongWebSocketFrame) {
            // Fix: this is a pong frame; the original message called it a ping.
            log.info("收到pong响应");
        } else if (frame instanceof CloseWebSocketFrame) {
            log.info("连接收到关闭帧");
            this.channel.eventLoop().shutdownGracefully();
            // Fix: no sync() here; this runs on the channel's event loop and must not block on its own future.
            this.channel.close();
        }
    }

    /**
     * Logs the error. If the handshake is still pending it is failed and the
     * channel closed, because the connection cannot become usable.
     *
     * @param ctx   handler context
     * @param cause the error
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("发生异常->{}", cause.getMessage(), cause);
        // Fix: without this, a failed handshake left callers blocked on getHandshakeFuture().sync().
        if (handshakeFuture != null && handshakeFuture.tryFailure(cause)) {
            ctx.close();
        }
    }

    /**
     * @return the TLS context passed to the constructor; may be {@code null}
     */
    public SslContext getSslContext() {
        return this.sslContext;
    }
}

DefaultWebSocketChannelInitializer

package com.fourfaith.dataServer.communicate.socket;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
import io.netty.handler.ssl.SslContext;

import java.util.Objects;

/**
 * Builds the channel pipeline for the WebSocket client: optional TLS handler,
 * HTTP client codec, HTTP aggregator, WebSocket compression and finally the
 * client handler.
 *
 * @author htring (created 2021/8/12 11:51)
 */
public class DefaultWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {

    /** Maximum aggregated HTTP content length in bytes (512 MiB). */
    private static final int MAX_HTTP_CONTENT_LENGTH = 1024 * 1024 * 512;

    private final DefaultWebSocketClientHandler handler;

    private final String host;
    private final int port;

    /**
     * @param handler client handler appended last; also the source of the TLS context
     * @param host    peer host passed to the TLS handler
     * @param port    peer port passed to the TLS handler
     */
    public DefaultWebSocketChannelInitializer(DefaultWebSocketClientHandler handler, String host, int port) {
        this.handler = handler;
        this.host = host;
        this.port = port;
    }

    /**
     * Populates the pipeline of a newly registered channel.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) {
        final ChannelPipeline pipeline = ch.pipeline();

        // The TLS handler goes first, and only when the client handler carries a TLS context.
        final SslContext tls = this.handler.getSslContext();
        if (tls != null) {
            pipeline.addLast(tls.newHandler(ch.alloc(), this.host, this.port));
        }

        // The varargs form appends the handlers in the order listed.
        pipeline.addLast(
                new HttpClientCodec(),
                new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH),
                WebSocketClientCompressionHandler.INSTANCE,
                this.handler);
    }
}

WebSocketContext(相关上下文,可有可无)

package com.fourfaith.dataServer.communicate.socket;

import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

/**
 * Holder shared between the WebSocket client and its handler: a latch that
 * signals "a response has arrived" and the response text itself.
 *
 * <p>The class is {@link Serializable}, but {@link CountDownLatch} is not, so
 * the latch is transient. Its remaining count is written alongside the default
 * fields and a latch with that count is rebuilt on deserialization.
 *
 * @author htring (created 2021/8/12 11:07)
 */
public class WebSocketContext implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Released when a result is available; transient because CountDownLatch is not serializable. */
    private transient CountDownLatch countDownLatch;
    /** Last result text stored by the handler; {@code null} until one is set. */
    private String result;

    /**
     * @param countDownLatch latch that waiters block on until a result is set
     */
    public WebSocketContext(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * @return the current latch
     */
    public CountDownLatch getCountDownLatch() {
        return countDownLatch;
    }

    /**
     * @param countDownLatch replacement latch
     */
    public void setCountDownLatch(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }

    /**
     * @return the stored result, or {@code null} if none has been set
     */
    public String getResult() {
        return result;
    }

    /**
     * @param result result text to store
     */
    public void setResult(String result) {
        this.result = result;
    }

    /**
     * Writes the default fields followed by the latch's remaining count
     * ({@code -1} when there is no latch).
     */
    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
        out.defaultWriteObject();
        CountDownLatch latch = this.countDownLatch;
        out.writeLong(latch == null ? -1L : latch.getCount());
    }

    /**
     * Reads the default fields and rebuilds the latch from the stored count.
     */
    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
        in.defaultReadObject();
        long remaining = in.readLong();
        // A latch is constructed from an int, so a stored non-negative count always fits.
        this.countDownLatch = remaining < 0 ? null : new CountDownLatch((int) remaining);
    }
}

启动

// Usage example: create a client for the target ws:// URL with a timeout of 15
// (seconds, per the connectionTimeout field of AbstractWebSocketClient).
// NOTE(review): the URL embeds a token query parameter; load real credentials
// from configuration rather than committing them to source.
DefaultWebSocketClient defaultWebSocketClient = new DefaultWebSocketClient("ws://es.sunshy.cn:8089/?userGuid=20210813095117260_9f46d7f0bdec4c3fb934a0391b4b811c&onlineGuid=fd17b0b268dc45b9b7d6316a1be4bc73&token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJVc2VyR3VpZCI6IjIwMjEwODEzMDk1MTE3MjYwXzlmNDZkN2YwYmRlYzRjM2ZiOTM0YTAzOTFiNGI4MTFjIiwiVXNlck5hbWUiOiLmtYvor5UwODEyIiwiUm9sZUNvZGUiOiI5OSIsIk9ubGluZUd1aWQiOiJmZDE3YjBiMjY4ZGM0NWI5YjdkNjMxNmExYmU0YmM3MyIsIm5iZiI6MTYyODgzODgyNywiZXhwIjoxNjI4ODQyNDI3LCJpc3MiOiJHZWxhbmRlIiwiYXVkIjoiR0VTTVAifQ.QdhcrwgwAozp3H_ZZGJF1yIukQIfHDaLRewZlxCIOyE&subscribe=true",15);
// connect() runs doOpen() and then doConnect(); with DefaultWebSocketClient the
// latter reads lines from System.in and sends them until a blank line is entered.
// NOTE(review): the client is Closeable but is never closed in this snippet.
defaultWebSocketClient.connect();