Answers
netty是一个nio的服务器和客户端的框架,它为我们快速创建高性能的网络应用提供了方便。
Netty对http支持的很好,自带的httpstaticfileserver的例子甚至可用来做来开发环境的静态服务器。 用它来实现comet只是稍加改造的事儿, 只需要把request和response保存起来,在需要发送数据到客户端时调用response。
comet不是一项专门的技术,更像是一个解决方案。说来也简单,服务端需要能把connection hold,浏览器也需要特殊的支持保持从服务端获取数据,幸好用xmlhttprequest,ajax的实现也是靠它。comet一般有两种方式long poll和streaming。长轮询是client发起请求以后,服务端发现没有数据要发给客户端就先hold起来,直到数据要发给client,请求本次请求。客户端需要不断的发起这样的请求。长轮询相比定时发起的短轮询显然能减少对服务器的请求。注意虚线部分有一个close的动作,然后再发起一次请求。
另外一种方式是streaming,从名字上就看出,他是在建立长连接以后源源不断地从服务端接收数据。而不需要关闭再重现建立。不过这种方式也有点限制Streaming方式需要responseText有内容或者内容不同于上次时触发readyStateReady,IE在内容改变时不会触发readyStateChange,这样就接收不到新的内容。
需要注意的地方:
- 想要源源不断向浏览器写数据,那不能用content-length这样的length header的编码方式,而得用chunked方式。期间深入了解了一下chunked编码的方式。以及在netty中如何使用chunk
- 接收信息采用一个长连接,而发送消息采用短链接。
- xmlhttprequest的readyState状态有5种, 分别是0-4, 其中,2是收到所有的header并解析;3是开始接收body中的内容。在Chrome下运行正常反而在ff下不能好好的运行,后来仔细查看一番firefox的xmlhttprequest的文档发现用错了一个参数multipart,这个参数貌似只在ff下有。
下面给一段实例代码:
package com.tianjiaguo.netty.comet;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;
import org.jboss.netty.util.CharsetUtil;
import java.io.File;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.lang.Thread.sleep;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static org.jboss.netty.buffer.ChannelBuffers.copiedBuffer;
import static org.jboss.netty.channel.Channels.pipeline;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
/**
* 12-6-26 下午4:07
*
* @author jiaguotian Copyright 2012 Sohu.com Inc. All Rights Reserved.
*/
public class NettyCometTest {
private static class Connection {
private final Executor executor;
private final HttpRequest httpRequest;
private final ChannelHandlerContext ctx;
private Connection(Executor executor, HttpRequest httpRequest, ChannelHandlerContext ctx) {
this.executor = executor;
this.httpRequest = httpRequest;
this.ctx = ctx;
}
public void send(String message) {
ctx.getChannel().write(copiedBuffer("data:" + message + "\n\n", CharsetUtil.UTF_8));
}
}
private static class ChannelHandler extends SimpleChannelUpstreamHandler {
private ExecutorService executor;
private Connection connection = null;
public ChannelHandler(ExecutorService executor) {
this.executor = executor;
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
executor.execute(new Runnable() {
@Override
public void run() {
try {
System.out.println("close");
if (connection != null) {
pusher.removeConnection(connection);
}
} catch (Exception e1) {
e1.printStackTrace();
}
}
});
}
@Override
public void messageReceived(final ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception {
final HttpRequest httpRequest = (HttpRequest) messageEvent.getMessage();
final HttpResponse httpResponse = new DefaultHttpResponse(HTTP_1_1, OK);
final org.jboss.netty.channel.ChannelHandler channelHandler = this;
executor.execute(new Runnable() {
@Override
public void run() {
try {
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(httpRequest.getUri());
String path = queryStringDecoder.getPath();
if ("/index.html".equals(path)) {
File file = new File("com/tianjiaguo/netty/comet");
URL resourceURL = getClass().getClassLoader().getResource(new File(file, "index.html").getPath());
InputStream stream = resourceURL.openStream();
byte[] bytes = new byte[stream.available()];
try {
stream.read(bytes);
} finally {
if (stream != null) {
stream.close();
}
}
httpResponse.addHeader("Content-Type", "text/html");
httpResponse.addHeader("Content-Length", bytes.length);
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer();
channelBuffer.writeBytes(copiedBuffer(bytes));
if (httpResponse.getStatus().getCode() != 200) {
httpResponse.setContent(copiedBuffer(httpResponse.getStatus().toString(), CharsetUtil.UTF_8));
httpResponse.addHeader("Content-Length", httpResponse.getContent().readableBytes());
}
try {
httpResponse.setContent(channelBuffer);
ChannelFuture future = ctx.getChannel().write(httpResponse);
future.addListener(ChannelFutureListener.CLOSE);
} catch (Exception e) {
e.printStackTrace();
}
} else {
httpResponse.setStatus(HttpResponseStatus.OK);
httpResponse.addHeader("Content-Type", "text/event-stream");
httpResponse.addHeader("Cache-Control", "no-cache");
ctx.getChannel().write(httpResponse);
ChannelPipeline p = ctx.getChannel().getPipeline();
p.remove("aggregator");
p.replace("handler", "sse_handler", channelHandler);
connection = new Connection(executor, httpRequest, ctx);
pusher.addConnection(connection);
}
} catch (Exception exception) {
exception.printStackTrace();
}
}
});
}
}
public static class Pusher implements Runnable {
private List<connection> connections = new ArrayList<connection>();
public Pusher(final Executor webThread) {
Executor pusherThread = Executors.newCachedThreadPool();
pusherThread.execute(new Runnable() {
@Override
public void run() {
while (true) {
webThread.execute(Pusher.this);
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
@Override
public void run() {
broadcast(String.format("%s %s", connections.size(), new Date().toString()));
}
private void broadcast(String message) {
for (Connection connection : connections) {
connection.send(message);
}
}
public void addConnection(Connection connection) {
connections.add(connection);
}
public void removeConnection(Connection connection) {
connections.remove(connection);
}
}
private static final Pusher pusher = new Pusher(newCachedThreadPool());
public static void main(String[] argv) {
final ExecutorService executor = newCachedThreadPool();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("aggregator", new HttpChunkAggregator(65536));
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new ChannelHandler(executor));
return pipeline;
}
});
bootstrap.setFactory(new NioServerSocketChannelFactory(
newCachedThreadPool(),
newCachedThreadPool()));
Channel channel = bootstrap.bind(new InetSocketAddress(8888));
if (channel.isConnected()) {
System.out.println("connected");
}
}
}
这里只是一个例子,例子虽简单,却提供了一个思路。
Saerdna
answered 10 years, 4 months ago