twisted中的reactor.callLater可能出现任务不执行的情况
自己在测试利用twisted来做mysql的多个查询任务的并发执行,然后在下面的测试脚本中将三个查询任务的reactor.callLater的delay参数同时置为0时可能出现只执行其中两个任务的情况,也会出现三个任务都执行的情况。报错如下:
Unhandled error in Deferred:
Traceback (most recent call last):
File "/usr/lib64/python2.6/threading.py", line 484, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.6/site-packages/twisted/python/threadpool.py", line 157, in _worker
context.call(ctx, function, *args, **kwargs)
File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 64, in callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "/usr/lib64/python2.6/site-packages/twisted/python/context.py", line 43, in callWithContext
return func(*args,**kw)
--- <exception caught here> ---
File "/usr/lib64/python2.6/site-packages/twisted/internet/threads.py", line 37, in _putResultInDeferred
result = f(*args, **kwargs)
File "AsyncDBClient.py", line 29, in execute_sql
self.cursor.execute(query)
File "/usr/lib64/python2.6/site-packages/MySQLdb/cursors.py", line 174, in execute
self.errorhandler(self, exc, value)
File "/usr/lib64/python2.6/site-packages/MySQLdb/connections.py", line 36, in defaulterrorhandler
raise errorclass, errorvalue
<class '_mysql_exceptions.ProgrammingError'>: (2014, "Commands out of sync; you can't run this command now")
因而这里有三个问题:mysqldb中对于同一个连接的多个查询是不是阻塞的?twsited中任务调度为什么出现有些任务不执行的情况?twisted中任务调度的延时参数怎么设置?
下面是自己的测试脚本。
import sys
import MySQLdb
from twisted.internet import reactor, threads
import time
class AsyncDBClient:
def __init__(self, host, port, user, pwd, db):
self.host = host
self.port = int(port)
self.user = user
self.passwd = pwd
self.db = db
self.conn = None
try:
self.conn = MySQLdb.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db,charset='gbk')
self.conn.autocommit(1)
self.cursor = self.conn.cursor()
except MySQLdb.Error,e:
sys.exit(1)
def gend(self, sql, args, callback=None):
d = threads.deferToThread(self.execute_sql, sql)
if callback:
d.addCallback(callback, args)
#d.addErrCallback()
def execute_sql(self, query):
self.cursor.execute(query)
results = self.cursor.fetchall()
return results
def close(self):
print "[+] reactor stop"
self.cursor.close()
self.conn.close()
reactor.stop()
def fetch(self, delay, sql, callback=None, args={}):
reactor.callLater(delay, self.gend, sql, args, callback=handle_res)
def run(self):
#reactor.callLater(10, self.close)
reactor.run()
def test():
async_db = AsyncDBClient('XXXX', XXXX, 'XXXX', 'XXXX', 'XXXX')
async_db.fetch(0, 'select ip from agent_hosts', handle_res, {'db':async_db, 'is_close': True})
async_db.fetch(0, 'select task_id from threads', handle_res)
async_db.fetch(0, 'select task_id from tasks', handle_res)
async_db.run()
def handle_res(results, args):
print results
#if args.has_key("is_close") and args["is_close"] is True:
#args['db'].close()
if __name__ == "__main__":
test()
konasu
10 years, 5 months ago