diff --git a/data/.gitignore b/data/.gitignore new file mode 100644 index 0000000..98e6ef6 --- /dev/null +++ b/data/.gitignore @@ -0,0 +1 @@ +*.db diff --git a/database/sqlite/projectdb.py b/database/sqlite/projectdb.py index a5acfa1..9eb9703 100644 --- a/database/sqlite/projectdb.py +++ b/database/sqlite/projectdb.py @@ -53,5 +53,5 @@ class ProjectDB(BaseProjectDB, BaseDB): def check_update(self, timestamp, fields=None): what = ','.join(('`%s`' % x for x in fields)) if fields else '*' - where = "updatetime >= %d" % timestamp + where = "updatetime >= %f" % timestamp return self._select2dic(self.__tablename__, what=what, where=where) diff --git a/database/sqlite/taskdb.py b/database/sqlite/taskdb.py index a89b93a..f4a545d 100644 --- a/database/sqlite/taskdb.py +++ b/database/sqlite/taskdb.py @@ -45,10 +45,11 @@ class TaskDB(BaseTaskDB, BaseDB): def _parse(self, data): for each in ('schedule', 'fetch', 'process', 'track'): - if each in data and data[each]: - data[each] = json.loads(data[each]) - else: - data[each] = {} + if each in data: + if data[each]: + data[each] = json.loads(data[each]) + else: + data[each] = {} return data def _stringify(self, data): @@ -58,7 +59,7 @@ class TaskDB(BaseTaskDB, BaseDB): return data def load_tasks(self, status, project=None, fields=None): - if project not in self.projects: + if project and project not in self.projects: raise StopIteration what = ','.join(fields) if fields else '*' where = "status = %d" % status @@ -73,6 +74,8 @@ class TaskDB(BaseTaskDB, BaseDB): yield self._parse(each) def get_task(self, project, taskid, fields=None): + if project not in self.projects: + self._list_project() if project not in self.projects: return None what = ','.join(fields) if fields else '*' diff --git a/fetcher/tornado_fetcher.py b/fetcher/tornado_fetcher.py index 8c845e3..11303a8 100644 --- a/fetcher/tornado_fetcher.py +++ b/fetcher/tornado_fetcher.py @@ -257,10 +257,10 @@ class Fetcher(object): self._quit = True tornado.ioloop.IOLoop.instance().stop() - def xmlrpc_run(self, port=24444, bind='127.0.0.1'): + def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False): from SimpleXMLRPCServer import SimpleXMLRPCServer - server = SimpleXMLRPCServer((bind, port), allow_none=True) + server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests) server.register_introspection_functions() server.register_multicall_functions() diff --git a/libs/utils.py b/libs/utils.py index 137ca59..1c636b2 100644 --- a/libs/utils.py +++ b/libs/utils.py @@ -33,3 +33,17 @@ def hide_me(tb, g=globals()): if not tb: tb = base_tb return tb + +def run_in_thread(func, *args, **kwargs): + from threading import Thread + thread = Thread(target=func, args=args, kwargs=kwargs) + thread.daemon = True + thread.start() + return thread + +def run_in_subprocess(func, *args, **kwargs): + from multiprocessing import Process + thread = Process(target=func, args=args, kwargs=kwargs) + thread.daemon = True + thread.start() + return thread diff --git a/run.py b/run.py index 2f82e12..3a7bb82 100755 --- a/run.py +++ b/run.py @@ -11,23 +11,10 @@ import logging import logging.config from multiprocessing import Queue from database.sqlite import taskdb, projectdb +from libs.utils import run_in_thread, run_in_subprocess logging.config.fileConfig("logging.conf") -def run_in_thread(func, *args, **kwargs): - from threading import Thread - thread = Thread(target=func, args=args, kwargs=kwargs) - thread.daemon = True - thread.start() - return thread - -def run_in_subprocess(func, *args, **kwargs): - from multiprocessing import Process - thread = Process(target=func, args=args, kwargs=kwargs) - thread.daemon = True - thread.start() - return thread - def get_taskdb(): return taskdb.TaskDB('./data/task.db') diff --git a/scheduler/scheduler.py b/scheduler/scheduler.py index a6349e8..dccf2de 100644 --- a/scheduler/scheduler.py +++ b/scheduler/scheduler.py @@ -24,6 +24,7 @@ class Scheduler(object): 'itag': None, } LOOP_LIMIT = 1000 + LOOP_INTERVAL = 0.1 def __init__(self, taskdb, projectdb, newtask_queue, status_queue, out_queue): self.taskdb = taskdb @@ -47,9 +48,9 @@ class Scheduler(object): "all": counter.CounterManager( lambda : counter.TotalCounter()), } - self._cnt['1h'].load('.scheduler.1h') - self._cnt['1d'].load('.scheduler.1d') - self._cnt['all'].load('.scheduler.all') + self._cnt['1h'].load('./data/scheduler.1h') + self._cnt['1d'].load('./data/scheduler.1d') + self._cnt['all'].load('./data/scheduler.all') self._last_dump_cnt = 0 def _load_projects(self): @@ -62,8 +63,7 @@ class Scheduler(object): now = time.time() if self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now: return - self._last_update_project = now - for project in self.projectdb.check_update(now): + for project in self.projectdb.check_update(self._last_update_project): logger.debug("project: %s updated." % project['name']) self.projects[project['name']] = project if project['name'] not in self.task_queue: @@ -74,6 +74,7 @@ class Scheduler(object): else: self.task_queue[project['name']].rate = 0 self.task_queue[project['name']].burst = 0 + self._last_update_project = now scheduler_task_fields = ['taskid', 'project', 'schedule', ] def _load_tasks(self, project): @@ -94,7 +95,7 @@ class Scheduler(object): def task_verify(self, task): for each in ('taskid', 'project', 'url', ): - if each not in task: + if each not in task or not task[each]: logger.error('%s not in task: %s' % (each, unicode(task)[:200])) return False if task['project'] not in self.task_queue: @@ -144,7 +145,7 @@ class Scheduler(object): logger.info('ignore newtask %(project)s:%(taskid)s %(url)s' % task) continue oldtask = self.taskdb.get_task(task['project'], task['taskid'], - self.merge_task_fields) + fields=self.merge_task_fields) if oldtask: task = self.on_old_request(task, oldtask) else: @@ -170,9 +171,9 @@ class Scheduler(object): return cnt_dict def _dump_cnt(self): - self._cnt['1h'].dump('.scheduler.1h') - self._cnt['1d'].dump('.scheduler.1d') - self._cnt['all'].dump('.scheduler.all') + self._cnt['1h'].dump('./data/scheduler.1h') + self._cnt['1d'].dump('./data/scheduler.1d') + self._cnt['all'].dump('./data/scheduler.all') def _try_dump_cnt(self): now = time.time() @@ -199,14 +200,15 @@ class Scheduler(object): self._check_task_done() self._check_request() self._check_select() - time.sleep(0.1) + time.sleep(self.LOOP_INTERVAL) + logger.info("scheduler exiting...") self._dump_cnt() - def xmlrpc_run(self, port=23333, bind='127.0.0.1'): + def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False): from SimpleXMLRPCServer import SimpleXMLRPCServer - server = SimpleXMLRPCServer((bind, port), allow_none=True) + server = SimpleXMLRPCServer((bind, port), allow_none=True, logRequests=logRequests) server.register_introspection_functions() server.register_multicall_functions() diff --git a/test/test_database_sqlite.py b/test/test_database_sqlite.py index 6c90bf0..9977e2c 100644 --- a/test/test_database_sqlite.py +++ b/test/test_database_sqlite.py @@ -60,9 +60,6 @@ class TestTaskDB(unittest.TestCase): 'updatetime': time.time(), } - def setUp(self): - pass - def test_create_project(self): taskdb = TaskDB(':memory:') with self.assertRaises(AssertionError): @@ -158,9 +155,9 @@ class TestProjectDB(unittest.TestCase): self.assertNotIn('gourp', project) # update + projectdb.update('not found', status='RUNNING') time.sleep(0.1) now = time.time() - projectdb.update('not found', status='RUNNING') projectdb.update('abc', status='RUNNING') # check update diff --git a/test/test_processor.py b/test/test_processor.py index 5411427..b97f6cb 100644 --- a/test/test_processor.py +++ b/test/test_processor.py @@ -59,57 +59,55 @@ class TestProjectModule(unittest.TestCase): 'name': self.project, 'status': 'DEBUG', } - - def test_build_module(self): - module = project_module.ProjectModule(self.project, self.script, self.env) + self.module = module = project_module.ProjectModule(self.project, self.script, self.env) module.rethrow() _class = module.get() - instance = _class()._init(self.project_info) + self.instance = _class()._init(self.project_info) - # hello + def test_2_hello(self): self.base_task['process']['callback'] = 'hello' - ret = instance.run(module, self.base_task, self.fetch_result) + ret = self.instance.run(self.module, self.base_task, self.fetch_result) self.assertIsNone(ret.exception) self.assertEqual(ret.result, "hello world!") - # echo + def test_3_echo(self): self.base_task['process']['callback'] = 'echo' - ret = instance.run(module, self.base_task, self.fetch_result) + ret = self.instance.run(self.module, self.base_task, self.fetch_result) self.assertIsNone(ret.exception) self.assertEqual(ret.result, "test data") - # saved + def test_4_saved(self): self.base_task['process']['callback'] = 'saved' - ret = instance.run(module, self.base_task, self.fetch_result) + ret = self.instance.run(self.module, self.base_task, self.fetch_result) self.assertIsNone(ret.exception) self.assertEqual(ret.result, self.base_task['process']['save']) - # echo task + def test_5_echo_task(self): self.base_task['process']['callback'] = 'echo_task' - ret = instance.run(module, self.base_task, self.fetch_result) + ret = self.instance.run(self.module, self.base_task, self.fetch_result) self.assertIsNone(ret.exception) self.assertEqual(ret.result, self.project) - # catch_status_code + def test_6_catch_status_code(self): self.fetch_result['status_code'] = 403 self.base_task['process']['callback'] = 'catch_status_code' - ret = instance.run(module, self.base_task, self.fetch_result) + ret = self.instance.run(self.module, self.base_task, self.fetch_result) self.assertIsNone(ret.exception) self.assertEqual(ret.result, 403) self.fetch_result['status_code'] = 200 - # raise_exception + def test_7_raise_exception(self): self.base_task['process']['callback'] = 'raise_exception' - ret = instance.run(module, self.base_task, self.fetch_result) + ret = self.instance.run(self.module, self.base_task, self.fetch_result) self.assertIsNotNone(ret.exception) logstr = ret.logstr() self.assertIn('info', logstr) self.assertIn('warning', logstr) self.assertIn('error', logstr) - # add_task + def test_8_add_task(self): self.base_task['process']['callback'] = 'add_task' - ret = instance.run(module, self.base_task, self.fetch_result) + ret = self.instance.run(self.module, self.base_task, self.fetch_result) self.assertIsNone(ret.exception) self.assertEqual(len(ret.follows), 1) self.assertEqual(len(ret.messages), 1) diff --git a/test/test_scheduler.py b/test/test_scheduler.py index a1ae773..aa520e8 100644 --- a/test/test_scheduler.py +++ b/test/test_scheduler.py @@ -5,47 +5,52 @@ # http://binux.me # Created on 2014-02-08 22:37:13 +import os import time import unittest +import logging +import logging.config +logging.config.fileConfig("logging.conf") + + from scheduler.task_queue import TaskQueue -from scheduler.token_bucket import Bucket - - class TestTaskQueue(unittest.TestCase): - def setUp(self): - pass + @classmethod + def setUpClass(self): + self.task_queue = TaskQueue() + self.task_queue.rate = 100000 + self.task_queue.burst = 100000 + self.task_queue.processing_timeout = 0.2 - def test_task_queue(self): - task_queue = TaskQueue() - task_queue.processing_timeout = 0.1 - task_queue.put('a3', 3, time.time()+0.1) - task_queue.put('a1', 1) - task_queue.put('a2', 2) + self.task_queue.put('a3', 2, time.time()+0.1) + self.task_queue.put('a1', 1) + self.task_queue.put('a2', 3) - # priority queue - self.assertEqual(task_queue.get(), 'a2') + def test_1_priority_queue(self): + self.assertEqual(self.task_queue.get(), 'a2') - # time queue + def test_2_time_queue(self): time.sleep(0.1) - task_queue._check_time_queue() - self.assertEqual(task_queue.get(), 'a3') - self.assertEqual(task_queue.get(), 'a1') + self.task_queue.check_update() + self.assertEqual(self.task_queue.get(), 'a3') + self.assertEqual(self.task_queue.get(), 'a1') - # processing queue - task_queue._check_processing() - self.assertEqual(task_queue.get(), 'a2') - self.assertEqual(len(task_queue), 0) - - # done - task_queue.done('a2') - task_queue.done('a1') + def test_3_processing_queue(self): time.sleep(0.1) - task_queue._check_processing() - task_queue._check_time_queue() - self.assertEqual(task_queue.get(), 'a3') - self.assertEqual(task_queue.get(), None) + self.task_queue.check_update() + self.assertEqual(self.task_queue.get(), 'a2') + self.assertEqual(len(self.task_queue), 0) + + def test_4_done(self): + self.task_queue.done('a2') + self.task_queue.done('a1') + time.sleep(0.1) + self.task_queue.check_update() + self.assertEqual(self.task_queue.get(), 'a3') + self.assertEqual(self.task_queue.get(), None) +from scheduler.token_bucket import Bucket class TestBucket(unittest.TestCase): def test_bucket(self): bucket = Bucket(100, 1000) @@ -60,5 +65,266 @@ class TestBucket(unittest.TestCase): self.assertAlmostEqual(bucket.get(), 920, 0) +import xmlrpclib +from multiprocessing import Queue +from scheduler.scheduler import Scheduler +from database.sqlite import taskdb, projectdb +from libs.utils import run_in_subprocess, run_in_thread +class TestScheduler(unittest.TestCase): + taskdb_path = './data/.test_task.db' + projectdb_path = './data/.test_project.db' + check_project_time = 1 + scheduler_xmlrpc_port = 23333 + + @classmethod + def rm_db(self): + try: + os.remove(self.taskdb_path) + except OSError: + pass + try: + os.remove(self.projectdb_path) + except OSError: + pass + + @classmethod + def setUpClass(self): + self.rm_db() + def get_taskdb(): + return taskdb.TaskDB(self.taskdb_path) + self.taskdb = get_taskdb() + def get_projectdb(): + return projectdb.ProjectDB(self.projectdb_path) + self.projectdb = get_projectdb() + self.newtask_queue = Queue(10) + self.status_queue = Queue(10) + self.scheduler2fetcher = Queue(10) + self.rpc = xmlrpclib.ServerProxy('http://localhost:%d' % self.scheduler_xmlrpc_port) + + def run_scheduler(): + scheduler = Scheduler(taskdb=get_taskdb(), projectdb=get_projectdb(), + newtask_queue=self.newtask_queue, status_queue=self.status_queue, out_queue=self.scheduler2fetcher) + scheduler.UPDATE_PROJECT_INTERVAL = 0.05 + scheduler.LOOP_INTERVAL = 0.01 + run_in_thread(scheduler.xmlrpc_run, port=self.scheduler_xmlrpc_port) + scheduler.run() + + self.process = run_in_subprocess(run_scheduler) + time.sleep(1) + + @classmethod + def tearDownClass(self): + self.rm_db() + self.process.terminate() + + def test_10_new_task_ignore(self): + self.newtask_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url' + }) + self.assertEqual(self.rpc.size(), 0) + + def test_20_new_project(self): + self.projectdb.insert('test_project', { + 'name': 'test_project', + 'group': 'group', + 'status': 'TODO', + 'script': 'import time\nprint time.time()', + 'comments': 'test project', + 'rate': 1.0, + 'burst': 10, + }) + time.sleep(0.1) + self.newtask_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'fetch': { + 'data': 'abc', + }, + 'process': { + 'data': 'abc', + }, + 'schedule': { + 'age': 0, + }, + }) + time.sleep(0.1) + self.assertEqual(self.rpc.size(), 1) + self.assertEqual(self.rpc.counter('all', 'sum')['test_project']['pending'], 1) + + def test_30_update_project(self): + self.projectdb.update('test_project', status="DEBUG") + task = self.scheduler2fetcher.get(timeout=5) + self.assertIsNotNone(task) + self.assertEqual(task['project'], 'test_project') + self.assertIn('fetch', task) + self.assertIn('process', task) + self.assertNotIn('schedule', task) + self.assertNotIn('track', task) + self.assertEqual(task['fetch']['data'], 'abc') + + def test_40_taskdone_error_no_project(self): + self.status_queue.put({ + 'taskid': 'taskid', + 'project': 'no_project', + 'url': 'url' + }) + time.sleep(0.1) + self.assertEqual(self.rpc.size(), 0) + + def test_50_taskdone_error_no_track(self): + self.status_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url' + }) + time.sleep(0.1) + self.assertEqual(self.rpc.size(), 0) + self.status_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'track': {} + }) + time.sleep(0.1) + self.assertEqual(self.rpc.size(), 0) + + def test_60_taskdone_failed_retry(self): + self.status_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'track': { + 'fetch': { + 'ok': True + }, + 'process': { + 'ok': False + }, + } + }) + task = self.scheduler2fetcher.get(timeout=5) + self.assertIsNotNone(task) + + def test_70_taskdone_ok(self): + self.status_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'track': { + 'fetch': { + 'ok': True + }, + 'process': { + 'ok': True + }, + } + }) + time.sleep(0.1) + self.assertEqual(self.rpc.size(), 0) + + def test_80_newtask_age_ignore(self): + self.newtask_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'fetch': { + 'data': 'abc', + }, + 'process': { + 'data': 'abc', + }, + 'schedule': { + 'age': 30, + }, + }) + time.sleep(0.1) + self.assertEqual(self.rpc.size(), 0) + + def test_90_newtask_with_itag(self): + time.sleep(0.1) + self.newtask_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'fetch': { + 'data': 'abc', + }, + 'process': { + 'data': 'abc', + }, + 'schedule': { + 'age': 0, + 'retries': 1 + }, + }) + task = self.scheduler2fetcher.get(timeout=5) + self.assertIsNotNone(task) + + self.test_70_taskdone_ok() + + def test_a10_newtask_restart_by_age(self): + self.newtask_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'fetch': { + 'data': 'abc', + }, + 'process': { + 'data': 'abc', + }, + 'schedule': { + 'age': 0, + 'retries': 1 + }, + }) + task = self.scheduler2fetcher.get(timeout=5) + self.assertIsNotNone(task) + + def test_a20_failed_retry(self): + self.status_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'track': { + 'fetch': { + 'ok': True + }, + 'process': { + 'ok': False + }, + } + }) + task = self.scheduler2fetcher.get(timeout=5) + self.assertIsNotNone(task) + + self.status_queue.put({ + 'taskid': 'taskid', + 'project': 'test_project', + 'url': 'url', + 'track': { + 'fetch': { + 'ok': False + }, + 'process': { + 'ok': True + }, + } + }) + time.sleep(0.2) + + def test_z10_startup(self): + self.assertTrue(self.process.is_alive()) + + def test_z20_quit(self): + self.rpc._quit() + time.sleep(0.2) + self.assertFalse(self.process.is_alive()) + self.taskdb.conn.commit() + self.assertEqual(self.taskdb.get_task('test_project', 'taskid')['status'], self.taskdb.FAILED) + if __name__ == '__main__': unittest.main()