web消息系统设计-持久化


大家好,

第一次发帖,不要拍砖啊,哈哈。

目前正在设计消息系统这块,已经有了2种方案,都是比较不错的,但是现在纠结在消息持久化这块。

第一种比较传统:收到消息后直接存入数据库,标记未读,当用户刷新页面,或者跳转的时候,会拉取下未读消息,并显示在导航栏那里。

第二种比较前卫:用nodejs+socket.io+redis(sub/pub)做的,优点是不用刷新页面用户就可以直接收到提醒,而且引入scoket.io以后还能做很多事情,个人打算采用第二种。

但是现在纠结在消息的持久化方面,如果在web层做消息入库,那么我发现socket.io除了在用户不刷新页面的时候“ 弹一下 ”,真没什么用了。反正消息已经存进去了。

要么就是web只管往redis里面publish一个消息,由node这里统一入库,并通知。我第一次用nodejs,对性能没什么信心,主要是nodejs对mysql的操作方面。

想听听大家的意见和看法, 谢谢!!


补充: 刚才被一位qq好友一顿批评:

这你还用问吗?如果你采用第二种方案,当然是在nodejs做持久化了,你引入redis的pub/sub干什么用的啊!?不就是为了分布式吗?能从不同的地方发消息吗?难道我发一条消息还要先在你的数据库保存一下,再做redis的publish吗?如果是那样,你的nodejs可以扔了。。。。

还是想听听大家的意见。。。。。谢谢。。。。。

消息传递 websocket node.js socket.io

kukuli 12 years ago

socket.io 其实就是每秒访问一次服务器,看有没有消息,效率不是很高

狂拽√龙少 answered 12 years ago

comet + redis
redis的数据落地

性能高,速度快

宁静的哈哈哈 answered 12 years ago

第一种明显更具有挑战性 高性能 高可用等特点了 何不尝试?

NUKED answered 12 years ago

对于第二种方案经过了2天的实践,最终放弃了,还是选择了第一种简单的。 首先说明:第二种方案是可行的


开始分享代码吧

我的系统对nodejs定位只是用到websocket这块,不提供http服务(ngxin提供),所以没用到任何框架express,sails什么的,再说我喜欢从最基本的代码来了解一个新东西,好了,先看下package.json:


 "dependencies": {
        "log4js": "0.6.21",
        "cookie": "0.1.2",
        "redis": "0.12.1",
        "mysql": "2.5.3",
        "generic-pool": "2.1.1",
        "async": "0.9.0",
        "string-template": "0.2.0",
        "socket.io": "0.9.17"
    }

string-template 是用来输出消息的,别的大家应该很熟悉了。先看下 app.js


 global.CHANNEL = 'test.socket' //redis的订阅频道
var Test = require('./test'); //自定义的一个module
var test = new Test();
var sub = redis.createClient(6379, '127.0.0.1', {}); //负责订阅的一个redis链接
sub.subscribe(CHANNEL) //订阅频道
sub.on('message', function(channel, message) { //收到频道消息的事件通知,消息格式json
        var json = null
        try {
            json = JSON.parse(message)
            test.emit(io, json) //处理消息
        } catch (e) {
            LOG.error('exception in emit message: ' + e)
        }
    })
    //mysql连接池
global.MYSQL_POOL = poolModule.Pool({
    name: 'mysql',
    create: function(callback) {
        var connection = mysql.createConnection({
            host: 'localhost',
            user: 'root',
            password: '',
            charset: 'utf8_general_ci',
            database: 'test'
        });
        callback(null, connection);
    },
    destroy: function(connection) {
        connection.end();
    },
    max: 5,
    min: 2,
    idleTimeoutMillis: 30000,
    log: false
});
var io = require('socket.io').listen(3000, {
    'log level': 1
});
//设置socket验证机制
io.configure(function() {
    io.set('authorization', function(handshake, accept) {
        test.authorize(handshake, accept)
    });
});
//链接进来的事件
io.sockets.on('connection', function(socket) {
    test.connect(socket)
})

/* shutdown event for pm2 */
process.on('SIGTERM', function() {
    sub.quit();
    LOG.info('destroied subscribe redis')
    MYSQL_POOL.destroyAllNow();
    LOG.info('destroied mysql pool')
    process.exit(0)
});

/* shutdown event for Ctrl+C */
process.on('SIGINT', function() {
    sub.quit();
    LOG.info('destroied subscribe redis')
    MYSQL_POOL.destroyAllNow();
    LOG.info('destroied mysql pool')
    process.exit(0)
});

test.js 的authorize方法,判断链接是否可信,我用的cookie做的验证


 this.authorize = function(handshake, accept) {
    var cookies = cookie.parse(handshake.headers.cookie)
    //检查cookie
    if(ok) {
        handshake.userId = userId //把userId放到握手信息里面,后面的connect才可以拿到
    }
    ....
    //允许accept(null, true)-200,拒绝accept(null, false)-403,服务端错误accept(err, false)--500
};

