Netty是一个基于事件驱动的异步网络通信框架,可以帮助我们实现多种协议的客户端与服务端通信。话不多说,直接上代码。首先需要在pom.xml中引入下方依赖:
<!-- Netty (all-in-one artifact): the io.netty.* classes imported by the server and client code come from here. -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>
<!-- MessagePack: not referenced by the code visible in this article. -->
<dependency>
    <groupId>org.msgpack</groupId>
    <artifactId>msgpack</artifactId>
    <version>0.6.12</version>
</dependency>
<!-- SLF4J API. NOTE(review): the code visible in this article prints via System.out only. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.30</version>
</dependency>
<!-- Logback core. NOTE(review): an SLF4J binding normally also needs logback-classic; confirm. -->
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.2.4</version>
</dependency>
<!-- iText PDF: not referenced by the code visible in this article. -->
<dependency>
    <groupId>com.itextpdf</groupId>
    <artifactId>itextpdf</artifactId>
    <version>5.5.8</version>
</dependency>
<!-- Bouncy Castle provider. NOTE(review): presumably needed for the self-signed certificate generated when SSL is enabled; confirm. -->
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcprov-jdk15on</artifactId>
    <version>1.49</version>
    <type>jar</type>
    <scope>compile</scope>
    <optional>true</optional>
</dependency>
<!-- Bouncy Castle PKIX. NOTE(review): presumably the companion of bcprov for certificate generation; confirm. -->
<dependency>
    <groupId>org.bouncycastle</groupId>
    <artifactId>bcpkix-jdk15on</artifactId>
    <version>1.49</version>
    <type>jar</type>
    <scope>compile</scope>
    <optional>true</optional>
