Skip to content
This repository has been archived by the owner on Jun 10, 2024. It is now read-only.

Commit

Permalink
add new command send_message, sending message to project via command-…
Browse files Browse the repository at this point in the history
…line

close #67
  • Loading branch information
binux committed Feb 8, 2015
1 parent ece43a2 commit 2471eb4
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 0 deletions.
27 changes: 27 additions & 0 deletions pyspider/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,33 @@ def one(ctx, interactive, enable_phantomjs, scripts):
phantomjs_obj.quit()


@cli.command()
@click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
@click.argument('project', nargs=1)
@click.argument('message', nargs=1)
@click.pass_context
def send_message(ctx, scheduler_rpc, project, message):
if isinstance(scheduler_rpc, six.string_types):
scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % (
os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
if scheduler_rpc is None:
scheduler_rpc = connect_rpc(ctx, None, 'http://localhost:23333/')

return scheduler_rpc.send_task({
'taskid': utils.md5string('data:,on_message'),
'project': project,
'url': 'data:,on_message',
'fetch': {
'save': ('__command__', message),
},
'process': {
'callback': '_on_message',
}
})


def main():
cli()

Expand Down
6 changes: 6 additions & 0 deletions pyspider/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,12 @@ def new_task(task):
return False
server.register_function(new_task, 'newtask')

def send_task(task):
'''dispatch task to fetcher'''
self.send_task(task)
return True
server.register_function(send_task, 'send_task')

def update_project():
self._force_update_project = True
server.register_function(update_project, 'update_project')
Expand Down
41 changes: 41 additions & 0 deletions tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,44 @@ def wait_text(timeout=1):
self.assertIn('scheduler exiting...', text)
os.close(fd)
os.kill(pid, signal.SIGINT)

class TestSendMessage(unittest.TestCase):

@classmethod
def setUpClass(self):
shutil.rmtree('./data/tests', ignore_errors=True)
os.makedirs('./data/tests')

ctx = run.cli.make_context('test', [
'--taskdb', 'sqlite+taskdb:///data/tests/task.db',
'--projectdb', 'sqlite+projectdb:///data/tests/projectdb.db',
'--resultdb', 'sqlite+resultdb:///data/tests/resultdb.db',
], None, obj=dict(testing_mode=True))
self.ctx = run.cli.invoke(ctx)

ctx = run.scheduler.make_context('scheduler', [], self.ctx)
scheduler = run.scheduler.invoke(ctx)
utils.run_in_thread(scheduler.xmlrpc_run)
utils.run_in_thread(scheduler.run)

time.sleep(1)

@classmethod
def tearDownClass(self):
for each in self.ctx.obj.instances:
each.quit()
time.sleep(1)

shutil.rmtree('./data/tests', ignore_errors=True)

def test_10_send_message(self):
ctx = run.send_message.make_context('send_message', [
'test_project', 'test_message'
], self.ctx)
self.assertTrue(run.send_message.invoke(ctx))
while True:
task = self.ctx.obj.scheduler2fetcher.get(timeout=1)
if task['url'] == 'data:,on_message':
break
self.assertEqual(task['process']['callback'], '_on_message')

0 comments on commit 2471eb4

Please sign in to comment.