4、Netty心跳检测机制

烟雨 4年前 (2021-10-29) 阅读数 483 #Netty
文章标签 Netty
所谓心跳,即在TCP长连接中,客户端和服务器之间定期发送的一种特殊的数据包,通知对方自己还在线,以确保TCP连接的有效性。
在Netty中,实现心跳机制的关键是IdleStateHandler类,看下它的构造器:
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}
三个参数的含义:
  • readerIdleTimeSeconds: 读超时. 即当在指定的时间间隔内没有从Channel读取到数据时, 会触发一个READER_IDLE的 IdleStateEvent事件。

  • writerIdleTimeSeconds: 写超时. 即当在指定的时间间隔内没有数据写入到Channel时, 会触发一个 WRITER_IDLE的 IdleStateEvent 事件。

  • allIdleTimeSeconds: 读/写超时。即当在指定的时间间隔内没有读或写操作时, 会触发一个ALL_IDLE的IdleStateEvent事件。

注:这三个参数默认的时间单位是秒。若需要指定其他时间单位,可以使用另一个构造方法:
IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

要实现Netty服务端心跳检测机制需要在服务器端的ChannelInitializer中加入如下的代码:

 pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
Netty心跳检测代码示例:
服务端代码
public static void main(String[] args) throws Exception {
    EventLoopGroup boss = new NioEventLoopGroup();
    EventLoopGroup worker = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss, worker)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("decoder", new StringDecoder());
                    pipeline.addLast("encoder", new StringEncoder());
                    //IdleStateHandler的readerIdleTime参数指定超过3秒还没收到客户端的连接,
                    //会触发IdleStateEvent事件并且交给下一个handler处理,下一个handler必须
                    //实现userEventTriggered方法处理对应事件
                    pipeline.addLast(new IdleStateHandler(3, 0, 0, TimeUnit.SECONDS));
                    pipeline.addLast(new HeartBeatServerHandler());
                }
            });
        System.out.println("netty server start。。");
        ChannelFuture future = bootstrap.bind(8080).sync();
        future.channel().closeFuture().sync();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        worker.shutdownGracefully();
        boss.shutdownGracefully();
    }
}

// HeartBeatServerHandler
public class HeartBeatServerHandler extends SimpleChannelInboundHandler<String> {
    int readIdleTimes = 0;
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println("服务端收到的消息:" + s);
        if ("test".equals(s)) {
            ctx.channel().writeAndFlush("ok");
        } else {
            System.out.println("其他信息处理...");
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent) evt;
        String eventType = null;
        switch (event.state()) {
            case READER_IDLE:
                eventType = "读空闲";
                readIdleTimes++; // 读空闲的计数加1
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                // 不处理
                break;
            case ALL_IDLE:
                eventType = "读写空闲";
                // 不处理
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + ":超时事件:" + eventType);
        if (readIdleTimes > 3) {
            System.out.println(" [server]读空闲超过3次,关闭连接,释放更多资源");
            ctx.channel().writeAndFlush("close");
            ctx.channel().close();
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println(ctx.channel().remoteAddress() + " 已上线!");
    }
}

客户端

public static void main(String[] args) throws Exception {
    EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    try {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast("decoder", new StringDecoder());
                    pipeline.addLast("encoder", new StringEncoder());
                    pipeline.addLast(new HeartBeatClientHandler());
                }
            });

        System.out.println("netty client start。。");
        Channel channel = bootstrap.connect("127.0.0.1", 8080).sync().channel();
        String text = "test";
        Random random = new Random();
        while (channel.isActive()) {
            int num = random.nextInt(10);
            Thread.sleep(num * 1000);
            channel.writeAndFlush(text);
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        eventLoopGroup.shutdownGracefully();
    }
}

//HeartBeatClientHandler
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("客户端收到的消息::" + msg);
        if (msg != null && msg.equals("close")) {
            System.out.println("服务端关闭连接后,客户端也关闭");
            ctx.channel().closeFuture();
        }
    }
}

image.png

image.png

Netty断线自动重连实现

/**
 * 实现了重连的客户端
 */
public class NettyClient {
    private String host;
    private int port;
    private Bootstrap bootstrap;
    private EventLoopGroup group;

    public static void main(String[] args) throws Exception {
        NettyClient nettyClient = new NettyClient("localhost", 9000);
        nettyClient.connect();
    }

    public NettyClient(String host, int port) {
        this.host = host;
        this.port = port;
        init();
    }

    private void init() {
        //客户端需要一个事件循环组
        group = new NioEventLoopGroup();
        //创建客户端启动对象
        // bootstrap 可重用, 只需在NettyClient实例化的时候初始化即可.
        bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //加入处理器
                        ch.pipeline().addLast(new NettyClientHandler(NettyClient.this));
                    }
                });
    }

    public void connect() throws Exception {
        System.out.println("netty client start。。");
        //启动客户端去连接服务器端
        ChannelFuture cf = bootstrap.connect(host, port);
        cf.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    //重连交给后端线程执行
                    future.channel().eventLoop().schedule(() -> {
                        System.err.println("重连服务端...");
                        try {
                            connect();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }, 3000, TimeUnit.MILLISECONDS);
                } else {
                    System.out.println("服务端连接成功...");
                }
            }
        });
        //对通道关闭进行监听
        cf.channel().closeFuture().sync();
    }
}

// NettyClientHandler
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    private NettyClient nettyClient;  
    public NettyClientHandler(NettyClient nettyClient) {
        this.nettyClient = nettyClient;
    }

    /**
     * 当客户端连接服务器完成就会触发该方法
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("HelloServer".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    //当通道有读取事件时会触发,即服务端发送数据给客户端
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        System.out.println("收到服务端的消息:" + buf.toString(CharsetUtil.UTF_8));
        System.out.println("服务端的地址: " + ctx.channel().remoteAddress());
    }

    // channel 处于不活动状态时调用
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("运行中断开重连。。。");
        nettyClient.connect();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}


版权声明

非特殊说明,本文由Zender原创或收集发布,欢迎转载。

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

作者文章
热门