Spring 集成 Netty,如何返回低字节等数据

Spring 集成 Netty ——TCP 协议——传参JSON

1.引入依赖

在这里插入图片描述

2. 创建启动类

/**
 * netty的server
 *
 */
@Slf4j
public class BootNettyServer {

    // 用于存储通道,给指定的通道发送消息,建议使用线程安全的ConcurrentHashMap,或者使用Netty 自带的ChannelGroup 来存储通道
    // 不过使用ConcurrentHashMap 获取通道更便捷,但是需要手动的去添加通道,删除通道
    // ChannelGroup 是Netty 专门提供来关系通道的集合类,可以自动处理通道的添加,删除,需要手动的地方更少
    public static final Map<String, ChannelVO> contextMap = new ConcurrentHashMap<>();

    /**
     * port 端口号
     * netty 启动需要占用一个独立的端口
     * */
	public void bind(int port) throws Exception {
        /**
         * 配置服务端的NIO线程组
         * NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
         * bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
         * bossGroup接收到连接后就会把连接信息注册到workerGroup
         * workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
         */
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
        	/**
        	 * ServerBootstrap 是一个启动NIO服务的辅助启动类
        	 */
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            /**
             * 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
             */
            serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
            /**
             * ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
             */
            serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
            /**
             * option是设置 bossGroup,childOption是设置workerGroup
             * netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费    最小  初始化  最大 (根据生产环境实际情况来定)
             * 使用对象池,重用缓冲区
             */
//            serverBootstrap = serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
//            serverBootstrap = serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
            /**
             * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
             */
            serverBootstrap = serverBootstrap.childHandler(new BootNettyChannelInitializer());
            log.info("netty 启动成功!");
            /**
             * 绑定端口,同步等待成功
             */
            ChannelFuture f = serverBootstrap.bind(port).sync();
            /**
             * 等待服务器监听端口关闭
             */
            f.channel().closeFuture().sync();

        } catch (InterruptedException e) {

        } finally {
        	/**
        	 * 退出,释放线程池资源
        	 */
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

	}
}

3.创建自定义通道

import com.alibaba.fastjson.JSONObject;
import com.daniu.DaNiuMiniProgramApplication;
import com.daniu.config.RedisConfig;
import com.daniu.modules.entity.MessageTake;
import com.daniu.modules.entity.dto.*;
import com.daniu.modules.entity.vo.*;
import com.daniu.modules.redisUtils.RedisUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;

/**
 *
 * I/O数据读写处理类
 * 1. 该方法为单例方法,不能注入Bean 到其中,并且Netty 是Nio,也就是Io 多路复用,所以在此方法中不建议写过的的业务代码,不然处理时间过长,会导致消息延迟,或者导致硬件消息重试
 * 2. 该类问多例,不能使用注入Bean 的方式来进行注入,最好是将Bean 注入到静态类中来使用
 *
 * 问题1的解决方案:
 * 1. 使用MQ的方式,让其他的消费者来处理   最优方式
 * 2. 使用Redis 的消息订阅来进行处理,但是Redis 的消息通道是有内存到大小的,记得设置大小,不过如果数据量特别大也可以额导致消息延迟或者丢失   如果硬件不多地情况下,可以使用该方法,
 * 3. 使用线程池的方式  不建议使用该方法,如果数据量较大,线程池任务太多,可能会导致数据丢失(具体看拒绝策略),并且还会产生消息多路复用
 */
