Spring 集成 Netty ——TCP 协议——传参JSON
1.引入依赖
2. 创建启动类
/** * netty的server * */ @Slf4j public class BootNettyServer { // 用于存储通道,给指定的通道发送消息,建议使用线程安全的ConcurrentHashMap,或者使用Netty 自带的ChannelGroup 来存储通道 // 不过使用ConcurrentHashMap 获取通道更便捷,但是需要手动的去添加通道,删除通道 // ChannelGroup 是Netty 专门提供来关系通道的集合类,可以自动处理通道的添加,删除,需要手动的地方更少 public static final Map<String, ChannelVO> contextMap = new ConcurrentHashMap<>(); /** * port 端口号 * netty 启动需要占用一个独立的端口 * */ public void bind(int port) throws Exception { /** * 配置服务端的NIO线程组 * NioEventLoopGroup 是用来处理I/O操作的Reactor线程组 * bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写, * bossGroup接收到连接后就会把连接信息注册到workerGroup * workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍 */ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { /** * ServerBootstrap 是一个启动NIO服务的辅助启动类 */ ServerBootstrap serverBootstrap = new ServerBootstrap(); /** * 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap */ serverBootstrap = serverBootstrap.group(bossGroup, workerGroup); /** * ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接 */ serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class); /** * option是设置 bossGroup,childOption是设置workerGroup * netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费 最小 初始化 最大 (根据生产环境实际情况来定) * 使用对象池,重用缓冲区 */ // serverBootstrap = serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576)); // serverBootstrap = serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576)); /** * 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息 */ serverBootstrap = serverBootstrap.childHandler(new BootNettyChannelInitializer()); log.info("netty 启动成功!"); /** * 绑定端口,同步等待成功 */ ChannelFuture f = serverBootstrap.bind(port).sync(); /** * 等待服务器监听端口关闭 */ f.channel().closeFuture().sync(); } catch (InterruptedException e) { } finally { /** * 退出,释放线程池资源 */ bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
3.创建自定义通道
import com.alibaba.fastjson.JSONObject; import com.daniu.DaNiuMiniProgramApplication; import com.daniu.config.RedisConfig; import com.daniu.modules.entity.MessageTake; import com.daniu.modules.entity.dto.*; import com.daniu.modules.entity.vo.*; import com.daniu.modules.redisUtils.RedisUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import java.io.IOException; import java.net.InetSocketAddress; import java.util.UUID; /** * * I/O数据读写处理类 * 1. 该方法为单例方法,不能注入Bean 到其中,并且Netty 是Nio,也就是Io 多路复用,所以在此方法中不建议写过的的业务代码,不然处理时间过长,会导致消息延迟,或者导致硬件消息重试 * 2. 该类问多例,不能使用注入Bean 的方式来进行注入,最好是将Bean 注入到静态类中来使用 * * 问题1的解决方案: * 1. 使用MQ的方式,让其他的消费者来处理 最优方式 * 2. 使用Redis 的消息订阅来进行处理,但是Redis 的消息通道是有内存到大小的,记得设置大小,不过如果数据量特别大也可以额导致消息延迟或者丢失 如果硬件不多地情况下,可以使用该方法, * 3. 使用线程池的方式 不建议使用该方法,如果数据量较大,线程池任务太多,可能会导致数据丢失(具体看拒绝策略),并且还会产生消息多路复用 */ @Slf4j public class BootNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter { /** * 设备Id */ public String id; /** * 保证设备断开 没有特殊心跳 * 第一次心跳机制同步一次数据 */ public Boolean isConnect = false; /** * 每个系统的连接参数 * 设置以为唯一的UUID,因为多线程的原因导致发送到未断掉的设备,因为设备断开后,会重新连接,所以需要重新生成一个 * */ public String code = UUID.randomUUID().toString(); /** * 每二十次心跳同步心跳 */ public Integer frequencySynchronization = DaNiuMiniProgramApplication.FIXED_FREQUENC; /** * 从客户端收到新的数据时,这个方法会在收到消息时被调用 * * @param ctx 通道 * @param msg 接收的参数 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException { // 该方法需要根据设备的编号为主建 // 具体方式要实际操作 //给硬件返回的参数,如果要返回低字节的数据,下一篇博客发出来 ctx.write("aa"); } /** * 从客户端收到新的数据、读取完成时调用 * * @param ctx */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws IOException { ctx.flush(); } /** * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时 * * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException { cause.printStackTrace(); if (StringUtils.isNotEmpty(id)) { ChannelVO channelVO = BootNettyServer.contextMap.get(id); if (channelVO == null){ ctx.close(); return; } if (!channelVO.getCode().equals(code)) { ctx.close(); return; } BootNettyServer.contextMap.remove(id); } ctx.close(); } /** * 客户端与服务端第一次建立连接时 执行 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException { super.channelActive(ctx); ctx.channel().read(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); } /** * 客户端与服务端 断连时 执行 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException { if (StringUtils.isNotEmpty(id)) { ChannelVO channelVO = BootNettyServer.contextMap.get(id); if (channelVO == null){ ctx.close(); return; } if (!channelVO.getCode().equals(code)) { ctx.close(); return; } BootNettyServer.contextMap.remove(id); ctx.close(); } } /** * 服务端当read超时, 会调用这个方法 * * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException { super.userEventTriggered(ctx, evt); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); ctx.close();//超时时断开连接 } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } }
4.创建通道初始化类 如果请求参数为json一定要看此处
import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.handler.codec.json.JsonObjectDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import lombok.extern.slf4j.Slf4j; import java.nio.charset.Charset; import java.util.concurrent.TimeUnit; /** * 通道初始化 * @author FengRui */ @Slf4j public class BootNettyChannelInitializer extends ChannelInitializer<Channel> { private static final int MAX_FRAME_LENGTH = 2048; // 设置最大数据长度为 65535 字节 @Override protected void initChannel(Channel ch) throws Exception { // 设置接收参数最大字节 ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(MAX_FRAME_LENGTH)); // 设置字节编码 ch.pipeline().addLast(new StringEncoder(Charset.forName("UTF-8"))); // 设置接收接收的数据为JSON,如果不加会导致ChannelInboundHandlerAdapter 接收到的参数不为JSON,而导致报错 ch.pipeline().addLast("json", new JsonObjectDecoder()); // 设置字节编码 ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8"))); // 设置连接最大闲置时间 ch.pipeline().addLast(new IdleStateHandler(0,0,30, TimeUnit.SECONDS)); // 自定义消息处理通道 ch.pipeline().addLast(new BootNettyChannelInboundHandlerAdapter()); } }
5.如何启动
import com.daniu.netty.BootNettyServer; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.Async; /** * @author FengRui * 我使用的是Netty 容器,不是Tomcat 容器哦,因为该项目只用于对接硬件,不做任何业务处理,最多只能用于参数传输 */ @Slf4j @SpringBootApplication public class DaNiuMiniProgramApplication implements CommandLineRunner { public static void main(String[] args) { SpringApplication.run(DaNiuMiniProgramApplication.class, args); System.out.println(" " + "// _ooOoo_ // " + "// o8888888o // " + "// 88" . "88 // " + "// (| ^_^ |) // " + "// O\ = /O // " + "// ____/`---'\____ // " + "// .' \\| |// `. // " + "// / \\||| : |||// \ // " + "// / _||||| -:- |||||- \ // " + "// | | \\\ - /// | | // " + "// | \_| ''\---/'' | | // " + "// \ .-\__ `-` ___/-. / // " + "// ___`. .' /--.--\ `. . ___ // " + "// ."" '< `.___\_<|>_/___.' >'"". // " + "// | | : `- \`.;`\ _ /`;.`/ - ` : | | // " + "// \ \ `-. \_ __\ /__ _/ .-` / / // " + "// ========`-.____`-.___\_____/___.-`____.-'======== // " + "// `=---=' // " + "// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // " + "// 佛祖保佑 永不宕机 永无BUG // " + ""); } @Async @Override public void run(String... args) throws Exception { new BootNettyServer().bind(Integer.valueOf(tcpPort)); } }
6.如何返回低字节的数据和十六进制的数据
方法一和方法二都要定位到BootNettyChannelInboundHandlerAdapter类中的 channelRead方法
1.返回十六进制数据
// 记住不要转换十六进制数据,有部分硬件会自动转换的,如果没有低字节数据直接返回String 类型的数据接口 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ctx.writeAndFlush("L2XTEND"); }
2.返回十六进制加低字节的数据,设置包头和包尾
低字节校验和计算类
import cn.hutool.core.util.HexUtil; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import org.apache.commons.lang3.StringUtils; import java.nio.charset.StandardCharsets; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; /** * @author FengRui * @Description * @Date 2023-04-28 10:53 */ public class HexadecimalUtils { /** * 将16进制的数据求和树数据转为 16进制的低字节 */ public static String makeChecksum(String data) { if (StringUtils.isEmpty(data)) { return ""; } int total = 0; int len = data.length(); int num = 0; while (num < len) { String s = data.substring(num, num + 2); total += Integer.parseInt(s, 16); num = num + 2; } /** * 用256求余最大是255,即16进制的FF */ int mod = total % 256; String hex = Integer.toHexString(mod); len = hex.length(); // 如果不够校验位的长度,补0,这里用的是两位校验 if (len < 2) { hex = "0" + hex; } return hex; } public static CompositeByteBuf returnValue(String prefix, String lowByteDataString, String suffix) { CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(prefix.getBytes(StandardCharsets.UTF_8))); compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(HexUtil.decodeHex(HexadecimalUtils.makeChecksum(HexUtil.encodeHexStr(lowByteDataString))))); compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(suffix.getBytes(StandardCharsets.UTF_8))); return compositeByteBuf; } public static byte[] convertoTime(String timeString){ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss"); LocalDateTime dateTime = LocalDateTime.parse(timeString, formatter); byte[] bytes = new byte[7]; bytes[0] = (byte) (dateTime.getYear() / 100); bytes[1] = (byte) (dateTime.getYear() % 100); bytes[2] = (byte) dateTime.getMonthValue(); bytes[3] = (byte) dateTime.getDayOfMonth(); bytes[4] = (byte) dateTime.getHour(); bytes[5] = (byte) dateTime.getMinute(); bytes[6] = (byte) dateTime.getSecond(); return bytes; } public static int bytesToInt(byte[] bytes) { int highByte = bytes[0] & 0xFF; int lowByte = bytes[1] & 0xFF; return (highByte << 8) | lowByte; } }
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // new 一个返回数据类,如果是有低字节的数据,一定要使用该类返回 CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); // 返回包头+固定数据 compositeByteBuf.addComponent(true,Unpooled.wrappedBuffer("L2JS".getBytes(StandardCharsets.UTF_8))); // 返回实际参数 compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(bytes)); //计算前面的参数和校验和 compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(HexUtil.decodeHex(HexadecimalUtils.makeChecksum(HexUtil.encodeHexStr("JS") + hexString)))); // 包尾 compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer("END".getBytes(StandardCharsets.UTF_8))); }
3.设置包头包尾
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.FixedRecvByteBufAllocator; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; /** * 通道初始化 * 注意我这里没有计算校验和哦 * @author FengRui */ public class BootNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> { // 设置最大数据长度为 65535 字节 @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ch.config().setRecvByteBufAllocator(new FixedRecvByteBufAllocator(BootNettyServer.MAX_FRAME_LENGTH)); /** * 解决包头包尾粘包拆包问题 * */ pipeline.addLast("decoder", new MyDecoder()); //返回参数子返回字符串 ByteBuf baotou = Unpooled.copiedBuffer("4c32".getBytes()); /** * 解决包头包尾粘包拆包问题 * */ ByteBuf tail = Unpooled.copiedBuffer("454e44".getBytes()); pipeline.addLast(new DelimiterBasedFrameDecoder(BootNettyServer.MAX_FRAME_LENGTH, baotou, tail)); pipeline.addLast(new StringEncoder()); pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 4, 4, -8, 0)); //设置未读写操作的连接断开时间 ch.pipeline().addLast(new IdleStateHandler(0, 0, 120, TimeUnit.SECONDS)); pipeline.addLast(new BootNettyChannelInboundHandlerAdapter()); } }