web消息系统设计-持久化
大家好,
第一次发帖,不要拍砖啊,哈哈。
目前正在设计消息系统这块,已经有了2种方案,都是比较不错的,但是现在纠结在消息持久化这块。
第一种比较传统:收到消息后直接存入数据库,标记未读,当用户刷新页面,或者跳转的时候,会拉取下未读消息,并显示在导航栏那里。
第二种比较前卫:用nodejs+socket.io+redis(sub/pub)做的,优点是不用刷新页面用户就可以直接收到提醒,而且引入scoket.io以后还能做很多事情,个人打算采用第二种。
但是现在纠结在消息的持久化方面,如果在web层做消息入库,那么我发现socket.io除了在用户不刷新页面的时候“ 弹一下 ”,真没什么用了。反正消息已经存进去了。
要么就是web只管往redis里面publish一个消息,由node这里统一入库,并通知。我第一次用nodejs,对性能没什么信心,主要是nodejs对mysql的操作方面。
想听听大家的意见和看法, 谢谢!!
补充: 刚才被一位qq好友一顿批评:
这你还用问吗?如果你采用第二种方案,当然是在nodejs做持久化了,你引入redis的pub/sub干什么用的啊!?不就是为了分布式吗?能从不同的地方发消息吗?难道我发一条消息还要先在你的数据库保存一下,再做redis的publish吗?如果是那样,你的nodejs可以扔了。。。。
还是想听听大家的意见。。。。。谢谢。。。。。
消息传递 websocket node.js socket.io
Answers
对于第二种方案经过了2天的实践,最终放弃了,还是选择了第一种简单的。 首先说明:第二种方案是可行的
开始分享代码吧
我的系统对nodejs定位只是用到websocket这块,不提供http服务(ngxin提供),所以没用到任何框架express,sails什么的,再说我喜欢从最基本的代码来了解一个新东西,好了,先看下package.json:
"dependencies": {
"log4js": "0.6.21",
"cookie": "0.1.2",
"redis": "0.12.1",
"mysql": "2.5.3",
"generic-pool": "2.1.1",
"async": "0.9.0",
"string-template": "0.2.0",
"socket.io": "0.9.17"
}
string-template 是用来输出消息的,别的大家应该很熟悉了。先看下 app.js
global.CHANNEL = 'test.socket' //redis的订阅频道
var Test = require('./test'); //自定义的一个module
var test = new Test();
var sub = redis.createClient(6379, '127.0.0.1', {}); //负责订阅的一个redis链接
sub.subscribe(CHANNEL) //订阅频道
sub.on('message', function(channel, message) { //收到频道消息的事件通知,消息格式json
var json = null
try {
json = JSON.parse(message)
test.emit(io, json) //处理消息
} catch (e) {
LOG.error('exception in emit message: ' + e)
}
})
//mysql连接池
global.MYSQL_POOL = poolModule.Pool({
name: 'mysql',
create: function(callback) {
var connection = mysql.createConnection({
host: 'localhost',
user: 'root',
password: '',
charset: 'utf8_general_ci',
database: 'test'
});
callback(null, connection);
},
destroy: function(connection) {
connection.end();
},
max: 5,
min: 2,
idleTimeoutMillis: 30000,
log: false
});
var io = require('socket.io').listen(3000, {
'log level': 1
});
//设置socket验证机制
io.configure(function() {
io.set('authorization', function(handshake, accept) {
test.authorize(handshake, accept)
});
});
//链接进来的事件
io.sockets.on('connection', function(socket) {
test.connect(socket)
})
/* shutdown event for pm2 */
process.on('SIGTERM', function() {
sub.quit();
LOG.info('destroied subscribe redis')
MYSQL_POOL.destroyAllNow();
LOG.info('destroied mysql pool')
process.exit(0)
});
/* shutdown event for Ctrl+C */
process.on('SIGINT', function() {
sub.quit();
LOG.info('destroied subscribe redis')
MYSQL_POOL.destroyAllNow();
LOG.info('destroied mysql pool')
process.exit(0)
});
test.js 的authorize方法,判断链接是否可信,我用的cookie做的验证
this.authorize = function(handshake, accept) {
var cookies = cookie.parse(handshake.headers.cookie)
//检查cookie
if(ok) {
handshake.userId = userId //把userId放到握手信息里面,后面的connect才可以拿到
}
....
//允许accept(null, true)-200,拒绝accept(null, false)-403,服务端错误accept(err, false)--500
};
test.js 的connect方法,用户通过验证后的链接逻辑:
this.connect = function(socket) { //用户通过验证后链接进来的事件
socket.join('user'); //默认加入所有用户频道
socket.join('user:' + socket.handshake.userId); //默认加入该用户的专用频道
async.waterfall([ //用到了async库,配合连接池,避免callback地狱
function(callback) {
MYSQL_POOL.acquire(function(err, client) {
callback(err, client)
});
},
function(client, callback) {
//取出该用户数据库中所有的订阅频道
client.query('select room from room where userid = ?', [socket.handshake.userId], function(err, rooms) {
MYSQL_POOL.release(client)//回收链接
callback(err, rooms)
});
},
function(rooms, callback) {
rooms.forEach(function(room) {
//join用户频道
LOG.debug('user ' + socket.handshake.userId + ' joined room ' + room.room)
socket.join(room.room)
});
callback(null, null)
}
], function(err, result) {
if (err) {
LOG.error(err)
}
})
};
到这里用户可以正常拦截,链接,并join自己订阅的room了。下面就是怎么从redis的channel里面取到消息通知客户端了,我这里简答的做了一个新用户注册后的消息通知:
this.emit = function(io, json) {
LOG.debug(json)
var event = json.event
if (event == 'user.register') {//用户注册消息
sayWelcome(io, json)
} else {
throw new Error('undifined event: ' + event)
}
};
sayWelcome私有方法:
var WELCOME = '欢迎你的加入 SF 社区!<a href="#">{0}</a>'
//一个新用户在http服务端注册成功后,直接往redis的test.socket频道里面publish一个
//{"userId":"4ab1h89hkk58", "nickname": "土豆2015", "event": "user.register"}
//userId是在http端生成的
sayWelcome = function(io, json) {
var userId = json.userId
if (userId == null) {
throw new Error('user id is null')
}
var nickname = json.nickname
if (nickname == null) {
throw new Error('user nickname is null')
}
async.waterfall([
function(callback) {
MYSQL_POOL.acquire(function(err, client) {
callback(err, client)
});
},
function(client, callback) {
var msg = format(WELCOME, [nickname]) //格式化欢迎消息
//往消息库里面插入一个用户消息
client.query('insert into message(userid, msg) values(?, ?)', [userId, msg], function(err, result) {
MYSQL_POOL.release(client)
callback(err)
});
},
function(callback) {
//向该用户room发送一个消息
io.sockets.in('user:' + userId).emit('message', '');
callback(null, null)
}
], function(err, result) {
if (err) {
LOG.error(err)
}
})
}
客户端,实时通知:
socket.on('message', function (msg) {
var count = parseInt($('#message').text());
count = count + 1//在原有未读消息数目上+1
$('#message').text(count)
});
用户在导航栏点消息的时候,进入消息页面。
还有好多用法,向所有用户发实时一条消息:
伪代码:
[REDIS] publish test.channel {"event":"ALL", "msg":"全体通知"}
sayToAll : function(io, json) {
//消息入库json.msg
io.sockets.in('user').emit('message', '');
}
当帖子有了新回复时,向关注帖子的用户发一条消息:
[REDIS] publish test.channel {"event":"topic.notification", "topicId":"hy6123kd"}
var TOPIC_REPLY = '您关注的帖子<a href={0}>{1}</a>有了新回复'
sayToTopic : function(io, json) {
var topicId = json.topicId
var topicTitle = //根据id取到title
var msg = format(TOPIC_REPLY,['topic/'+ topicId, 'topicTitle'])
//消息入库msg
io.sockets.in('topic:' + topicId).emit('message', '');
}
如果不打算做nodejs分布式的可以不用看下面,上面已经可行了。
好了,到了这里,好像都很ok,消息系统从原来的http移植到了nodejs,但是我发现了一个很严重的问题:
引入redis的pub/sub就是为了分布式设计的,现在我在另一个端口启动一个同样的nodejs,2个nodejs同时监听 test.socket频道。那么,同样的一个消息会被写入数据库2次,N个nodejs,那么同样的消息会被写入N次。因为大家都在监听同样的频道。
这个问题也能解决,在redis端先生成一个新消息的id,然后用redis的setnx命令去锁这个id,只可能被一个nodejs线程锁到,抢到的进行消息入库,没锁到的只负责通知。
但是这么一来,如此简单的一个消息系统被设计的这么复杂。。。。。。。。
我个人对设计一个理解:
需求本身有一定复杂程度,为了实现这个需求我们要引入另外一些复杂度,当被引入的复杂程度大于需求本身的复杂程度时,那么这个设计是失败的。
于是打算回归第一种简单的设计,也许我对nodejs的这个使用场景根本就是错误的,欢迎大家一起聊聊。