test.js 的connect方法,用户通过验证后的链接逻辑:


 this.connect = function(socket) { //用户通过验证后链接进来的事件
    socket.join('user'); //默认加入所有用户频道
    socket.join('user:' + socket.handshake.userId); //默认加入该用户的专用频道
    async.waterfall([ //用到了async库,配合连接池,避免callback地狱
        function(callback) {
            MYSQL_POOL.acquire(function(err, client) {
                callback(err, client)
            });
        },
        function(client, callback) {
            //取出该用户数据库中所有的订阅频道
            client.query('select room from room where userid = ?', [socket.handshake.userId], function(err, rooms) {
                MYSQL_POOL.release(client)//回收链接
                callback(err, rooms)
            });
        },
        function(rooms, callback) {
            rooms.forEach(function(room) {
                //join用户频道
                LOG.debug('user ' + socket.handshake.userId + ' joined room ' + room.room)
                socket.join(room.room)
            });
            callback(null, null)
        }
    ], function(err, result) {
        if (err) {
            LOG.error(err)
        }
    })
};

到这里用户可以正常拦截,链接,并join自己订阅的room了。下面就是怎么从redis的channel里面取到消息通知客户端了,我这里简答的做了一个新用户注册后的消息通知:


 this.emit = function(io, json) {
    LOG.debug(json)
    var event = json.event
    if (event == 'user.register') {//用户注册消息
        sayWelcome(io, json)
    } else {
        throw new Error('undifined event: ' + event)
    }
};

sayWelcome私有方法:


 var WELCOME = '欢迎你的加入 SF 社区!<a href="#">{0}</a>'

//一个新用户在http服务端注册成功后,直接往redis的test.socket频道里面publish一个
//{"userId":"4ab1h89hkk58", "nickname": "土豆2015", "event": "user.register"}
//userId是在http端生成的
sayWelcome = function(io, json) {
    var userId = json.userId
    if (userId == null) {
        throw new Error('user id is null')
    }
    var nickname = json.nickname
    if (nickname == null) {
        throw new Error('user nickname is null')
    }
    async.waterfall([
        function(callback) {
            MYSQL_POOL.acquire(function(err, client) {
                callback(err, client)
            });
        },
        function(client, callback) {
            var msg = format(WELCOME, [nickname]) //格式化欢迎消息
                //往消息库里面插入一个用户消息
            client.query('insert into message(userid, msg) values(?, ?)', [userId, msg], function(err, result) {
                MYSQL_POOL.release(client)
                callback(err)
            });
        },
        function(callback) {
        //向该用户room发送一个消息
            io.sockets.in('user:' + userId).emit('message', '');
            callback(null, null)
        }
    ], function(err, result) {
        if (err) {
            LOG.error(err)
        }
    })
}

客户端,实时通知:


 socket.on('message', function (msg) {
    var count = parseInt($('#message').text());
    count = count + 1//在原有未读消息数目上+1
    $('#message').text(count)
});

用户在导航栏点消息的时候,进入消息页面。


还有好多用法,向所有用户发实时一条消息:
伪代码:


 [REDIS] publish test.channel {"event":"ALL", "msg":"全体通知"}

sayToAll : function(io, json) {
    //消息入库json.msg
    io.sockets.in('user').emit('message', '');
}

当帖子有了新回复时,向关注帖子的用户发一条消息:


 [REDIS] publish test.channel {"event":"topic.notification", "topicId":"hy6123kd"}
var TOPIC_REPLY = '您关注的帖子<a href={0}>{1}</a>有了新回复'
sayToTopic : function(io, json) {
    var topicId = json.topicId
    var topicTitle = //根据id取到title
    var msg = format(TOPIC_REPLY,['topic/'+ topicId, 'topicTitle'])
    //消息入库msg
    io.sockets.in('topic:' + topicId).emit('message', '');
}


如果不打算做nodejs分布式的可以不用看下面,上面已经可行了。

好了,到了这里,好像都很ok,消息系统从原来的http移植到了nodejs,但是我发现了一个很严重的问题:

引入redis的pub/sub就是为了分布式设计的,现在我在另一个端口启动一个同样的nodejs,2个nodejs同时监听 test.socket频道。那么,同样的一个消息会被写入数据库2次,N个nodejs,那么同样的消息会被写入N次。因为大家都在监听同样的频道。

这个问题也能解决,在redis端先生成一个新消息的id,然后用redis的setnx命令去锁这个id,只可能被一个nodejs线程锁到,抢到的进行消息入库,没锁到的只负责通知。

但是这么一来,如此简单的一个消息系统被设计的这么复杂。。。。。。。。
我个人对设计一个理解:

需求本身有一定复杂程度,为了实现这个需求我们要引入另外一些复杂度,当被引入的复杂程度大于需求本身的复杂程度时,那么这个设计是失败的。

于是打算回归第一种简单的设计,也许我对nodejs的这个使用场景根本就是错误的,欢迎大家一起聊聊。

yintama answered 12 years ago

Your Answer