`
cqupt123
  • 浏览: 66172 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

netty从入门到精通——实例篇(一)

阅读更多

 

  前面了解了netty的基本用法与几个核心概念,从本文开始会通过一些实例(主要参考源码example)来进一步学习netty的使用方法以及其中的原理。

  先来实现一个简单的功能:服务端如果接收到客户端的连接,则返回字符串”success”,客户端读到该消息打印出来。

  服务端的方法改写了第一篇blogNetty从入门到精通—入门篇中的handler,具体如下:

 

class MyChannelHandler extends SimpleChannelHandler {
	…
	…
	…
	@Override
	public void channelConnected(ChannelHandlerContext ctx,
			ChannelStateEvent e) throws Exception {
		System.out.println("Channel connected " + e);
		Channel ch = e.getChannel();
		ChannelBuffer cb = ChannelBuffers.wrappedBuffer("success".getBytes()) ;
		ch.write(cb);
		}
	…
	…
	…
}

   客户端的编码如下:

 

package com.netty.intr;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class NettyClient {
	final static String host = "127.0.0.1";
	final static int port = 8080;

	public static void main(String[] args) {
		Client client = new Client();
		client.config(host, port).start();
	}
}

class Client {
	ClientBootstrap bootstrap;
	ChannelHandler myHandler = new MyClientHandler();
	String host;
	int port;
	public Client() {
		bootstrap = new ClientBootstrap(
	            new NioClientSocketChannelFactory(
	                    Executors.newCachedThreadPool(),
	                    Executors.newCachedThreadPool()));
		
		 bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
	         public ChannelPipeline getPipeline() throws Exception {
	             return Channels.pipeline(myHandler);
	         }
	     });
	}
	
	Client config(String host, int port) {
		this.host = host;
		this.port = port;
		bootstrap.setOption("remoteAddress", new InetSocketAddress(this.host, this.port));
		return this;
	}
	
	void start() {
		 bootstrap.connect();
	}
	
	class MyClientHandler extends SimpleChannelUpstreamHandler {
		@Override
		public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e)
				throws Exception {
			System.out.println("Client Channel closed " + e);
		}

		@Override
		public void channelConnected(ChannelHandlerContext ctx,
				ChannelStateEvent e) throws Exception {
			System.out.println("Client Channel connected " + e);
		}

		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
				throws Exception {
			try {
				ChannelBuffer buf = (ChannelBuffer) e.getMessage();
				byte[] bytes = buf.array();
				System.out.println("Client reseived message : " + new String(bytes));
			} catch (Exception ex) {
				ex.printStackTrace();
				throw ex;
			}
		}
	}
}

 

   依次启动NettyServerNettyClient

  Serverconsole打印:

 

Channel connected [id: 0xcb6626ea, /127.0.0.1:49503 => /127.0.0.1:8080] CONNECTED: /127.0.0.1:49503
  

  Clientconsole打印:

 

Client Channel connected [id: 0xc4c911b3, /127.0.0.1:49503 => /127.0.0.1:8080] CONNECTED: /127.0.0.1:8080
Client reseived message : success
  

  Server的处理器MyChannelHandler中,回写消息调用了Channel.write(Object)方法。

 

Channel ch = e.getChannel();
ChannelBuffer cb = ChannelBuffers.wrappedBuffer("success".getBytes()) ;
ch.write(cb);
 

 

  既然write方法的参数为Object,为什么这里不能直接传入字符串”success”呢,而是封装到ChannelBuffer类型的对象中,ChannelBuffer是做什么的。概念篇没有提到,而放在这里刚好合适。

  NIO中的Buffer

   前面提到过,在NIO中同样存在一个缓冲区,叫做ByteBuffer,来配合Channel的使用。在ByteBuffer内部存储数据的实质为一个字节数组,如:final byte[] hb,并定义了四个标记来管理它。其中包括:mark <= position <= limit <= capacity。其中capacity用来表示缓冲区的大小;position用来标识下一个可读取或者写入的位置;limit表示读取或者写入的上限位置,如果要在>=limit的位置做读写操作会抛出异常;mark用来记录当前position的值,记录之后position随着读写发生变化,在调用reset()方法时,会将position恢复为mark记录的值。在buffer中提供了很多putget方法来放入和读取数据,这里不多做介绍,可以查看API。但其中有几个重要的方法需要关注:

 

1.  flip()方法:在读取或者写入n个字节(position + n < limit)后,position += n。如果是先读取数据到buffer后写入到Channel,必须position的值回退到起初的值,并且将limit设置为有效位置,才能让读入的数据真正的写入Channel。调用flip()方法后,buffer中的四个标记会发生以下变化:

 

limit = position;
position = 0;
mark = -1;
 

2.  clear()方法:同样的,将buffer中的数据写入Channel后,再读取一些数据到buffer中,此时往往需要将各标记的值归位,当做一个新的buffer来使用(当然也有特殊情况)。调用clear()方法后,标记变化如下:

 

position = 0;
limit = capacity;
mark = -1;
 

3.  rewind()方法:如果发现刚才从Channel读取的数据需要重新读取,可以调用该方法。调用后标记变化如下:

 

position = 0;
mark = -1;

  尤其是flip()clear()方法,在使用的过程中会频繁用到,否则会造成读取和写入的错乱。

  ByteBuffer主要有两个继承的类分别是:HeapByteBufferMappedByteBuffer。他们的不同之处在于HeapByteBuffer会在JVM的堆上分配内存资源,而MappedByteBuffer的资源则会由JVM之外的操作系统内核来分配。DirectByteBuffer继承了MappedByteBuffer,采用了直接内存映射的方式,将文件直接映射到虚拟内存,同时减少在内核缓冲区和用户缓冲区之间的调用,尤其在处理大文件方面有很大的性能优势。但是在使用内存映射的时候会造成文件句柄一直被占用而无法删除的情况,网上也有很多介绍。 

 

  Netty中的Buffer

   Netty中使用ChannelBuffer来处理读写,之所以废弃ByteBuffer,官方说法是ChannelBuffer简单易用并且有性能方面的优势。在ChannelBuffer中使用ByteBuffer或者byte[]来存储数据。同样的,ChannelBuffer也提供了几个标记来控制读写并以此取代ByteBufferpositionlimit,分别是:

0 <= readerIndex <= writerIndex <= capacity,同时也有类似于markmarkedReaderIndexmarkedWriterIndex。当写入buffer时,writerIndex增加,从buffer中读取数据时readerIndex增加,而不能超过writerIndex。有了这两个变量后,就不用每次写入buffer后调用flip()方法,方便了很多。在ChannelBuffer中有几个重要的类继承,如下图:



   AbstractChannelBuffer中实现了基本的方法;HeapChannelBuffer是对NIOheapBuffer的封装,它有两个继承类:BigEndianHeapChannelBufferLittleEndianHeapChannelBuffer(试想,我们将一个int类型(32)的数据放入内存中,内存会以什么样的顺序放入这32位的数据呢?这就分为big-endianlittle-endian的字节序,big-endian就是说将数据的高位放在内存地址更小的位置,little-endian是将低位放在内存地址更小的位置,选择和所用硬件和操作系统相同的字节序有利于提高性能);ByteBufferBackedChannelBuffer是对NIOderectBuffer的封装;DynamicChannelBuffer继承于AbstractChannelBuffer,实现了buffer的自动扩容;CompositeChannelBuffer也是继承于AbstractChannelBuffer,抽象了操作多个buffer的情况,将多个buffer有序的放入数组中,通过计算找出要操作的buffer的下标,而不是将多个buffer复制到一个更大的buffer中;实现WrappedChannelBuffer接口的类主要是对buffer进行进一步的包装,一般由netty框架内部调用;ReplayingDecoderBuffer用于封装了解码时常有的处理,配合ReplayDecoder使用,后面会对编码解码做专门研究。

  ChannelBuffer往往由BufferFactory或者ChannelBuffers类来创建实例。

  回到本实例,netty在读取到客户端的msg时,根据用户配置的BufferFactory的不同会将消息封装成derectBuffer或者heapBuffer。所以,当我们在接收数据时,pipline中第一个upstreamHandler拿到的msg(e.getMessage())虽然是Object类型,但是肯定是这两种形式的buffer。那么,我们在写回数据的时候能不能直接使用其他类型呢,答案是:pipline中第一个downstreamHandler不能随意的放入其他对象,原因是piplinedownstream事件是从tail端往上执行的,所以第一个downstreamHandler调用的channel.write()或者channels.write()方法传入的object会直接传递给netty底层处理,而在netty的写入出口中,只接收两种类型的对象:ChannelBufferFileRegionFileRegion可用于0-copy的文件传输,一般情况下,应用程序向socket发送文件流时,操作系统需要先将文件的字节流存储到内核缓冲区(file_read_buffer),然后拷贝到用户缓冲区,在由用户缓冲区拷贝到内核缓冲区(socket_buffer)由协议引擎发送。这样会创建四个缓冲区,两次复制的过程。所谓零拷贝是指:内核通过DMA引擎,直接将file_read_buffer的数据copysocket_buffer中,而在后面的改进中废除了复制的操作,给socket_buffer增加了数据的位置和长度信息描述,直接将数据从file_read_buffer传递给协议引擎。在netty中只有选择NIO模型才能支持0-copy,当然JDK版本或者操作系统不支持也是不行的)。所以本实例先将字符串”success”方法放入到ChannelBuffer中,然后在调用write方法。

  多处理器

  再来看看Client端,在接收到数据后也需要从ChannelBuffer中取出字节,然后再转换成我们想要的String。当然ChannnelBuffer提供了很多如何取出一些基本类型数据的方法,在必要的时候可以使用。但是这样还是不太方便,我们能不能直接获得一个String对象呢?这里就可以使用前面介绍过的在pipline中添加一个handler专门用来解码,转换成我们所需要的String类型,再传递给MyClientHandler

  首先添加一个解码的handler:

 

class StringClientHandler extends SimpleChannelUpstreamHandler {
		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
				throws Exception {
			try {
				ChannelBuffer buf = (ChannelBuffer) e.getMessage();
				byte[] bytes = buf.array();
				System.out
						.println("Client reseived message and convert it to a String!");
				Channels.fireMessageReceived(e.getChannel(), new String(bytes));
			} catch (Exception ex) {
				ex.printStackTrace();
				throw ex;
			}
		}
}

   修改MyClientHandlermessageReceived方法,直接将收到的msg当做字符串来处理。

     class MyClientHandler extends SimpleChannelUpstreamHandler {

		…
		…
		…
		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
				throws Exception {
			try {
				String msg = (String) e.getMessage();
				System.out.println("Client reseived message : " + msg);
			} catch (Exception ex) {
				ex.printStackTrace();
				throw ex;
			}
		}
	}
 

   StringClientHandler放到pipline中的第一个位置:

  class Client {

	…
…
	StringClientHandler stringHandler = new StringClientHandler();
	public Client() {
		…
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(stringHandler, myHandler);
			}
		});
	}
…
…
}

   这样再次运行程序,客户端就会有这样的信息打印出来:

 

Client reseived message and convert it to a String!
Client reseived message : success
 

   StringClientHandler中首先处理了接收到的ChannelBuffer,然后转换成String,在通过方法Channels.fireMessageReceived(ctx, new String(bytes));向下传递消息。注意这个方法,如果没有调用,对于msg的处理就会到此为止,MyClientHandler将不会运行。Channels为我们提供了很多方法用于向上或者向下传递事件,对应于概念篇中讲到的各种事件。其中以fire开头方法用于upstream事件,而例如write这样的方法主要用于downstream事件。而这些方法往往是成对出现的,例如:fireChannelOpen(Channel channel)fireChannelOpen(ChannelHandlerContext ctx)等,由于这两个方法有不同的参数,造成了流程的不同,如果参数是Channel,则整个流程会从piplinehead upstreamHandler开始重新执行,如果参数是ChannelHandlerContext,则会直接执行下一个upstreamHandler。同样,write(Channel channel, Object message)write(ChannelHandlerContext ctx, ChannelFuture future, Object message)等方法,出了多了一个future外,和前面的两个方法相同,流程也会相同,如果没有传入ChannelFuture,则会在方法中的第一步中创建一个future来支持netty的异步事件处理机制。

  所以,如果我们将Channels.fireMessageReceived(ctx, new String(bytes));改为Channels.fireMessageReceived(e.getChannel(), new String(bytes));则会出现下面的异常:

 

java.lang.ClassCastException: java.lang.String cannot be cast to org.jboss.netty.buffer.ChannelBuffer

   这是因为在第一次接收到数据后,将msg转换为String类了,并调用Channels.fireMessageReceived(e.getChannel(), new String(bytes));,通知pipline从头upstreamHandler开始执行,即执行StringClientHandler.messageReseived,此时msg已经是String类型了,所以转换成ChannelBuffer就会报错。如果在应用程序中没有打印异常信息,而我们应用的日志级别在WARN以上的话,我们将看不到异常信息,所以最好的方式是在handler中实现exceptionCaught方法来处理异常。

 

 

  • 大小: 50.8 KB
分享到:
评论
2 楼 ykdsg 2014-09-09  
楼上正解,看了下源码才知道Channels.fireMessageReceived(e.getChannel(), new String(bytes));  是从头开始执行的
1 楼 jacky2014 2014-03-14  
Channels.fireMessageReceived(e.getChannel(), new String(bytes));
应该改成
Channels.fireMessageReceived(ctx, str); 
否则还是传回StringClientHandler处理

相关推荐

Global site tag (gtag.js) - Google Analytics