Netty如何实现comet长连接服务?


Netty如何实现comet长连接服务?有谁在这方面有经验的?

java netty comet

windry 11 years, 10 months ago

netty是一个nio的服务器和客户端的框架,它为我们快速创建高性能的网络应用提供了方便。

Netty对http支持的很好,自带的httpstaticfileserver的例子甚至可用来做来开发环境的静态服务器。 用它来实现comet只是稍加改造的事儿, 只需要把request和response保存起来,在需要发送数据到客户端时调用response。

comet不是一项专门的技术,更像是一个解决方案。说来也简单,服务端需要能把connection hold,浏览器也需要特殊的支持保持从服务端获取数据,幸好用xmlhttprequest,ajax的实现也是靠它。comet一般有两种方式long poll和streaming。长轮询是client发起请求以后,服务端发现没有数据要发给客户端就先hold起来,直到数据要发给client,请求本次请求。客户端需要不断的发起这样的请求。长轮询相比定时发起的短轮询显然能减少对服务器的请求。注意虚线部分有一个close的动作,然后再发起一次请求。

long poll

另外一种方式是streaming,从名字上就看出,他是在建立长连接以后源源不断地从服务端接收数据。而不需要关闭再重现建立。不过这种方式也有点限制Streaming方式需要responseText有内容或者内容不同于上次时触发readyStateReady,IE在内容改变时不会触发readyStateChange,这样就接收不到新的内容。

streaming

需要注意的地方:

  1. 想要源源不断向浏览器写数据,那不能用content-length这样的length header的编码方式,而得用chunked方式。期间深入了解了一下chunked编码的方式。以及在netty中如何使用chunk
  2. 接收信息采用一个长连接,而发送消息采用短链接。
  3. xmlhttprequest的readyState状态有5种, 分别是0-4, 其中,2是收到所有的header并解析;3是开始接收body中的内容。在Chrome下运行正常反而在ff下不能好好的运行,后来仔细查看一番firefox的xmlhttprequest的文档发现用错了一个参数multipart,这个参数貌似只在ff下有。

下面给一段实例代码:

package com.tianjiaguo.netty.comet;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.util.CharsetUtil;

import java.io.File;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer;
import static org.jboss.netty.channel.Channels.pipeline;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;

/**
* 12-6-26 下午4:07
*
* @author jiaguotian Copyright 2012 Sohu.com Inc. All Rights Reserved.
*/
public class NettyCometTest {
    private static class Connection {
        private final Executor executor;
        private final HttpRequest httpRequest;
        private final ChannelHandlerContext ctx;

        private Connection(Executor executor, HttpRequest httpRequest, ChannelHandlerContext ctx) {
            this.executor = executor;
            this.httpRequest = httpRequest;
            this.ctx = ctx;
        }

        public void send(String message) {
            ctx.getChannel().write(copiedBuffer("data:" + message + "\n\n", CharsetUtil.UTF_8));
        }
    }

    private static class ChannelHandler extends SimpleChannelUpstreamHandler {
        private ExecutorService executor;
        private Connection connection = null;

        public ChannelHandler(ExecutorService executor) {
            this.executor = executor;
        }

        @Override
        public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("close");
                        if (connection != null) {
                            pusher.removeConnection(connection);
                        }
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                }
            });
        }

        @Override
        public void messageReceived(final ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
            final HttpRequest httpRequest = (HttpRequest) messageEvent.getMessage();
            final HttpResponse httpResponse = new DefaultHttpResponse(HTTP_1_1, OK);
            final org.jboss.netty.channel.ChannelHandler channelHandler = this;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(httpRequest.getUri());
                        String path = queryStringDecoder.getPath();
                        if ("/index.html".equals(path)) {
                            File file = new File("com/tianjiaguo/netty/comet");
                            URL resourceURL = getClass().getClassLoader().getResource(new File(file, "index.html").getPath());
                            InputStream stream = resourceURL.openStream();
                            byte[] bytes = new byte[stream.available()];
                            try {
                                stream.read(bytes);
                            } finally {
                                if (stream != null) {
                                    stream.close();
                                }
                            }
                            httpResponse.addHeader("Content-Type", "text/html");
                            httpResponse.addHeader("Content-Length", bytes.length);
                            ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
                            channelBuffer.writeBytes(copiedBuffer(bytes));
                            if (httpResponse.getStatus().getCode() != 200) {
                                httpResponse.setContent(copiedBuffer(httpResponse.getStatus().toString(), CharsetUtil.UTF_8));
                                httpResponse.addHeader("Content-Length", httpResponse.getContent().readableBytes());
                            }
                            try {
                                httpResponse.setContent(channelBuffer);
                                ChannelFuture future = ctx.getChannel().write(httpResponse);
                                future.addListener(ChannelFutureListener.CLOSE);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        } else {
                            httpResponse.setStatus(HttpResponseStatus.OK);
                            httpResponse.addHeader("Content-Type", "text/event-stream");
                            httpResponse.addHeader("Cache-Control", "no-cache");
                            ctx.getChannel().write(httpResponse);
                            ChannelPipeline p = ctx.getChannel().getPipeline();
                            p.remove("aggregator");
                            p.replace("handler", "sse_handler", channelHandler);

                            connection = new Connection(executor, httpRequest, ctx);
                            pusher.addConnection(connection);
                        }
                    } catch (Exception exception) {
                        exception.printStackTrace();
                    }
                }
            });
        }
    }

    public static class Pusher implements Runnable {
        private List<connection> connections = new ArrayList<connection>();

        public Pusher(final Executor webThread) {
            Executor pusherThread = Executors.newCachedThreadPool();
            pusherThread.execute(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        webThread.execute(Pusher.this);
                        try {
                            sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }

        @Override
        public void run() {
            broadcast(String.format("%s %s", connections.size(), new Date().toString()));
        }

        private void broadcast(String message) {
            for (Connection connection : connections) {
                connection.send(message);
            }
        }

        public void addConnection(Connection connection) {
            connections.add(connection);
        }

        public void removeConnection(Connection connection) {
            connections.remove(connection);
        }
    }

    private static final Pusher pusher = new Pusher(newCachedThreadPool());

    public static void main(String[] argv) {
        final ExecutorService executor = newCachedThreadPool();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = pipeline();
                pipeline.addLast("decoder", new HttpRequestDecoder());
                pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
                pipeline.addLast("encoder", new HttpResponseEncoder());
                pipeline.addLast("handler", new ChannelHandler(executor));
                return pipeline;
            }
        });

        bootstrap.setFactory(new NioServerSocketChannelFactory(
                newCachedThreadPool(),
                newCachedThreadPool()));
        Channel channel = bootstrap.bind(new InetSocketAddress(8888));
        if (channel.isConnected()) {
            System.out.println("connected");
        }
    }
}

这里只是一个例子,例子虽简单,却提供了一个思路。

Saerdna answered 10 years, 3 months ago

Your Answer