add test for scheduler

binux 2014-03-07 01:44:58 +08:00
parent 0f25a36b2b
commit f5b23b3e72
10 changed files with 354 additions and 86 deletions

data/.gitignore (new file)

@@ -0,0 +1 @@
*.db


@@ -53,5 +53,5 @@ class ProjectDB(BaseProjectDB, BaseDB):
def check_update(self, timestamp, fields=None):
what = ','.join(('`%s`' % x for x in fields)) if fields else '*'
where = "updatetime >= %d" % timestamp
where = "updatetime >= %f" % timestamp
return self._select2dic(self.__tablename__, what=what, where=where)
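Note on this hunk: time.time() returns a float and "%d" floors it, so the WHERE clause compared against a threshold up to a second earlier than intended and kept matching projects that had not actually changed since the given timestamp; "%f" keeps the sub-second precision of the threshold. Standalone sketch of the difference (illustrative, not code from this repo):

    import time

    now = time.time()           # e.g. 1394128000.73
    "updatetime >= %d" % now    # 'updatetime >= 1394128000'  (threshold floored, matches ~1s too much)
    "updatetime >= %f" % now    # 'updatetime >= 1394128000.730000'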


@@ -45,10 +45,11 @@ class TaskDB(BaseTaskDB, BaseDB):
def _parse(self, data):
for each in ('schedule', 'fetch', 'process', 'track'):
if each in data and data[each]:
data[each] = json.loads(data[each])
else:
data[each] = {}
if each in data:
if data[each]:
data[each] = json.loads(data[each])
else:
data[each] = {}
return data
def _stringify(self, data):
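Note on the _parse() rewrite: the nested condition only touches keys that are actually present, so a row fetched with a restricted field list (e.g. fields=['taskid']) presumably no longer gets empty 'schedule'/'fetch'/'process'/'track' dicts injected into it, while present-but-empty columns still decode to {}. A minimal sketch of the new behaviour, assuming a row from a limited SELECT:

    import json

    row = {'taskid': 'x', 'track': ''}   # 'schedule', 'fetch', 'process' not selected
    for each in ('schedule', 'fetch', 'process', 'track'):
        if each in row:
            row[each] = json.loads(row[each]) if row[each] else {}
    # row == {'taskid': 'x', 'track': {}}  -- absent keys stay absent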
@@ -58,7 +59,7 @@ class TaskDB(BaseTaskDB, BaseDB):
return data
def load_tasks(self, status, project=None, fields=None):
if project not in self.projects:
if project and project not in self.projects:
raise StopIteration
what = ','.join(fields) if fields else '*'
where = "status = %d" % status
@@ -73,6 +74,8 @@ class TaskDB(BaseTaskDB, BaseDB):
yield self._parse(each)
def get_task(self, project, taskid, fields=None):
if project not in self.projects:
self._list_project()
if project not in self.projects:
return None
what = ','.join(fields) if fields else '*'


@@ -257,10 +257,10 @@ class Fetcher(object):
self._quit = True
tornado.ioloop.IOLoop.instance().stop()
def xmlrpc_run(self, port=24444, bind='127.0.0.1'):
def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
from SimpleXMLRPCServer import SimpleXMLRPCServer
server = SimpleXMLRPCServer((bind, port), allow_none=True)
server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
server.register_introspection_functions()
server.register_multicall_functions()
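Note: SimpleXMLRPCServer writes a log line to stderr for every request by default; exposing logRequests (defaulting to False here) keeps test runs quiet. The scheduler's xmlrpc_run further down gets the same flag. Minimal illustration with the Python 2 stdlib server:

    from SimpleXMLRPCServer import SimpleXMLRPCServer

    server = SimpleXMLRPCServer(('127.0.0.1', 24444), allow_none=True, logRequests=False)
    server.register_function(lambda: 'pong', 'ping')
    # server.serve_forever() would now answer calls without per-request log output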


@@ -33,3 +33,17 @@ def hide_me(tb, g=globals()):
if not tb:
tb = base_tb
return tb
def run_in_thread(func, *args, **kwargs):
from threading import Thread
thread = Thread(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread
def run_in_subprocess(func, *args, **kwargs):
from multiprocessing import Process
thread = Process(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread
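Note: the two helpers deliberately share the same call shape, so callers can switch between a daemon thread and a daemon child process without changing arguments (run.py below is updated to import them from here). A hedged usage sketch with a throwaway worker function:

    from libs.utils import run_in_thread, run_in_subprocess

    def worker(name, delay=0.05):
        import time
        time.sleep(delay)
        print("worker finished: %s" % name)

    t = run_in_thread(worker, 'threaded', delay=0.05)     # returns a started Thread
    p = run_in_subprocess(worker, 'forked', delay=0.05)   # returns a started Process
    t.join(); p.join()   # both are daemonized, so join before the main process exits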

run.py

@@ -11,23 +11,10 @@ import logging
import logging.config
from multiprocessing import Queue
from database.sqlite import taskdb, projectdb
from libs.utils import run_in_thread, run_in_subprocess
logging.config.fileConfig("logging.conf")
def run_in_thread(func, *args, **kwargs):
from threading import Thread
thread = Thread(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread
def run_in_subprocess(func, *args, **kwargs):
from multiprocessing import Process
thread = Process(target=func, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
return thread
def get_taskdb():
return taskdb.TaskDB('./data/task.db')


@@ -24,6 +24,7 @@ class Scheduler(object):
'itag': None,
}
LOOP_LIMIT = 1000
LOOP_INTERVAL = 0.1
def __init__(self, taskdb, projectdb, newtask_queue, status_queue, out_queue):
self.taskdb = taskdb
@@ -47,9 +48,9 @@
"all": counter.CounterManager(
lambda : counter.TotalCounter()),
}
self._cnt['1h'].load('.scheduler.1h')
self._cnt['1d'].load('.scheduler.1d')
self._cnt['all'].load('.scheduler.all')
self._cnt['1h'].load('./data/scheduler.1h')
self._cnt['1d'].load('./data/scheduler.1d')
self._cnt['all'].load('./data/scheduler.all')
self._last_dump_cnt = 0
def _load_projects(self):
@@ -62,8 +63,7 @@
now = time.time()
if self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now:
return
self._last_update_project = now
for project in self.projectdb.check_update(now):
for project in self.projectdb.check_update(self._last_update_project):
logger.debug("project: %s updated." % project['name'])
self.projects[project['name']] = project
if project['name'] not in self.task_queue:
@@ -74,6 +74,7 @@
else:
self.task_queue[project['name']].rate = 0
self.task_queue[project['name']].burst = 0
self._last_update_project = now
scheduler_task_fields = ['taskid', 'project', 'schedule', ]
def _load_tasks(self, project):
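Note on the two hunks above: the old code queried check_update(now), so only rows stamped at the current instant (or, with the old %d formatting, within the current second) could ever match, and anything updated since the previous check was otherwise missed; querying since the previous timestamp and advancing the watermark only after the loop closes that window. Compressed sketch of the corrected flow (names as in the diff, queue and rate/burst handling elided):

    def _update_projects(self):
        now = time.time()
        if self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now:
            return
        # ask for everything modified since the *previous* successful check ...
        for project in self.projectdb.check_update(self._last_update_project):
            self.projects[project['name']] = project   # task_queue setup elided
        # ... and only then move the watermark forward
        self._last_update_project = now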
@@ -94,7 +95,7 @@
def task_verify(self, task):
for each in ('taskid', 'project', 'url', ):
if each not in task:
if each not in task or not task[each]:
logger.error('%s not in task: %s' % (each, unicode(task)[:200]))
return False
if task['project'] not in self.task_queue:
@@ -144,7 +145,7 @@
logger.info('ignore newtask %(project)s:%(taskid)s %(url)s' % task)
continue
oldtask = self.taskdb.get_task(task['project'], task['taskid'],
self.merge_task_fields)
fields=self.merge_task_fields)
if oldtask:
task = self.on_old_request(task, oldtask)
else:
@@ -170,9 +171,9 @@
return cnt_dict
def _dump_cnt(self):
self._cnt['1h'].dump('.scheduler.1h')
self._cnt['1d'].dump('.scheduler.1d')
self._cnt['all'].dump('.scheduler.all')
self._cnt['1h'].dump('./data/scheduler.1h')
self._cnt['1d'].dump('./data/scheduler.1d')
self._cnt['all'].dump('./data/scheduler.all')
def _try_dump_cnt(self):
now = time.time()
@@ -199,14 +200,15 @@
self._check_task_done()
self._check_request()
self._check_select()
time.sleep(0.1)
time.sleep(self.LOOP_INTERVAL)
logger.info("scheduler exiting...")
self._dump_cnt()
def xmlrpc_run(self, port=23333, bind='127.0.0.1'):
def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):
from SimpleXMLRPCServer import SimpleXMLRPCServer
server = SimpleXMLRPCServer((bind, port), allow_none=True)
server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests)
server.register_introspection_functions()
server.register_multicall_functions()


@@ -60,9 +60,6 @@ class TestTaskDB(unittest.TestCase):
'updatetime': time.time(),
}
def setUp(self):
pass
def test_create_project(self):
taskdb = TaskDB(':memory:')
with self.assertRaises(AssertionError):
@@ -158,9 +155,9 @@ class TestProjectDB(unittest.TestCase):
self.assertNotIn('gourp', project)
# update
projectdb.update('not found', status='RUNNING')
time.sleep(0.1)
now = time.time()
projectdb.update('not found', status='RUNNING')
projectdb.update('abc', status='RUNNING')
# check update


@@ -59,57 +59,55 @@ class TestProjectModule(unittest.TestCase):
'name': self.project,
'status': 'DEBUG',
}
def test_build_module(self):
module = project_module.ProjectModule(self.project, self.script, self.env)
self.module = module = project_module.ProjectModule(self.project, self.script, self.env)
module.rethrow()
_class = module.get()
instance = _class()._init(self.project_info)
self.instance = _class()._init(self.project_info)
# hello
def test_2_hello(self):
self.base_task['process']['callback'] = 'hello'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, "hello world!")
# echo
def test_3_echo(self):
self.base_task['process']['callback'] = 'echo'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, "test data")
# saved
def test_4_saved(self):
self.base_task['process']['callback'] = 'saved'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, self.base_task['process']['save'])
# echo task
def test_5_echo_task(self):
self.base_task['process']['callback'] = 'echo_task'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, self.project)
# catch_status_code
def test_6_catch_status_code(self):
self.fetch_result['status_code'] = 403
self.base_task['process']['callback'] = 'catch_status_code'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(ret.result, 403)
self.fetch_result['status_code'] = 200
# raise_exception
def test_7_raise_exception(self):
self.base_task['process']['callback'] = 'raise_exception'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNotNone(ret.exception)
logstr = ret.logstr()
self.assertIn('info', logstr)
self.assertIn('warning', logstr)
self.assertIn('error', logstr)
# add_task
def test_8_add_task(self):
self.base_task['process']['callback'] = 'add_task'
ret = instance.run(module, self.base_task, self.fetch_result)
ret = self.instance.run(self.module, self.base_task, self.fetch_result)
self.assertIsNone(ret.exception)
self.assertEqual(len(ret.follows), 1)
self.assertEqual(len(ret.messages), 1)


@@ -5,47 +5,52 @@
# http://binux.me
# Created on 2014-02-08 22:37:13
import os
import time
import unittest
import logging
import logging.config
logging.config.fileConfig("logging.conf")
from scheduler.task_queue import TaskQueue
from scheduler.token_bucket import Bucket
class TestTaskQueue(unittest.TestCase):
def setUp(self):
pass
@classmethod
def setUpClass(self):
self.task_queue = TaskQueue()
self.task_queue.rate = 100000
self.task_queue.burst = 100000
self.task_queue.processing_timeout = 0.2
def test_task_queue(self):
task_queue = TaskQueue()
task_queue.processing_timeout = 0.1
task_queue.put('a3', 3, time.time()+0.1)
task_queue.put('a1', 1)
task_queue.put('a2', 2)
self.task_queue.put('a3', 2, time.time()+0.1)
self.task_queue.put('a1', 1)
self.task_queue.put('a2', 3)
# priority queue
self.assertEqual(task_queue.get(), 'a2')
def test_1_priority_queue(self):
self.assertEqual(self.task_queue.get(), 'a2')
# time queue
def test_2_time_queue(self):
time.sleep(0.1)
task_queue._check_time_queue()
self.assertEqual(task_queue.get(), 'a3')
self.assertEqual(task_queue.get(), 'a1')
self.task_queue.check_update()
self.assertEqual(self.task_queue.get(), 'a3')
self.assertEqual(self.task_queue.get(), 'a1')
# processing queue
task_queue._check_processing()
self.assertEqual(task_queue.get(), 'a2')
self.assertEqual(len(task_queue), 0)
# done
task_queue.done('a2')
task_queue.done('a1')
def test_3_processing_queue(self):
time.sleep(0.1)
task_queue._check_processing()
task_queue._check_time_queue()
self.assertEqual(task_queue.get(), 'a3')
self.assertEqual(task_queue.get(), None)
self.task_queue.check_update()
self.assertEqual(self.task_queue.get(), 'a2')
self.assertEqual(len(self.task_queue), 0)
def test_4_done(self):
self.task_queue.done('a2')
self.task_queue.done('a1')
time.sleep(0.1)
self.task_queue.check_update()
self.assertEqual(self.task_queue.get(), 'a3')
self.assertEqual(self.task_queue.get(), None)
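Note: the split test methods above walk TaskQueue's three parts in order: the priority queue (highest-priority ready task first), the time queue (tasks given a future exetime only become available after check_update()), and the processing queue (tasks handed out by get() but never done() are re-queued once processing_timeout elapses). A hedged usage sketch limited to the calls these tests exercise:

    import time
    from scheduler.task_queue import TaskQueue

    q = TaskQueue()
    q.put('a1', 1)                      # ready immediately, priority 1
    q.put('a3', 2, time.time() + 0.1)   # higher priority, but delayed 0.1s
    q.get()                             # -> 'a1' (a3 still sits in the time queue)
    time.sleep(0.1)
    q.check_update()                    # promote due tasks out of the time queue
    q.get()                             # -> 'a3'
    q.done('a1'); q.done('a3')          # otherwise they re-enter after processing_timeout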
from scheduler.token_bucket import Bucket
class TestBucket(unittest.TestCase):
def test_bucket(self):
bucket = Bucket(100, 1000)
@@ -60,5 +65,266 @@ class TestBucket(unittest.TestCase):
self.assertAlmostEqual(bucket.get(), 920, 0)
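Note on the token-bucket assertion: assuming Bucket(rate, burst), here 100 tokens per second with a burst cap of 1000, the available tokens grow linearly with elapsed time up to the cap. Illustrative refill arithmetic (the helper below is hypothetical, not the Bucket implementation):

    def tokens_available(prev, rate, burst, elapsed):
        # classic token-bucket refill: linear growth, clamped at the burst size
        return min(burst, prev + rate * elapsed)

    tokens_available(900, 100, 1000, 0.2)   # -> 920.0, the kind of value the
                                            # assertAlmostEqual above checks for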
import xmlrpclib
from multiprocessing import Queue
from scheduler.scheduler import Scheduler
from database.sqlite import taskdb, projectdb
from libs.utils import run_in_subprocess, run_in_thread
class TestScheduler(unittest.TestCase):
taskdb_path = './data/.test_task.db'
projectdb_path = './data/.test_project.db'
check_project_time = 1
scheduler_xmlrpc_port = 23333
@classmethod
def rm_db(self):
try:
os.remove(self.taskdb_path)
except OSError:
pass
try:
os.remove(self.projectdb_path)
except OSError:
pass
@classmethod
def setUpClass(self):
self.rm_db()
def get_taskdb():
return taskdb.TaskDB(self.taskdb_path)
self.taskdb = get_taskdb()
def get_projectdb():
return projectdb.ProjectDB(self.projectdb_path)
self.projectdb = get_projectdb()
self.newtask_queue = Queue(10)
self.status_queue = Queue(10)
self.scheduler2fetcher = Queue(10)
self.rpc = xmlrpclib.ServerProxy('http://localhost:%d' % self.scheduler_xmlrpc_port)
def run_scheduler():
scheduler = Scheduler(taskdb=get_taskdb(), projectdb=get_projectdb(),
newtask_queue=self.newtask_queue, status_queue=self.status_queue, out_queue=self.scheduler2fetcher)
scheduler.UPDATE_PROJECT_INTERVAL = 0.05
scheduler.LOOP_INTERVAL = 0.01
run_in_thread(scheduler.xmlrpc_run, port=self.scheduler_xmlrpc_port)
scheduler.run()
self.process = run_in_subprocess(run_scheduler)
time.sleep(1)
@classmethod
def tearDownClass(self):
self.rm_db()
self.process.terminate()
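Note on the fixture above: the scheduler loop runs in a child process (run_in_subprocess), its XML-RPC server runs in a daemon thread inside that process (run_in_thread), and the tests drive it from outside through the shared multiprocessing Queues plus an xmlrpclib proxy. Client-side view of the RPC calls the tests below rely on:

    import xmlrpclib

    rpc = xmlrpclib.ServerProxy('http://localhost:23333')
    rpc.size()                  # queued-task count the size assertions check
    rpc.counter('all', 'sum')   # per-project counters, as read in test_20
    rpc._quit()                 # asks the scheduler loop to stop (see test_z20)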
def test_10_new_task_ignore(self):
self.newtask_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url'
})
self.assertEqual(self.rpc.size(), 0)
def test_20_new_project(self):
self.projectdb.insert('test_project', {
'name': 'test_project',
'group': 'group',
'status': 'TODO',
'script': 'import time\nprint time.time()',
'comments': 'test project',
'rate': 1.0,
'burst': 10,
})
time.sleep(0.1)
self.newtask_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'fetch': {
'data': 'abc',
},
'process': {
'data': 'abc',
},
'schedule': {
'age': 0,
},
})
time.sleep(0.1)
self.assertEqual(self.rpc.size(), 1)
self.assertEqual(self.rpc.counter('all', 'sum')['test_project']['pending'], 1)
def test_30_update_project(self):
self.projectdb.update('test_project', status="DEBUG")
task = self.scheduler2fetcher.get(timeout=5)
self.assertIsNotNone(task)
self.assertEqual(task['project'], 'test_project')
self.assertIn('fetch', task)
self.assertIn('process', task)
self.assertNotIn('schedule', task)
self.assertNotIn('track', task)
self.assertEqual(task['fetch']['data'], 'abc')
def test_40_taskdone_error_no_project(self):
self.status_queue.put({
'taskid': 'taskid',
'project': 'no_project',
'url': 'url'
})
time.sleep(0.1)
self.assertEqual(self.rpc.size(), 0)
def test_50_taskdone_error_no_track(self):
self.status_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url'
})
time.sleep(0.1)
self.assertEqual(self.rpc.size(), 0)
self.status_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'track': {}
})
time.sleep(0.1)
self.assertEqual(self.rpc.size(), 0)
def test_60_taskdone_failed_retry(self):
self.status_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'track': {
'fetch': {
'ok': True
},
'process': {
'ok': False
},
}
})
task = self.scheduler2fetcher.get(timeout=5)
self.assertIsNotNone(task)
def test_70_taskdone_ok(self):
self.status_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'track': {
'fetch': {
'ok': True
},
'process': {
'ok': True
},
}
})
time.sleep(0.1)
self.assertEqual(self.rpc.size(), 0)
def test_80_newtask_age_ignore(self):
self.newtask_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'fetch': {
'data': 'abc',
},
'process': {
'data': 'abc',
},
'schedule': {
'age': 30,
},
})
time.sleep(0.1)
self.assertEqual(self.rpc.size(), 0)
def test_90_newtask_with_itag(self):
time.sleep(0.1)
self.newtask_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'fetch': {
'data': 'abc',
},
'process': {
'data': 'abc',
},
'schedule': {
'age': 0,
'retries': 1
},
})
task = self.scheduler2fetcher.get(timeout=5)
self.assertIsNotNone(task)
self.test_70_taskdone_ok()
def test_a10_newtask_restart_by_age(self):
self.newtask_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'fetch': {
'data': 'abc',
},
'process': {
'data': 'abc',
},
'schedule': {
'age': 0,
'retries': 1
},
})
task = self.scheduler2fetcher.get(timeout=5)
self.assertIsNotNone(task)
def test_a20_failed_retry(self):
self.status_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'track': {
'fetch': {
'ok': True
},
'process': {
'ok': False
},
}
})
task = self.scheduler2fetcher.get(timeout=5)
self.assertIsNotNone(task)
self.status_queue.put({
'taskid': 'taskid',
'project': 'test_project',
'url': 'url',
'track': {
'fetch': {
'ok': False
},
'process': {
'ok': True
},
}
})
time.sleep(0.2)
def test_z10_startup(self):
self.assertTrue(self.process.is_alive())
def test_z20_quit(self):
self.rpc._quit()
time.sleep(0.2)
self.assertFalse(self.process.is_alive())
self.taskdb.conn.commit()
self.assertEqual(self.taskdb.get_task('test_project', 'taskid')['status'], self.taskdb.FAILED)
if __name__ == '__main__':
unittest.main()