@Slf4j
public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {

    /**
     * 设备Id
     */
    public String id;
    /**
     * 保证设备断开 没有特殊心跳
     * 第一次心跳机制同步一次数据
     */
    public Boolean isConnect = false;
    /**
     * 每个系统的连接参数
     * 设置以为唯一的UUID,因为多线程的原因导致发送到未断掉的设备,因为设备断开后,会重新连接,所以需要重新生成一个
     * */
    public String code = UUID.randomUUID().toString();

    /**
     * 每二十次心跳同步心跳
     */
    public Integer frequencySynchronization = DaNiuMiniProgramApplication.FIXED_FREQUENC;



    /**
     * 从客户端收到新的数据时,这个方法会在收到消息时被调用
     *
     * @param ctx 通道
     * @param msg 接收的参数
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
       	// 该方法需要根据设备的编号为主建
       	// 具体方式要实际操作
       	//给硬件返回的参数,如果要返回低字节的数据,下一篇博客发出来
       	ctx.write("aa");
    }


    /**
     * 从客户端收到新的数据、读取完成时调用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
        cause.printStackTrace();
        if (StringUtils.isNotEmpty(id)) {
            ChannelVO channelVO = BootNettyServer.contextMap.get(id);
            if (channelVO == null){
                ctx.close();
                return;
            }
            if (!channelVO.getCode().equals(code)) {
                ctx.close();
                return;
            }
            BootNettyServer.contextMap.remove(id);
        }
        ctx.close();
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
        if (StringUtils.isNotEmpty(id)) {
            ChannelVO channelVO = BootNettyServer.contextMap.get(id);
            if (channelVO == null){
                ctx.close();
                return;
            }
            if (!channelVO.getCode().equals(code)) {
                ctx.close();
                return;
            }
            BootNettyServer.contextMap.remove(id);
            ctx.close();
        }
    }

    /**
     * 服务端当read超时, 会调用这个方法
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
        super.userEventTriggered(ctx, evt);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        ctx.close();//超时时断开连接
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    }

}

4.创建通道初始化类 如果请求参数为json一定要看此处

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

/**
 * 通道初始化
 * @author FengRui
 */
@Slf4j
public class BootNettyChannelInitializer extends ChannelInitializer<Channel> {
	private static final int MAX_FRAME_LENGTH = 2048; // 设置最大数据长度为 65535 字节
	@Override
	protected void initChannel(Channel ch) throws Exception {
		// 设置接收参数最大字节
		ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(MAX_FRAME_LENGTH));
		// 设置字节编码
		ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8")));
		// 设置接收接收的数据为JSON,如果不加会导致ChannelInboundHandlerAdapter 接收到的参数不为JSON,而导致报错
        ch.pipeline().addLast("json", new JsonObjectDecoder());
		// 设置字节编码
        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
        // 设置连接最大闲置时间
		ch.pipeline().addLast(new IdleStateHandler(0,0,30, TimeUnit.SECONDS));
		// 自定义消息处理通道
        ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter());
	}
}

5.如何启动

import com.daniu.netty.BootNettyServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;


/**
 * @author FengRui
 * 我使用的是Netty 容器,不是Tomcat 容器哦,因为该项目只用于对接硬件,不做任何业务处理,最多只能用于参数传输
 */
@Slf4j
@SpringBootApplication
public class DaNiuMiniProgramApplication implements CommandLineRunner {


	public static void main(String[] args) {
		SpringApplication.run(DaNiuMiniProgramApplication.class, args);
		System.out.println("
" +
				"//                          _ooOoo_                               //
" +
				"//                         o8888888o                              //
" +
				"//                         88" . "88                              //
" +
				"//                         (| ^_^ |)                              //
" +
				"//                         O\  =  /O                              //
" +
				"//                      ____/`---'\____                           //
" +
				"//                    .'  \\|     |//  `.                         //
" +
				"//                   /  \\|||  :  |||//  \                        //
" +
				"//                  /  _||||| -:- |||||-  \                       //
" +
				"//                  |   | \\\  -  /// |   |                       //
" +
				"//                  | \_|  ''\---/''  |   |                       //
" +
				"//                  \  .-\__  `-`  ___/-. /                       //
" +
				"//                ___`. .'  /--.--\  `. . ___                     //
" +
				"//              ."" '<  `.___\_<|>_/___.'  >'"".                  //
" +
				"//            | | :  `- \`.;`\ _ /`;.`/ - ` : | |                 //
" +
				"//            \  \ `-.   \_ __\ /__ _/   .-` /  /                 //
" +
				"//      ========`-.____`-.___\_____/___.-`____.-'========         //
" +
				"//                           `=---='                              //
" +
				"//      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^        //
" +
				"//            佛祖保佑       永不宕机      永无BUG                  //
" +
				"");
	}

	@Async
	@Override
	public void run(String... args) throws Exception {
		new BootNettyServer().bind(Integer.valueOf(tcpPort));
	}

}

6.如何返回低字节的数据和十六进制的数据

