`
xidajiancun
  • 浏览: 455462 次
文章分类
社区版块
存档分类
最新评论

netty-factorial

 
阅读更多
package org.q.netty.factorial;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class FactorialServer {
	
	public static void main(String[] args) throws Exception {
        ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        bootstrap.setPipelineFactory(new FactorialServerPipelineFactory());

        bootstrap.bind(new InetSocketAddress(9999));
    }

}

package org.q.netty.factorial;

import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.jboss.netty.handler.codec.compression.ZlibWrapper;

public class FactorialServerPipelineFactory implements ChannelPipelineFactory {

	private static final Logger logger = Logger
			.getLogger(FactorialClientHandler.class.getName());

	public FactorialServerPipelineFactory() {
		logger.info("FactorialServerPipelineFactory");
	}

	public ChannelPipeline getPipeline() throws Exception {

		logger.info("getPipeline");

		ChannelPipeline pipeline = Channels.pipeline();

		// Enable stream compression (you can remove these two if unnecessary)
		pipeline.addLast("deflater", new ZlibEncoder(ZlibWrapper.GZIP));
		pipeline.addLast("inflater", new ZlibDecoder(ZlibWrapper.GZIP));

		// Add the number codec first,
		pipeline.addLast("decoder", new BigIntegerDecoder());
		pipeline.addLast("encoder", new NumberEncoder());

		// and then business logic.
		// Please note we create a handler for every new channel
		// because it has stateful properties.
		pipeline.addLast("handler", new FactorialServerHandler());

		return pipeline;
	}
}

package org.q.netty.factorial;

import java.math.BigInteger;
import java.util.logging.Logger;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class BigIntegerDecoder extends FrameDecoder {
	
	private static final Logger logger = Logger.getLogger(BigIntegerDecoder.class.getName());
	
	public BigIntegerDecoder() {
		logger.info("BigIntegerDecoder...");
	}

	@Override
	protected Object decode(
			ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
		logger.info("decode...");
		// Wait until the length prefix is available.
        if (buffer.readableBytes() < 5) {
            return null;
        }
        buffer.markReaderIndex();

        // Check the magic number.
        int magicNumber = buffer.readUnsignedByte();
        if (magicNumber != 'F') {
            buffer.resetReaderIndex();
            throw new CorruptedFrameException(
                    "Invalid magic number: " + magicNumber);
        }

        // Wait until the whole data is available.
        int dataLength = buffer.readInt();
        if (buffer.readableBytes() < dataLength) {
            buffer.resetReaderIndex();
            return null;
        }

        // Convert the received data into a new BigInteger.
        byte[] decoded = new byte[dataLength];
        buffer.readBytes(decoded);

        return new BigInteger(decoded);
	}

}

package org.q.netty.factorial;

import java.math.BigInteger;
import java.util.logging.Logger;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

public class NumberEncoder extends OneToOneEncoder {
	
	private static final Logger logger = Logger.getLogger(FactorialClientHandler.class.getName());
	
	public NumberEncoder() {
		logger.info( "NumberEncoder");
	}

    @Override
    protected Object encode(
            ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
    	logger.info("encode");
        if (!(msg instanceof Number)) {
            // Ignore what this encoder can't encode.
            return msg;
        }

        // Convert to a BigInteger first for easier implementation.
        BigInteger v;
        if (msg instanceof BigInteger) {
            v = (BigInteger) msg;
        } else {
            v = new BigInteger(String.valueOf(msg));
        }

        // Convert the number into a byte array.
        byte[] data = v.toByteArray();
        int dataLength = data.length;

        // Construct a message.
        ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
        buf.writeByte((byte) 'F'); // magic number
        buf.writeInt(dataLength);  // data length
        buf.writeBytes(data);      // data

        return buf;
    }
}

package org.q.netty.factorial;

import java.math.BigInteger;
import java.util.Formatter;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class FactorialServerHandler extends SimpleChannelUpstreamHandler {

	private static final Logger logger = Logger.getLogger(FactorialClientHandler.class.getName());
	
	private int lastMultiplier = 1;
    private BigInteger factorial = new BigInteger(new byte[] { 1 });
    
    public FactorialServerHandler() {
    	logger.info("FactorialServerHandler");
    }

    /**
     * 服务端接收到数数据时处理阶乘
     */
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    	logger.info( "messageReceived."+e.getChannel().getId());
        // Calculate the cumulative factorial and send it to the client.
        BigInteger number;
        if (e.getMessage() instanceof BigInteger) {
            number = (BigInteger) e.getMessage();
        } else {
            number = new BigInteger(e.getMessage().toString());
        }
        lastMultiplier = number.intValue();
        factorial = factorial.multiply(number);
        e.getChannel().write(factorial);
    }
    
    @Override
	public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
			throws Exception {
    	logger.info( "handleUpstream."+e.getChannel().getId());
    	logger.info( e.toString());
    	super.handleUpstream(ctx, e);
	}

	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
			throws Exception {
		super.channelConnected(ctx, e);
	}

	@Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		logger.info( "channelDisconnected."+e.getChannel().getId());
		logger.info(new Formatter().format(
                "Factorial of %,d is: %,d", lastMultiplier, factorial).toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
    	logger.log(
                Level.WARNING,
                "Unexpected exception from downstream.",
                e.getCause());
        e.getChannel().close();
    }
}

package org.q.netty.factorial;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;


public class FactorialClient {
	
	public static void main(String[] args) throws Exception {
		/*byte[] b = new byte[1];
		System.in.read(b);
		String cStr = new String(b);
		System.out.println(cStr);
		if(!Character.isDigit(cStr.charAt(0))) {
			System.out.println("请输入数字!");
			return ;
		}*/
		ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
		ClientBootstrap bootstrap = new ClientBootstrap(factory);
		int count = 6;//Integer.parseInt(cStr);
		bootstrap.setPipelineFactory(new FactorialClientPipelineFactory(count));
		ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 9999));
		//等待future.connect执行成功
        Channel channel = future.awaitUninterruptibly().getChannel();
        
        FactorialClientHandler handler = (FactorialClientHandler) channel.getPipeline().getLast();
        
        System.err.format("FactorialClient: Factorial of %d is: %d", count, handler.getFactorial());
        
//        bootstrap.releaseExternalResources();
	}
}

package org.q.netty.factorial;

import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.compression.ZlibDecoder;
import org.jboss.netty.handler.codec.compression.ZlibEncoder;
import org.jboss.netty.handler.codec.compression.ZlibWrapper;

public class FactorialClientPipelineFactory implements ChannelPipelineFactory {
	
	private static final Logger logger = Logger.getLogger(FactorialClientHandler.class.getName());
	
	private int count;

	public FactorialClientPipelineFactory() {
		
	}
	
	public FactorialClientPipelineFactory(int count) {
		this.count = count;
		logger.info(""+count);
	}

	public ChannelPipeline getPipeline() throws Exception {
		
		logger.info("getPipeline");
		
		ChannelPipeline pipeline = Channels.pipeline();
		
		//对需要传输的数据进行压缩, 非必要
        pipeline.addLast("deflater", new ZlibEncoder(ZlibWrapper.GZIP));
        pipeline.addLast("inflater", new ZlibDecoder(ZlibWrapper.GZIP));

        //添加逻辑组件之前先添加codes转码组件
        pipeline.addLast("decoder", new BigIntegerDecoder());
        pipeline.addLast("encoder", new NumberEncoder());

        //添加逻辑组件
        pipeline.addLast("handler", new FactorialClientHandler(count));

        return pipeline;
	}

}

package org.q.netty.factorial;

import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

public class FactorialClientHandler extends SimpleChannelHandler {
	
	private static final Logger logger = Logger.getLogger(FactorialClientHandler.class.getName());
	
	private int i = 1;
    private int receivedMessages = 0;
    private int count;
    final BlockingQueue<BigInteger> answer = new LinkedBlockingQueue<BigInteger>();

    public FactorialClientHandler() {}
    
    public FactorialClientHandler(int count) {
        this.count = count;
        logger.info(""+count);
    }

    public BigInteger getFactorial() {
    	logger.info("getFactorial");
        boolean interrupted = false;
        for (;;) {
        	logger.info("id:"+Thread.currentThread().getId()+",name:"+Thread.currentThread().getName()+",isAlive:"+Thread.currentThread().isAlive()+",size:"+answer.size());
            try {
                BigInteger factorial = answer.take();
                logger.info("after answer.take(),interrupted="+interrupted+", factorial="+factorial);
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
                return factorial;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
    }
    /**
     * 将下传的事件类型转换为具体的子类型事件, 并调用对应的处理方法
     */
    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
    	logger.info("handleUpstream."+e.getChannel().getId());
        if (e instanceof ChannelStateEvent) {
            e.toString();
        }
        super.handleUpstream(ctx, e);
    }
    
    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
    	logger.info("channelConnected."+e.getChannel().getId());
        sendNumbers(e);
    }

    @Override
    public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
    	logger.info("channelInterestChanged."+e.getChannel().getId());
        sendNumbers(e);
    }
    
    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
    	logger.info("messageReceived."+e.getChannel().getId()+",receivedMessages="+receivedMessages);
        receivedMessages ++;
        if (receivedMessages == count) {
            e.getChannel().close().addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture future) {
                	logger.info("messageReceived."+e.getMessage());
                    boolean offered = answer.offer((BigInteger) e.getMessage());
                    assert offered;
                }
            });
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        e.getCause().printStackTrace();
        e.getChannel().close();
    }

    private void sendNumbers(ChannelStateEvent e) {
        Channel channel = e.getChannel();
        while (channel.isWritable()) {
            if (i <= count) {
                channel.write(Integer.valueOf(i));
                i ++;
            } else {
                break;
            }
        }
    }
}

1. 服务端启动, 创建FactorialServerPipelineFactory对象
2. 客户端启动, 创建FactorialClientPipelineFactory对象。
3. 客户端获取pipeline管道对象, 初始化pipeline中的handler,并使用获取的pipeline对象创建Channel(pipeline.addLast中管道逻辑处理类FactorialClientHandler要放到最后)
4. 客户端调用handleUpstream方法,之后调用channelConnected方法向服务端写数据。
5. 服务端调用handleUpstream方法,之后调用channelConnected方法。
6. 服务端收到客户端的数据后调用messageReceived处理阶乘。
7. 客户端收到数据后判断数据收发是否完成,若完成则将数据写入LinkedBlockingQueue队列中。
8. 客户端在main方法中获取FactorialClientHandler对象, 之后调用getFactorial获取阶乘结果。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics