twisted中的reactor.callLater可能出现任务不执行的情况


自己在测试利用twisted来做mysql的多个查询任务的并发执行,然后在下面的测试脚本中将三个查询任务的reactor.callLater的delay参数同时置为0时可能出现只执行其中两个任务的情况,也会出现三个任务都执行的情况。报错如下:


 Unhandled error in Deferred:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.6/site-packages/twisted/python/threadpool.py", line 157, in _worker
    context.call(ctx, function, *args, **kwargs)
  File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 64, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 43, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "/usr/lib64/python2.6/site-packages/twisted/internet/threads.py", line 37, in _putResultInDeferred
    result = f(*args, **kwargs)
  File "AsyncDBClient.py", line 29, in execute_sql
    self.cursor.execute(query)
  File "/usr/lib64/python2.6/site-packages/MySQLdb/cursors.py", line 174, in execute
    self.errorhandler(self, exc, value)
  File "/usr/lib64/python2.6/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
    raise errorclass, errorvalue
<class '_mysql_exceptions.ProgrammingError'>: (2014, "Commands out of sync; you can't run this command now")

因而这里有三个问题:mysqldb中对于同一个连接的多个查询是不是阻塞的?twsited中任务调度为什么出现有些任务不执行的情况?twisted中任务调度的延时参数怎么设置?
下面是自己的测试脚本。


 import sys                                                                                                                                       
import MySQLdb
from twisted.internet import reactor, threads
import time

class AsyncDBClient:
    def __init__(self, host, port, user, pwd, db):
        self.host   = host
        self.port   = int(port)
        self.user   = user
        self.passwd = pwd
        self.db     = db
        self.conn   = None

        try:
            self.conn   = MySQLdb.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset='gbk')
            self.conn.autocommit(1)
            self.cursor = self.conn.cursor()
        except MySQLdb.Error,e:
            sys.exit(1)

    def gend(self, sql, args, callback=None):
        d = threads.deferToThread(self.execute_sql, sql)
        if callback:
            d.addCallback(callback, args)
            #d.addErrCallback()

    def execute_sql(self, query):
        self.cursor.execute(query)
        results = self.cursor.fetchall()                     
        return results

    def close(self):
        print "[+] reactor stop"
        self.cursor.close()
        self.conn.close()
        reactor.stop()

    def fetch(self, delay, sql, callback=None, args={}):
        reactor.callLater(delay, self.gend, sql, args, callback=handle_res)

    def run(self):
        #reactor.callLater(10, self.close)
        reactor.run()


def test():
    async_db = AsyncDBClient('XXXX', XXXX, 'XXXX', 'XXXX', 'XXXX')
    async_db.fetch(0, 'select ip from agent_hosts', handle_res, {'db':async_db, 'is_close': True})
    async_db.fetch(0, 'select task_id from threads', handle_res)
    async_db.fetch(0, 'select task_id from tasks', handle_res)
    async_db.run()

def handle_res(results, args):
    print results
    #if args.has_key("is_close") and args["is_close"] is True:
        #args['db'].close()

if __name__ == "__main__":
    test()

twisted

konasu 10 years, 5 months ago

Your Answer