xidajiancun

netty-time-First Solution

 
package org.q.netty.time1;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeServerHandler extends SimpleChannelHandler {

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		Channel channel = e.getChannel();
		ChannelBuffer time = ChannelBuffers.buffer(4);
		// Divide before narrowing: casting the millisecond value to int first would truncate it.
		time.writeInt((int) (System.currentTimeMillis() / 1000L));
		ChannelFuture future = channel.write(time);
		future.addListener(new ChannelFutureListener() {
			public void operationComplete(ChannelFuture future) throws Exception {
				Channel c = future.getChannel();
				c.close();
			}
		});
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		e.getCause().printStackTrace();
		e.getChannel().close();
	}
}


To send a new message, we need to allocate a new buffer which will contain the message. We are going to write a 32-bit integer, and therefore we need a ChannelBuffer whose capacity is 4 bytes. The ChannelBuffers helper class is used to allocate a new buffer. Besides the buffer method, ChannelBuffers provides a lot of useful methods related to the ChannelBuffer. For more information, please refer to the API reference.

On the other hand, it is a good idea to use static imports for ChannelBuffers:

import static org.jboss.netty.buffer.ChannelBuffers.*;
...
ChannelBuffer  dynamicBuf = dynamicBuffer(256);
ChannelBuffer ordinaryBuf = buffer(1024);
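
As a rough illustration of why 4 bytes are enough for the time value, here is a minimal sketch that uses only the JDK's java.nio.ByteBuffer rather than Netty's ChannelBuffer. The class name EpochSeconds and its methods are made up for this example. Note that the division happens before the cast to int; casting the millisecond value first would truncate it and produce a meaningless number.

```java
import java.nio.ByteBuffer;

/** Sketch only: packs a timestamp, in whole seconds, into a 4-byte big-endian buffer. */
final class EpochSeconds {

    /** Divide first, then narrow. The result fits in a signed int until 2038. */
    static int toSeconds(long epochMillis) {
        return (int) (epochMillis / 1000L);
    }

    /** Encodes the seconds value as 4 bytes (ByteBuffer is big-endian by default). */
    static byte[] encode(long epochMillis) {
        return ByteBuffer.allocate(4).putInt(toSeconds(epochMillis)).array();
    }

    /** Decodes 4 bytes back into epoch milliseconds, with second precision. */
    static long decodeToMillis(byte[] fourBytes) {
        // 1000L forces long arithmetic so the multiplication cannot overflow an int.
        return ByteBuffer.wrap(fourBytes).getInt() * 1000L;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        byte[] wire = encode(now);
        System.out.println(wire.length + " bytes on the wire, decoded millis = " + decodeToMillis(wire));
    }
}
```

The decode side mirrors what the client handler later in this post does with readInt() * 1000L.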


But wait, where's the flip? Didn't we used to call ByteBuffer.flip() before sending a message in NIO? ChannelBuffer does not have such a method because it has two pointers; one for read operations and the other for write operations. The writer index increases when you write something to a ChannelBuffer while the reader index does not change. The reader index and the writer index represent where the message starts and ends respectively.

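In short, a Netty ChannelBuffer keeps two pointers, a read pointer and a write pointer, which control read operations and write operations respectively.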
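
The difference can be sketched with plain JDK code. The TwoIndexBuffer class below is a hypothetical toy written for this post, not Netty's actual ChannelBuffer implementation. It only shows how separate reader and writer indexes remove the need for flip(), while the nioRoundTrip method shows the NIO style where flip() is required.

```java
import java.nio.ByteBuffer;

/** Hypothetical two-index buffer for illustration only; not Netty's ChannelBuffer. */
final class TwoIndexBuffer {
    private final byte[] data;
    private int readerIndex; // where the unread message starts
    private int writerIndex; // where the next write lands, i.e. where the message ends

    TwoIndexBuffer(int capacity) {
        data = new byte[capacity];
    }

    /** Writes a big-endian int and advances only the writer index. */
    void writeInt(int value) {
        if (data.length - writerIndex < 4) {
            throw new IndexOutOfBoundsException("need 4 writable bytes");
        }
        data[writerIndex++] = (byte) (value >>> 24);
        data[writerIndex++] = (byte) (value >>> 16);
        data[writerIndex++] = (byte) (value >>> 8);
        data[writerIndex++] = (byte) value;
    }

    /** Reads a big-endian int and advances only the reader index. */
    int readInt() {
        if (readableBytes() < 4) {
            throw new IndexOutOfBoundsException("need 4 readable bytes");
        }
        return ((data[readerIndex++] & 0xFF) << 24)
                | ((data[readerIndex++] & 0xFF) << 16)
                | ((data[readerIndex++] & 0xFF) << 8)
                | (data[readerIndex++] & 0xFF);
    }

    int readableBytes() { return writerIndex - readerIndex; }
    int readerIndex() { return readerIndex; }
    int writerIndex() { return writerIndex; }
}

final class FlipDemo {
    /** NIO style: a single position, so flip() is needed between writing and reading. */
    static int nioRoundTrip(int value) {
        ByteBuffer nio = ByteBuffer.allocate(4);
        nio.putInt(value);
        nio.flip(); // limit = position, position = 0
        return nio.getInt();
    }

    public static void main(String[] args) {
        TwoIndexBuffer buf = new TwoIndexBuffer(4);
        buf.writeInt(42);
        // No flip: the reader index is still 0, the writer index is now 4.
        System.out.println("readable=" + buf.readableBytes() + " value=" + buf.readInt());
        System.out.println("nio value=" + nioRoundTrip(42));
    }
}
```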

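Channel.write() returns a ChannelFuture, which represents an I/O operation on the channel that has not completed yet. Because all I/O operations in Netty are asynchronous, calling Channel.close() directly could close the channel before the message has been sent. The close therefore has to be registered through ChannelFuture.addListener so that it runs only after the write finishes. For this common case, the predefined ChannelFutureListener.CLOSE can replace the anonymous listener: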

f.addListener(ChannelFutureListener.CLOSE);
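
The ordering problem can be sketched without Netty. The example below is only an analogy: it swaps in the JDK's CompletableFuture for ChannelFuture, and FakeChannel is a hypothetical stand-in that records events instead of doing real I/O. It shows that a close registered as a completion callback runs after the write completes, even though the handler method has already returned by then.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Analogy only: CompletableFuture plays the role of ChannelFuture. */
final class CloseAfterWrite {

    /** Hypothetical stand-in for a channel; it just records the order of events. */
    static final class FakeChannel {
        final List<String> events = new ArrayList<>();
        private final CompletableFuture<Void> pendingWrite = new CompletableFuture<>();

        /** Requests a write and returns immediately; the write finishes later. */
        CompletableFuture<Void> write(String message) {
            events.add("write requested: " + message);
            return pendingWrite;
        }

        /** Simulates the I/O layer finishing the write at some later point. */
        void completeWrite() {
            events.add("write completed");
            pendingWrite.complete(null); // triggers the registered callbacks
        }

        void close() {
            events.add("closed");
        }
    }

    static List<String> run() {
        FakeChannel channel = new FakeChannel();
        // Register the close as a callback instead of calling close() right away.
        channel.write("time").thenRun(channel::close);
        channel.events.add("handler returned");
        channel.completeWrite();
        return channel.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Had close() been called directly after write(), "closed" would have been recorded before "write completed", which is the situation the listener avoids.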

package org.q.netty.time1;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class TimeClientHandler extends SimpleChannelHandler {

	private final ChannelBuffer buf = ChannelBuffers.dynamicBuffer();

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
			throws Exception {
		e.getCause().printStackTrace();
		e.getChannel().close();
	}

	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
			throws Exception {
		ChannelBuffer timeBuffer = (ChannelBuffer)e.getMessage();
		buf.writeBytes(timeBuffer);
		
		if(buf.readableBytes()>=4) {
			long time = buf.readInt()*1000L;
			SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
			Date date = new Date(time);
			System.out.println(date);
			System.out.println(format.format(date));
			e.getChannel().close();
		}
	}
}

package org.q.netty.time1;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class TimeClient {
	
	private static final String host = "127.0.0.1";
	private static final int port = 9999;
	
	public static void main(String[] args) {
		ClientBootstrap bootstrap = new ClientBootstrap(
				new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new TimeClientHandler());
			}
		});
		bootstrap.setOption("tcpNoDelay", true);
		bootstrap.setOption("keepAlive", true);
		bootstrap.connect(new InetSocketAddress(host, port));
	}

}

