avatar

netty 核心概念

Netty简介

Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护高性能服务器和客户端

Netty是一个NIO客户机-服务器框架,它支持快速、简单地开发网络应用程序;极大的简化了网络编程,如TCP和UDP套接字服务器

快速和简单 并不意味着生成的应用程序将收到可维护性或性能问题的影响。Netty经过精心设计,并积累了许多协议(Ftp、Smtp、http、https…)的实施经验,以及各种二进制和基于文本的遗留协议。因此,Netty成功的找到了一种方法,在不妥协的情况下实现了易于开发高性能稳定性灵活性

Netty中的核心概念

Channel

管道,Channel是对Socket的封装,其包含了一组API,大大简化了直接对Socket进行操作的复杂性

EventLoopGroup

EventLoopGroup是一个EventLoop,包含很多的EventLoop

Netty为每个Channel分配一个EventLoop,用于处理用户的连接请求、对用户请求的处理等所有事件。EventLoop本身只是一个线程驱动,在其生命周期内只会绑定一个线程,让该线程处理一个Channel的所有IO事件

一个Channel一旦与一个EventLoop相绑定,那么在Channel的整个生命周期内是不能改变的。一个EventLoop可以与多个Channel绑定,即ChannelEventLoop的关系是 n:1,而EventLoop线程的关系是 1:1

ServerBootStrap、BootStrap

用于配置整个Netty代码,将各个组建关联起来
服务端使用的是ServerBootStrap
客户端使用的是BootStrap

ChannelHandler与ChannelPipeline

ChannelHandler是对Channel数据的处理器,这些处理器可以是系统本身定义好的编解码器,也可以用户自定义编解码器。这些处理器会被统一添加到一个ChannelPipeline的对象中,然后按照添加的数据依次对Channel中的数据进行处理

ChannelFuture

Netty中所有的I/O操作都是异步的,即操作不会立即得到返回结果,所以Netty中定义了一个ChannelFuture对象作为这个异步操作的代言人,表示异步操作本身。如果想获取到该异步操作的返回值,可以通过该异步操作对象的**addListener()**方法为该异步操作添加监听器,为其注册回调:当结果出来后马上调用执行

  • Netty的异步编程模型都是建立在Future与回调概念之上的

Netty执行流程

Netty_process

Demo(Http)

1
2
3
4
5
6
7
8
9
10
11
12
13
<!-- netty-all依赖       -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.36.Final</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package http.nettyDemo.fundodoo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

/**
* @author drunk
*/
public class FundodooServer {

public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup parentGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(parentGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new HttpServerCodec())
.addLast(new HttpCustomerHandler());

}
});

try {
ChannelFuture future = serverBootstrap.bind(8888).sync();
future.channel().closeFuture().sync();
}finally {
parentGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package http.nettyDemo.fundodoo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

/**
* @author drunk
*/
public class HttpCustomerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest httpRequest = (HttpRequest) msg;
System.out.println("请求方式:"+ httpRequest.method().name());

if ("/favicon.ioc".equals(httpRequest.uri())) {
System.out.println("不处理/favicon.ioc");
return;
}

ByteBuf contents = Unpooled.copiedBuffer("hello Netty World", CharsetUtil.UTF_8);

DefaultFullHttpResponse response =
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,contents);

HttpHeaders httpHeaders = response.headers();
httpHeaders.set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
httpHeaders.set(HttpHeaderNames.CONTENT_LENGTH,contents.readableBytes());

ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}


}

Socket编程

前面的工程是一个仅存在服务端的Http请求的服务器,而Netty中最为常见的是C/S构架的Socket代码。

需求

  1. 客户端连接上服务端后,其马上会向服务端发送一个数据
  2. 服务端接收到数据后,马上向客户端回复一个数据
  3. 客户端收到数据后,便会向服务端发送一个数据

编码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package socket.nettyDemo.fundodoo;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

/**
* @author 醉探索戈壁
*
*/
public class FundodooServer {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup parentGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();

ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(parentGroup,workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new CustomSocketServerHandler());
}
});
try {
ChannelFuture future = bootstrap.bind(8888).sync();
future.channel().closeFuture().sync();
}finally {
parentGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package socket.nettyDemo.fundodoo;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.concurrent.TimeUnit;

/**
* @author 醉探索戈壁
* @date 2020/4/13 13:33
*/
public class CustomSocketServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(ctx.channel().remoteAddress() + "===" + msg);
ctx.writeAndFlush("from server " + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(500);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package socket.nettyDemo.fundodoo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;

import java.util.logging.SocketHandler;

/**
* @author 醉探索戈壁
* @date 2020/4/13 13:40
*/
public class FundodooClient {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup workGroup = new NioEventLoopGroup();

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8))
.addLast(new StringDecoder(CharsetUtil.UTF_8))
.addLast(new CustomSocketClientHandler());
}
});
try {
ChannelFuture future = bootstrap.connect("localhost", 8888).sync();
future.channel().closeFuture().sync();
}finally {
if (null != workGroup) {
workGroup.shutdownGracefully();
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package socket.nettyDemo.fundodoo;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.EventExecutorGroup;

import java.util.concurrent.TimeUnit;

/**
* @author 醉探索戈壁
* @date 2020/4/13 13:48
*/
public class CustomSocketClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
ctx.writeAndFlush("from client" + System.currentTimeMillis());
TimeUnit.MILLISECONDS.sleep(500);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush("connected:" + System.currentTimeMillis());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
文章作者:
文章链接: https://www.fundodoo.com/zh-CN/2020/04/13/15.html
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 醉探索戈壁
打赏
  • 微信
    微信
  • 支付寶
    支付寶

评论