channellvscan inactivee 什么时候触发

5646人阅读
netty现在应该是java界最流行的网络框架之一了,高性能,可扩展,代码优雅。之前做的页游都是用netty3.x来做网络层通信。最近看到netty4快要出来了,一些新的特性还是很值得推介的。
ChannelBuffer变成了ByteBuff。还引入了Buff对象池Unpooled来管理回收不用的buff,避免gc的频率。在netty3中,buff都是固定大小或者dynamic(write*需要更多的空间)。
为了解决这些问题。 netty4中的所有buff都是动态大小的,可以随意更改capacity,并且性能更好。
CompositeByteBuf是新引入的,可以避免大块内存的拷贝。当你需要组合一个新的buff时,用Unpooled.wrappedBuffer(...)或者positieBuff(...) 重点要介绍一些ByteBuf对象池,当一个ByteBuf被对象池回收后,可以重复利用,从而避免了频繁的内存分配和gc。当一个ByteBuf被write到目标socket是,它会自动被回收到对象池。
概念的更改,变得更容易理解了。Upstream变成Inbound,Downstream变成了Outbound。
之前netty3每次io读写,都会new一个ChannelBuffer,会导致gc频繁。netty4将socket的数据直接存入inbound buffer,并且使用了对象池技术后会改善这种状况。
netty3的事件处理笼统归为handleUpstream和handleDownstream 。
netty4引入了更具体的事件,包括:
void channelRegistered(ChannelHandlerContext ctx) throws E
void channelUnregistered(ChannelHandlerContext ctx) throws E
void channelActive(ChannelHandlerContext ctx) throws E
void channelInactive(ChannelHandlerContext ctx) throws E
void inboundBufferUpdated(ChannelHandlerContext ctx) throws E
netty4还简化了channel状态模型。之前一个channel connect后,会触发channelOpen, channelBound, 和 channelConnected.
channel关闭后会触发 channelDisconnected, channelUnbound, 和channelClosed.
上面的几个事件被合并成channelActive 和 channelInActive。 messageReceived 和 writeRequested 被 inboundBufferUpdated 和flush 替换了。
Channel支持 half-closed,可以指定关闭后接收剩余的数据。
3.I/0线程分配
netty4中,EventLoopGroup取代了ChannelFactory,一个channel必须显式地registere 到EventLoopGroup。这样有个好处,就是在不同的业务中,Channel可以随时deregister和registered到某个EventLoopGroup中去。
EventLoop 是本质上也是一个ScheduledExecutorService,所以一个channel的一些定时操作也可以交由它去执行。从而保证了一个channel的所有操作都是在同一个线程中执行了。。这样的内存模型可以简化很多的线程问题。
4.配置项和attach对象
之前ChannelConfig 的配置项项都是用字符串定义的。容易出问题。netty4重新定义了常量。例如 cfg.setOption(ChannelOption.TCP_NODELAY, true);
引入了AttributeMap,可以存储任何对象到Channel中。
netty4的改进是很值得推荐的,尤其是内存分配方面的改进,今后更高并发的系统将变得可能,gc造成的延迟也讲得到改善。 准备在netty4正式版出来,稳定之后用于下一个新游戏项目中。到时再跟大家分享一些心得和体会。
&准备将blog搬到这里了,希望能跟大家多分享交流技术,thanks
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:338420次
积分:2655
积分:2655
排名:第8962名
原创:14篇
评论:129条
有任何问题欢迎交流
(1)(1)(11)(1)<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&netty学习(16)
&&&&&&& 上一篇文章中,我们介绍了ServerBootstrap的启动过程,在这个过程中引出了一个很重要的类ChannelPipeline。如果说内存池,线程模型是netty的基础,我们接触的相对来说并不是那么直接,那么ChannelPipeline则是我们开发中直接能够接触到的概念,如果对其不了解,很容易造成一些不正确的使用姿势或一些然并卵的优化。现在我们就来看看ChannelPipeline的作用到底是啥。
&&&&&&& ChannelPipeline包含了一个ChannelHander形成的列表。其中ChannelHander是用于处理连接的Inbound或者Outbound事件,也是所有开发者都会直接接触到的一个类。ChannelPipeline实现了一个比较高级的拦截过滤模式,用户可以完全控制事件的处理,并且pipeline中的ChannelHandler可以相互交互。&&&&&&&
&&&&&&& 不管是server还是client,每个连接都会对应一个pipeline,该pipeline在Channel被创建的时候创建(注意这里的Channel是指netty中封装的Channel,而不是jdk的底层连接)。先看看ChannelPipeline的官方注释,(比较懒,直接截了个图。。。),该注释直观的描述了IO事件在pipeline中的处理流程。
&&&&&&& 从图中可以看到inbound事件(read为代表)是从ChannelHandler1开始处理一直到最后一个ChannelHandler N,而outbound事件(write为代表)则是从ChannelHandler N开始处理,一直到ChannelHandler1。当然这个只是一个典型的处理流程,用户可以完全控制这个流程,改变事件的处理流程,如跳过其中一个ChannelHander,甚至在处理过程中动态的添加或者删除ChannelHandler。那么inbound和outbound各自有哪些事件呢?来看看下面两个表:
inbound事件
ChannelHandlerContext#fireChannelRegistered()
ChannelHandlerContext#fireChannelUnregistered()
ChannelHandlerContext#fireChannelActive()
ChannelHandlerContext#fireChannelRead(Object)
ChannelHandlerContext#fireChannelReadComplete()
ChannelHandlerContext#fireExceptionCaught(Throwable)
ChannelHandlerContext#fireUserEventTriggered(Object)
用户事件触发
ChannelHandlerContext#fireChannelWritabilityChanged()
可写状态改变
ChannelHandlerContext#fireChannelInactive()
检测到连接断开
outbound事件
ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
将写的数据发送
ChannelHandlerContext#read()
触发读操作
ChannelHandlerContext#disconnect(ChannelPromise)
与对方的连接断开
ChannelHandlerContext#close(ChannelPromise)
关闭自己的连接
ChannelHandlerContext#deregister()
触发取消注册
&&&&&&& 需要注意的是不管是在inbound事件还是outbound事件处理的过程中发生异常,都会触发fireExceptionCaught方法的调用。当然fireExceptionCaught本身如果发生异常是不会再进行处理了,只是简单的记录一个异常日志,否则就死循环了。
&&&&&&& ChannelPipeline提供了很多方法来方便对ChannelHandler进行操作,如addLast,addFirst,remove,replace等。最后强调一下,ChannelPipeline的所有方法都是线程安全的!
& & & & 下面我们来看看ChannelPipeline的默认实现DefaultChannelPipeline。
final AbstractC
// pipeline对应的连接
// 通过context.next和context.prev形成一个双向链表
final AbstractChannelHandlerC
// pipeline中的第一个Context
final AbstractChannelHandlerC
// pipeline中的最后一个Context
&pre name=&code& class=&java&&
// 保存除head和tail外所有Context的map,由于ChannelPipeline中的操作都是线程安全的,所以这里是HashMap
private final Map&String, AbstractChannelHandlerContext& name2ctx =
new HashMap&String, AbstractChannelHandlerContext&(4);
private Map&EventExecutorGroup, ChannelHandlerInvoker& childI
&&&&&&& 这里引出了又一个重要的类ChannelHandlerContext,顾名思义ChannelHandlerContext就是一个拥有ChannelHandler的类,该类使ChannelHandler可以通过ChannelPipeline与其他的ChannelHandler交互,而ChannelPipeline也是通过该类对ChannelHandler进行操作,该类非常重要,我们一会会具体分析它。
DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException(&channel&);
this.channel =
tail = new TailContext(this);
head = new HeadContext(this);
head.next =
tail.prev =
&&&&&&& 从构造方法可以看出在pipeline中head和tail是默认分配的。
&&&&&&& TailContext是outbound事件的起点,inbound的终点,他的所有inbound方法都是空实现,而outbound方法则是简单的把控制权交给前面的handler。有了tail,我们能够保证inbound处理流最终能够优雅的结束掉(空实现,事件不会再往后传播),而不用在进行各种空判断;而对于outbound事件,它直接唤起下一个handler处理,充当了一个隐形的老好人。可以这么说,虽然它确实没什么太大的卵用,但是没有它,你会发现很多地方的代码将变得很难看。
& & && HeadContext是inbound的起点,outbound的终点。作为inbound的起点,他也只是默默的把控制权交给后面的handler。而作为outbound的终点,他承担的责任则非常重要,他负责对outbound事件进行最终的底层调用(这里的底层是netty底层,而非jdk底层或操作系统底层),因此如果你暂时不关心编解码,而想了解write方法之类的最终实现,可以直接在HeadContext的对应方法上加断点,然后就可以直接了解这一部分知识了。
&&&&&&&& 以下是HeadContext中outbound事件的部分方法:
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}&&&&&&& 有了头尾,我们来看看链表中间的部分是怎么生成/处理的。下面是将节点加到链表最前面的位置,注意,这里面的最前和最后是排除HeadContext、TailContext的。
public ChannelPipeline addFirst(ChannelHandlerInvoker invoker, String name, ChannelHandler handler) {
synchronized (this) {
name = filterName(name, handler);
addFirst0(name, new DefaultChannelHandlerContext(this, invoker, name, handler));
private void addFirst0(String name, AbstractChannelHandlerContext newCtx) {
// 检查该context中的hander是否重复添加了
&#160;checkMultiplicity(newCtx);
AbstractChannelHandlerContext nextCtx = head.
newCtx.prev =
newCtx.next = nextC
head.next = newC
nextCtx.prev = newC
name2ctx.put(name, newCtx);
// 触发handler.handlerAdded(newCtx)
callHandlerAdded(newCtx);
// 检查是否有重复添加的情况
private static void checkMultiplicity(ChannelHandlerContext ctx) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160; ChannelHandler handler = ctx.handler();
&#160;&#160;&#160;&#160;&#160;&#160;&#160; if (handler instanceof ChannelHandlerAdapter) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; ChannelHandlerAdapter h = (ChannelHandlerAdapter)
// 如果Handler未添加Sharable注解,且added为true,则报错
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; if (!h.isSharable() && h.added) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; throw new ChannelPipelineException(
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; h.getClass().getName() +
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; & is not a @Sharable handler, so can&#39;t be added or removed multiple times.&);
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; }
// added标记设置为true,表示handler已经被使用到context中
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; h.added =
&#160;&#160;&#160;&#160;&#160;&#160;&#160; }
&#160;&#160;&#160; }
&&&&&&& ChannelHandlerInvoker是用来支持handler的线程访问模式的,正常情况下,pipeline中的所有方法使用启动时指定的EventLoop来处理handler中的方法,但如果你觉得netty的线程模型无法满足你的需求,你可以自己实现一个ChannelHandlerInvoker来使用自定义的线程模型。
&&&&&&& 注意,pipeline中的所有context都是私有的,针对context的所有操作都是线程安全的,但context对象包含的handler不一定是私有的。比如添加了Sharable注解的handler,表示该handler自身可以保证线程安全,这种handler只实例化一个就够了。而对于没有添加Sharable注解的handler,netty就默认该handler是有线程安全问题的,对应实例也不能被多个Context持有。
&&&&&&& pipeline中还提供了很多方法来最大化的支持你控制pipleline,如addLast,addBefore,addAfter,replace,remove,removeFirst,removeLast等,从方法名就能看出其作用,这里不过多介绍。需要注意的是remove(Class handlerType)方法移除的是pipeline中第一个type为handlerType的handler,而不是移除所有type未handlerType的handler。
&&&&&&& pipleline中定义了fireChannelInactive等一些列方法,该系列方法与ChannelHandlerContext中的方法名相同,然而他们的返回&#20540;并不同,方法也不是来自同样的接口。方法名相同估计是为了让开发者调用起来更加方便(记忆)。以下面两个方法为例,pipeline的fireChannelInactive直接调用了head.fireChannelInactive。
public ChannelPipeline fireChannelInactive() {
head.fireChannelInactive();
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160; return tail.bind(localAddress, promise);
&#160;&#160;&#160; }&&&&&&& 之前看文章有同学在困惑下面两种调用方式到底有何不同,这个其实就是被相同的方法名给欺骗了。本来就不是一样的东西。 调用方式1,表示从当前的前一个handler开始处理(outbound),而调用方式二则是从tail开始处理。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 调用方式1
ctx.writeAndFlush(result);
// 调用方式2, 翻译过来就是tail.writeAndFlush(msg)
ctx.pipeline().writeAndFlush(result);
&&&&&&& 在这一堆outbound/inbound方法中有两个方法不是简单的调用ctx.xxx(), 这两个方法如下:
public ChannelPipeline fireChannelActive() {
&#160;&#160;&#160;&#160;&#160;&#160;&#160; head.fireChannelActive();
// 如果channel是自动开始读取,则此处会触发一次read,read方法最终会触发HeadContext的unsafe.beginRead();
// 对于一个ServerSocketChannel,beginRead就是向SelectKey注册OP_ACCEPT事件,而对于SocketChannel来说,则是向SelectKey注册OP_READ事件
&#160;&#160;&#160;&#160;&#160;&#160;&#160; if (channel.config().isAutoRead()) {
&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160;&#160; channel.read();
&#160;&#160;&#160;&#160;&#160;&#160;&#160; }
&#160;&#160;&#160;&#160;&#160;&#160;&#160;
&#160;&#160;&#160; }
public ChannelPipeline fireChannelUnregistered() {
head.fireChannelUnregistered();
// 如果连接关闭并且取消了注册,则依次移除所有handler(从头tail -& head)
if (!channel.isOpen()) {
destroy();
&&&&&&& 这两个方法正是一个channel注册到EventLoop后的开始和结束方法。总的说来DefaultChannelPipleline不复杂。 现在我们整理一下连接在pipeline的整个生命周期是什么样子的。
Server端:
&&&&&&& 1、ServerSocketChannel : fireChannelRegistered(注册到EventLoop) -& bind(绑定端口)-& fireChannelActive(激活) -&【read(注册OP_ACCEPT到SelectorKey) -& fireChannelRead(接收到客户端连接,此时会将客户端连接注册到workerGroup) -& fireChannelReadComplete(读取客户端连接完成) -& 接收下一个连接】 -& 直到最终关闭触发fireChannelUnregistered(从EventLoop中取消注册);
&&&&&&& 2、SocketChannel: ServerSocketChannel接收到客户端请求后,将其注册到workerGroup -& fireChannelRegistered(注册) -& fireChannelActive (激活) -&【 read(注册OP_READ到SelectorKey)-&fireChannelRead(读取到数据)-& (业务数据处理) -& (write(写数据到buffer)-&flush(数据最终发送) / writeAndFlush(前两个操作的组合))
-& fireChannelReadComplete(读取过程结束)】-& fireChannelInactive(连接关闭) -& fireChannelUnregistered(从EventLoop中取消注册);
Client端:
&&&&&&& SocketChannel: fireChannelRegistered(注册到EventLoop) -& connect(连接server) -& fireChannelActive(连接成功)-&【 read(注册OP_READ)-&(write(写数据到buffer)-&flush(数据最终发送) / writeAndFlush(前两个操作的组合))& -& fireChannelRead(读取到server传回的数据)-&(业务数据处理)-&fireChannelReadComplete(读取完成)
】-& fireChannelInactive(接收关闭连接的请求)-& close(关闭连接) -& fireChannelUnregistered(从EventLoop中取消注册);
&&&&&& 上面是一个简化的监听、连接、处理、关闭流程,实际的流程会更加复杂。
&&&&&& 总结一下,每个连接对应一个ChannelPipeline,ChannelPipeline维护了一个处理链,并提供了丰富的操作处理链的api。 处理链维护了一些列ChannelContext,其中head和tail由ChannelPipeline自己维护,用户无法修改这两个节点。每个ChannelContext包含一个ChannelHandler,所有ChannelContext是私有的线程安全的,但handler可以通过添加注解Sharable的方式来共享。ChannelPipeline提供了一系列操作TailContext和HeadContext的方法来达到处理handler的目的,但操作是由ChannelContext触发ChannelHandler完成的,而非直接操作handler。
&&&&&&& 本篇文章里原本需要配图的(直观、形象),然而冬天来了太冷了,有时间再补吧。下一篇将介绍ChannelContext。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:7134次
排名:千里之外
原创:25篇
(1)(1)(3)(2)(4)(7)(2)(1)(4)随笔 - 1134&
文章 - 32&评论 - 163&trackbacks - 0
我们从netty-example的Discard服务器端示例分析了netty的组件,今天我们从另一个简单的示例Echo客户端分析一下上个示例中没有出现的netty组件。
1.&服务端的连接处理,读写处理
echo客户端代码:
* Sends one message when a connection is open and echoes back any received
* data to the server.
Simply put, the echo client initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
public final class EchoClient {
static final boolean SSL = System.getProperty("ssl") != null;
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
// Configure SSL.git
final SslContext sslC
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
sslCtx = null;
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer&SocketChannel&() {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
//p.addLast(new ));
p.addLast(new EchoClientHandler());
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
从上面的代码可以看出,discard的服务端代码和echo的客户端代码基本相似,不同的是一个使用ServerBootStrap,另一个使用BootStrap而已。先看一下连接过程
NioEventLoop处理key的过程,
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See /netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
2.1 连接流程
调用AbstractNioByteChannel的finishConnect()方法
public final void finishConnect() {
// Note this method is invoked by the event loop only if the connection attempt was
// neither cancelled nor timed out.
assert eventLoop().inEventLoop();
boolean wasActive = isActive();
doFinishConnect();
fulfillConnectPromise(connectPromise, wasActive);
} catch (Throwable t) {
fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
} finally {
// Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis & 0 is used
// See /netty/netty/issues/1770
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
connectPromise = null;
触发channelActive操作:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (promise == null) {
// Closed via cancellation and the promise has been notified already.
// trySuccess() will return false if a user cancelled the connection attempt.
boolean promiseSet = promise.trySuccess();
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
// If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
if (!promiseSet) {
close(voidPromise());
2.2 读操作流程
调用AbstractNioByteChannel的read()方法,
  典型的autoRead流程如下:
  1. 当socket建立连接时,Netty触发一个inbound事件channelActive,然后提交一个read()请求给本身(参考DefaultChannelPipeline.fireChannelActive())
  2. 接收到read()请求后,Netty从socket读取消息。
  3. 当读取到消息时,Netty触发channelRead()。
  4. 当读取不到消息后,Netty触发ChannelReadCompleted().
  5. Netty提交另外一个read()请求来继续从socket中读取消息。
public final void read() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean needReadPendingReset = true;
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() &= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
allocHandle.incMessagesRead(1);
if (needReadPendingReset) {
needReadPendingReset = false;
setReadPending(false);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (allocHandle.lastBytesRead() & 0) {
closeOnRead(pipeline);
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, allocHandle.lastBytesRead() & 0, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
// See /netty/netty/issues/2254
if (!config.isAutoRead() && !isReadPending()) {
removeReadOp();
&触发读操作
public ChannelHandlerContext fireChannelRead(Object msg) {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelRead(next, pipeline.touch(msg, next));
return this;
读完触发完成事件
public ChannelPipeline fireChannelReadComplete() {
head.fireChannelReadComplete();
if (channel.config().isAutoRead()) {
return this;
public ChannelHandlerContext fireChannelReadComplete() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelReadComplete(next);
return this;
2.3 写操作流程
@SuppressWarnings("deprecation")
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
final ChannelOutboundBuffer outboundBuffer = this.outboundB
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
if (isOpen()) {
outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION, true);
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION, false);
} finally {
inFlush0 = false;
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
close(voidPromise(), t, false);
outboundBuffer.failFlushed(t, true);
} finally {
inFlush0 = false;
写操作具体实现(以NioSocketChannel为例):
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
for (;;) {
int size = in.size();
if (size == 0) {
// All written so clear OP_WRITE
clearOpWrite();
long writtenBytes = 0;
boolean done = false;
boolean setOpWrite = false;
// Ensure the pending writes are made of ByteBufs only.
ByteBuffer[] nioBuffers = in.nioBuffers();
int nioBufferCnt = in.nioBufferCount();
long expectedWrittenBytes = in.nioBufferSize();
SocketChannel ch = javaChannel();
// Always us nioBuffers() to workaround data-corruption.
// See /netty/netty/issues/2761
switch (nioBufferCnt) {
// We have something else beside ByteBuffers to write so fallback to normal writes.
super.doWrite(in);
// Only one ByteBuf so use non-gathering write
ByteBuffer nioBuffer = nioBuffers[0];
for (int i = config().getWriteSpinCount() - 1; i &= 0; i --) {
final int localWrittenBytes = ch.write(nioBuffer);
if (localWrittenBytes == 0) {
setOpWrite = true;
expectedWrittenBytes -= localWrittenB
writtenBytes += localWrittenB
if (expectedWrittenBytes == 0) {
done = true;
for (int i = config().getWriteSpinCount() - 1; i &= 0; i --) {
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes == 0) {
setOpWrite = true;
expectedWrittenBytes -= localWrittenB
writtenBytes += localWrittenB
if (expectedWrittenBytes == 0) {
done = true;
// Release the fully written buffers, and update the indexes of the partially written buffer.
in.removeBytes(writtenBytes);
if (!done) {
// Did not write all buffers completely.
incompleteWrite(setOpWrite);
2.&ChannelInboundHandler和ChannelInboundHandler
Echo的handler代码如下:
* Handler implementation for the echo client.
It initiates the ping-pong
* traffic between the echo client and server by sending the first message to
* the server.
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstM
* Creates a client-side handler.
public EchoClientHandler() {
firstMessage = Unpooled.buffer(EchoClient.SIZE);
for (int i = 0; i & firstMessage.capacity(); i ++) {
firstMessage.writeByte((byte) i);
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
上面的代码出现了两个重要的netty组件:ChannelInboundHandlerAdapter和ByteBuf。其中ByteBuf在已经讲到。我们这次重点分析一下    ChannelInboundHandlerAdapter及其相关类。
  ChannelInboundHandlerAdapter继承了ChannelInboundHandler,它的作用是将operation转到ChannelPipeline中的下一个ChannelHandler。子类可以重写一个方法的实现来改变。注意:在方法#channelRead(ChannelHandlerContext, Object)自动返回前,message不会释放。若需要一个可以自动释放接收消息的ChannelInboundHandler实现时,请考虑SimpleChannelInboundHandler。
  ChannelOutboundHandlerAdapter继承了ChannelOutboundHandler,它仅通过调用ChannelHandlerContext跳转到每个方法。
  ChannelInboundHandler处理输入的事件,事件由外部事件源产生,例如从一个socket接收到数据。&
  ChannelOutboundHandler解析你自己应用提交的操作。
 2.1&ChannelInboundHandler.channelActive() 
从源码角度看一下,Netty触发一个inbound事件channelActive(以LoggingHandler为例):
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "ACTIVE"));
ctx.fireChannelActive();
触发操作如下:
public ChannelHandlerContext fireChannelActive() {
AbstractChannelHandlerContext next = findContextInbound();
next.invoker().invokeChannelActive(next);
return this;
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
ctx = ctx.
} while (!ctx.inbound);
&invokeChannelActive方法实现:
public void invokeChannelActive(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeChannelActiveNow(ctx);
executor.execute(new OneTimeTask() {
public void run() {
invokeChannelActiveNow(ctx);
public static void invokeChannelActiveNow(final ChannelHandlerContext ctx) {
((ChannelInboundHandler) ctx.handler()).channelActive(ctx);
} catch (Throwable t) {
notifyHandlerException(ctx, t);
2.2 ChannelOutboundHandler.Read()
读的流程:
public ChannelHandlerContext read() {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeRead(next);
return this;
查找outbound的过程:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
ctx = ctx.
} while (!ctx.outbound);
触发读操作:
public void invokeRead(final ChannelHandlerContext ctx) {
if (executor.inEventLoop()) {
invokeReadNow(ctx);
AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext)
Runnable task = dctx.invokeReadT
if (task == null) {
dctx.invokeReadTask = task = new Runnable() {
public void run() {
invokeReadNow(ctx);
executor.execute(task);
2.3&ChannelOutboundHandler.write()
以实现类LoggingHandler为例:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (logger.isEnabled(internalLevel)) {
logger.log(internalLevel, format(ctx, "WRITE", msg));
ctx.write(msg, promise);
具体实现:
public ChannelFuture write(Object msg, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
next.invoker().invokeWrite(next, pipeline.touch(msg, next), promise);
写操作的触发
public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
if (!validatePromise(ctx, promise, true)) {
// promise cancelled
ReferenceCountUtil.release(msg);
if (executor.inEventLoop()) {
invokeWriteNow(ctx, msg, promise);
safeExecuteOutbound(WriteTask.newInstance(ctx, msg, promise), promise, msg);
public static void invokeWriteNow(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
((ChannelOutboundHandler) ctx.handler()).write(ctx, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
&  Netty中,可以注册多个handler。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行,如下图所示,按照注册的先后顺序对Handler进行排序,request进入Netty后的执行顺序为:
【1】http://blog.csdn.net/u/article/details/
【2】/questions//in-netty4-why-read-and-write-both-in-outboundhandler
阅读(...) 评论()

我要回帖

更多关于 session inactive 的文章

 

随机推荐