方法一和方法二都要定位到BootNettyChannelInboundHandlerAdapter类中的 channelRead方法

1.返回十六进制数据

	// 记住不要转换十六进制数据,有部分硬件会自动转换的,如果没有低字节数据直接返回String 类型的数据接口
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.writeAndFlush("L2XTEND");
    }

2.返回十六进制加低字节的数据,设置包头和包尾

低字节校验和计算类
import cn.hutool.core.util.HexUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.commons.lang3.StringUtils;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @author FengRui
 * @Description
 * @Date 2023-04-28 10:53
 */

public class HexadecimalUtils {

    /**
     * 将16进制的数据求和树数据转为 16进制的低字节
     */
    public static String makeChecksum(String data) {
        if (StringUtils.isEmpty(data)) {
            return "";
        }
        int total = 0;
        int len = data.length();
        int num = 0;
        while (num < len) {
            String s = data.substring(num, num + 2);
            total += Integer.parseInt(s, 16);
            num = num + 2;
        }
/**
 * 用256求余最大是255,即16进制的FF
 */
        int mod = total % 256;
        String hex = Integer.toHexString(mod);
        len = hex.length();
// 如果不够校验位的长度,补0,这里用的是两位校验
        if (len < 2) {
            hex = "0" + hex;
        }
        return hex;
    }

    public static CompositeByteBuf returnValue(String prefix, String lowByteDataString, String suffix) {
        CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
        compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(prefix.getBytes(StandardCharsets.UTF_8)));
        compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(HexUtil.decodeHex(HexadecimalUtils.makeChecksum(HexUtil.encodeHexStr(lowByteDataString)))));
        compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(suffix.getBytes(StandardCharsets.UTF_8)));
        return compositeByteBuf;
    }


    public static byte[] convertoTime(String timeString){
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
        LocalDateTime dateTime = LocalDateTime.parse(timeString, formatter);

        byte[] bytes = new byte[7];
        bytes[0] = (byte) (dateTime.getYear() / 100);
        bytes[1] = (byte) (dateTime.getYear() % 100);
        bytes[2] = (byte) dateTime.getMonthValue();
        bytes[3] = (byte) dateTime.getDayOfMonth();
        bytes[4] = (byte) dateTime.getHour();
        bytes[5] = (byte) dateTime.getMinute();
        bytes[6] = (byte) dateTime.getSecond();
        return bytes;
    }

    public static int bytesToInt(byte[] bytes) {
        int highByte = bytes[0] & 0xFF;
        int lowByte = bytes[1] & 0xFF;
        return (highByte << 8) | lowByte;
    }

}
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    	 // new 一个返回数据类,如果是有低字节的数据,一定要使用该类返回
         CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
         // 返回包头+固定数据
         compositeByteBuf.addComponent(true,Unpooled.wrappedBuffer("L2JS".getBytes(StandardCharsets.UTF_8)));
         // 返回实际参数
         compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(bytes));
         //计算前面的参数和校验和
         compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(HexUtil.decodeHex(HexadecimalUtils.makeChecksum(HexUtil.encodeHexStr("JS") + hexString))));
         // 包尾
         compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer("END".getBytes(StandardCharsets.UTF_8)));
    }

3.设置包头包尾

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * 通道初始化
 *  注意我这里没有计算校验和哦
 * @author FengRui
 */
public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {
    // 设置最大数据长度为 65535 字节

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(BootNettyServer.MAX_FRAME_LENGTH));
        /**
         * 解决包头包尾粘包拆包问题
         * */
        pipeline.addLast("decoder", new MyDecoder());
        //返回参数子返回字符串
        ByteBuf baotou = Unpooled.copiedBuffer("4c32".getBytes());
       /**
         * 解决包头包尾粘包拆包问题
       * */
        ByteBuf tail = Unpooled.copiedBuffer("454e44".getBytes());
        pipeline.addLast(new DelimiterBasedFrameDecoder(BootNettyServer.MAX_FRAME_LENGTH, baotou, tail));
        pipeline.addLast(new StringEncoder());
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 4, 4, -8, 0));
        //设置未读写操作的连接断开时间
        ch.pipeline().addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS));
        pipeline.addLast(new BootNettyChannelInboundHandlerAdapter());
    }
}