</dependency>
1.Server
package http; import constant.Constant; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.SelfSignedCertificate; import java.security.cert.CertificateException; public class HttpServer { // 通过nio方式来接收连接和处理连接 private static EventLoopGroup group = new NioEventLoopGroup(); // 服务端引导类 private static ServerBootstrap b = new ServerBootstrap(); // 是否开启SSL模式 public static final boolean SSL = false; // Netty创建全部都是实现自AbstractBootstrap,客户端的是Bootstrap,服务端的则是ServerBootstrap public static void main(String[] args) throws Exception { final SslContext sslContext; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslContext = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslContext = null; } try { b.group(group) .channel(NioServerSocketChannel.class) // 设置过滤器 .childHandler(new ServerHandlerInit(sslContext)); // 异步进行绑定 ChannelFuture f = b.bind(Constant.DEFAULT_PORT); // 给ChannelFuture 增加监听器 f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println("绑定端口已成功...."); } }); System.out.println("服务端启动成功,端口是:" + Constant.DEFAULT_PORT); System.out.println("服务器启动模式: " + (SSL ? "SSL安全模式" : "普通模式")); // 监听服务器关闭监听 ChannelFuture closeFuture = f.channel().closeFuture().sync(); closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println("服务器已经关闭...."); } }); } finally { // 关闭EventLoopGroup,释放掉所有资源,包括创建的线程 group.shutdownGracefully(); } } }
package http; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.ssl.SslContext; public class ServerHandlerInit extends ChannelInitializer<SocketChannel> { private final SslContext sslContext; public ServerHandlerInit(SslContext sslContext) { this.sslContext = sslContext; } @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslContext != null) { pipeline.addLast(sslContext.newHandler(ch.alloc())); } // pipeline中的handler可以自定义名称方便排查问题 // 把应答报文 编码 pipeline.addLast("encoder", new HttpResponseEncoder()); // 把请求报文 解码 pipeline.addLast("decoder", new HttpRequestDecoder()); // 聚合http为一个完整的报文 pipeline.addLast("aggregator", new HttpObjectAggregator(10*1024*1024)); // 把应答报文压缩 pipeline.addLast("compressor", new HttpContentCompressor()); pipeline.addLast(new BusinessHandler()); } }
2.业务处理类
package http; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; public class BusinessHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String result = ""; FullHttpRequest httpRequest = (FullHttpRequest)msg; System.out.println(httpRequest.headers()); try { // 获取路径 String path = httpRequest.uri(); // 获取body String body = httpRequest.content().toString(CharsetUtil.UTF_8); // 获取请求方法 HttpMethod method = httpRequest.method(); System.out.println("接收到 " + method + "请求"); // 如果不是这个路径,就直接返回错误 if (!"/test".equalsIgnoreCase(path)) { result = "非法请求!" + path; send(ctx,result, HttpResponseStatus.BAD_REQUEST); return; } // 如果是GET请求 if (HttpMethod.GET.equals(method)) { // 接收到的消息,做业务处理... System.out.println("body :" + body); result = "GET请求,应答:" + RespConstant.getNews(); send(ctx, result, HttpResponseStatus.OK); return ; } // 如果是其他类型请求,如post if (HttpMethod.POST.equals(method)) { // 接收到的消息,做业务逻辑处理 // .... // return; } } catch (Exception e) { System.out.println("处理请求失败!"); e.printStackTrace(); } finally { // 释放请求 httpRequest.release(); } } private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) { FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(context, CharsetUtil.UTF_8)); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8"); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } // 建立连接时,返回消息 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接的客户端地址 :" + ctx.channel().remoteAddress()); // super.channelActive(ctx); } }
3.Client
package http; import constant.Constant; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpObjectAggregator; public class HttpClient { public static final String HOST = "127.0.0.1"; public static void main(String[] args) throws InterruptedException { if (HttpServer.SSL) { System.out.println("服务器处于SSL模式,客户端不支持,推出"); return ; } HttpClient client = new HttpClient(); client.connect(Constant.DEFAULT_SERVER_IP, Constant.DEFAULT_PORT); } public void connect(String host, int port) throws InterruptedException { EventLoopGroup workerGroup = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(workerGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new HttpClientCodec()); // 聚合Http为一个完整的报文 ch.pipeline().addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024)); // 解压缩 ch.pipeline().addLast("decompressor", new HttpContentDecompressor()); ch.pipeline().addLast(new HttpClientInboundHandler()); } }); // start ChannelFuture f = b.connect(host, port).sync(); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println("连接成功...."); } }); ChannelFuture closeFuture = f.channel().closeFuture().sync(); closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println("关闭成功..."); } }); } finally { 
workerGroup.shutdownGracefully(); } } }
package http; import constant.Constant; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import java.net.URI; public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { FullHttpResponse httpResponse = (FullHttpResponse) msg; System.out.println(httpResponse.status()); System.out.println(httpResponse.headers()); ByteBuf buf = httpResponse.content(); System.out.println(buf.toString(CharsetUtil.UTF_8)); httpResponse.release(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive......."); URI uri = new URI("/test"); String msg = "Hello"; DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toASCIIString(), Unpooled.wrappedBuffer(msg.getBytes("UTF-8"))); // 构建http请求 request.headers().set(HttpHeaderNames.HOST, Constant.DEFAULT_SERVER_IP); request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes()); // 发送http请求 ctx.writeAndFlush(request); // super.channelActive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); // super.exceptionCaught(ctx, cause); } }
package http; import java.util.Random; public class RespConstant { private static final String[] NEWS = { "她那时候还太年轻,不知道所有命运赠送的礼物,早已在暗中标好了价格。——斯蒂芬·茨威格《断头皇后》", "这是一个最好的时代,也是一个最坏的时代;这是一个智慧的年代,这是一个愚蠢的年代; " + "这是一个信任的时期,这是一个怀疑的时期;这是一个光明的季节,这是一个黑暗的季节; " + "这是希望之春,这是失望之冬;人们面前应有尽有,人们面前一无所有; " + "人们正踏上天堂之路,人们正走向地狱之门。 —— 狄更斯《双城记》", }; private static final Random R = new Random(); public static String getNews() { return NEWS[R.nextInt(NEWS.length)]; } }
package constant; import java.util.Date; /** * 常量 */ public class Constant { public static final Integer DEFAULT_PORT = 7777; public static final String DEFAULT_SERVER_IP= "127.0.0.1"; // 根据输入信息拼接出一个应答信息 public static String response(String msg) { return "Hello, " + msg + ", Now is" + new Date(System.currentTimeMillis()).toString(); } }
4.总结分析
如果你想实现http请求,需要把HttpServer中的SSL置为false,结果如下
如果你想实现HTTPS请求,则将HttpServer中的SSL变量置为true。目前的代码中客户端还不支持SSL请求(HttpClient检测到SSL模式会直接退出),因此我们可以在Postman或者Chrome浏览器中访问并查看结果
由于我们的证书是自签名的(代码中通过SelfSignedCertificate生成),所以Chrome浏览器会认为这个证书无效,需要我们手动点击确认后才能继续访问
IDEA中会出现红色证书错误,暂时可以不用管,你还可以在postman的GET请求中添加body
到此,HTTP的简易服务器就搭建好了。如果你的程序出现了以下错误,请检查开头pom配置中的依赖是否下载成功,该错误是由证书问题引起的