diff --git a/README.md b/README.md index 86445855b..cf9ff2b3a 100644 --- a/README.md +++ b/README.md @@ -3,13 +3,12 @@ pyspider [![Build Status]][Travis CI] [![Coverage Status]][Coverage] [![Try]][De A Powerful Spider(Web Crawler) System in Python. **[TRY IT NOW!][Demo]** -- Write script in python with powerful API -- Python 2&3 +- Write script in Python - Powerful WebUI with script editor, task monitor, project manager and result viewer -- Javascript pages supported! -- MySQL, MongoDB, SQLite, PostgreSQL as database backend -- Task priority, retry, periodical, recrawl by age and more -- Distributed architecture +- [MySQL](https://www.mysql.com/), [MongoDB](https://www.mongodb.org/), [Redis](http://redis.io/), [SQLite](https://www.sqlite.org/), [PostgreSQL](http://www.postgresql.org/) with [SQLAlchemy](http://www.sqlalchemy.org/) as database backend +- [RabbitMQ](http://www.rabbitmq.com/), [Beanstalk](http://kr.github.com/beanstalkd/), [Redis](http://redis.io/) and [Kombu](http://kombu.readthedocs.org/) as message queue +- Task priority, retry, periodical, recrawl by age, etc... +- Distributed architecture, Crawl Javascript pages, Python 2&3, etc... Documentation: [http://docs.pyspider.org/](http://docs.pyspider.org/) Tutorial: [http://docs.pyspider.org/en/latest/tutorial/](http://docs.pyspider.org/en/latest/tutorial/) diff --git a/docs/Command-Line.md b/docs/Command-Line.md index 8df58248b..41126054f 100644 --- a/docs/Command-Line.md +++ b/docs/Command-Line.md @@ -87,15 +87,18 @@ type: #### --message-queue ``` - rabbitmq: - amqp://username:password@host:5672/%2F - Refer: https://www.rabbitmq.com/uri-spec.html - beanstalk: - beanstalk://host:11300/ - redis: - redis://host:6379/db - builtin: - None +rabbitmq: + amqp://username:password@host:5672/%2F + see https://www.rabbitmq.com/uri-spec.html +beanstalk: + beanstalk://host:11300/ +redis: + redis://host:6379/db +kombu: + kombu+transport://userid:password@hostname:port/virtual_host + see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls +builtin: + None ``` #### --phantomjs-proxy diff --git a/docs/Deployment.md b/docs/Deployment.md index 2b2817775..d630c9d91 100644 --- a/docs/Deployment.md +++ b/docs/Deployment.md @@ -78,13 +78,15 @@ type: You can use connection URL to specify the message queue: ``` - rabbitmq: - amqp://username:password@host:5672/%2F - Refer: https://www.rabbitmq.com/uri-spec.html - beanstalk: - beanstalk://host:11300/ - redis: - redis://host:6379/db +rabbitmq: + amqp://username:password@host:5672/%2F + Refer: https://www.rabbitmq.com/uri-spec.html +beanstalk: + beanstalk://host:11300/ +redis: + redis://host:6379/db +builtin: + None ``` running diff --git a/docs/index.md b/docs/index.md index 47c3d604f..73c3ae906 100644 --- a/docs/index.md +++ b/docs/index.md @@ -3,13 +3,12 @@ pyspider [![Build Status][Build Status]][Travis CI] [![Coverage Status][Coverage A Powerful Spider(Web Crawler) System in Python. **[TRY IT NOW!][Demo]** -- Write script in python with powerful API -- Python 2&3 +- Write script in Python - Powerful WebUI with script editor, task monitor, project manager and result viewer -- Javascript pages supported! -- MySQL, MongoDB, SQLite, PostgreSQL as database backend -- Task priority, retry, periodical, recrawl by age and more -- Distributed architecture +- [MySQL](https://www.mysql.com/), [MongoDB](https://www.mongodb.org/), [Redis](http://redis.io/), [SQLite](https://www.sqlite.org/), [PostgreSQL](http://www.postgresql.org/) with [SQLAlchemy](http://www.sqlalchemy.org/) as database backend +- [RabbitMQ](http://www.rabbitmq.com/), [Beanstalk](http://kr.github.com/beanstalkd/), [Redis](http://redis.io/) and [Kombu](http://kombu.readthedocs.org/) as message queue +- Task priority, retry, periodical, recrawl by age, ... +- Distributed architecture, Crawl Javascript pages, Python 2&3, ... Sample Code diff --git a/pyspider/message_queue/__init__.py b/pyspider/message_queue/__init__.py index cc06987ee..e241add30 100644 --- a/pyspider/message_queue/__init__.py +++ b/pyspider/message_queue/__init__.py @@ -20,11 +20,14 @@ def connect_message_queue(name, url=None, maxsize=0): rabbitmq: amqp://username:password@host:5672/%2F - Refer: https://www.rabbitmq.com/uri-spec.html + see https://www.rabbitmq.com/uri-spec.html beanstalk: beanstalk://host:11300/ redis: redis://host:6379/db + kombu: + kombu+transport://userid:password@hostname:port/virtual_host + see http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls builtin: None """ @@ -49,5 +52,10 @@ def connect_message_queue(name, url=None, maxsize=0): db = 0 return Queue(name, parsed.hostname, parsed.port, db=db, maxsize=maxsize) + else: + if url.startswith('kombu+'): + url = url[len('kombu+'):] + from .kombu_queue import Queue + return Queue(name, url, maxsize=maxsize) raise Exception('unknow connection url: %s', url) diff --git a/pyspider/message_queue/kombu_queue.py b/pyspider/message_queue/kombu_queue.py new file mode 100644 index 000000000..6bc145f17 --- /dev/null +++ b/pyspider/message_queue/kombu_queue.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +# vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8: +# Author: Binux +# http://binux.me +# Created on 2015-05-22 20:54:01 + +import time +import umsgpack +from kombu import Connection, enable_insecure_serializers +from kombu.serialization import register +from kombu.exceptions import ChannelError +from six.moves import queue as BaseQueue + + +register('umsgpack', umsgpack.packb, umsgpack.unpackb, 'application/x-msgpack') +enable_insecure_serializers(['umsgpack']) + + +class KombuQueue(object): + """ + kombu is a high-level interface for multiple message queue backends. + + KombuQueue is built on top of kombu API. + """ + + Empty = BaseQueue.Empty + Full = BaseQueue.Full + max_timeout = 0.3 + + def __init__(self, name, url="amqp://", maxsize=0, lazy_limit=True): + """ + Constructor for KombuQueue + + url: http://kombu.readthedocs.org/en/latest/userguide/connections.html#urls + maxsize: an integer that sets the upperbound limit on the number of + items that can be placed in the queue. + """ + self.name = name + self.conn = Connection(url) + self.queue = self.conn.SimpleQueue(self.name, no_ack=True, serializer='umsgpack') + + self.maxsize = maxsize + self.lazy_limit = lazy_limit + if self.lazy_limit and self.maxsize: + self.qsize_diff_limit = int(self.maxsize * 0.1) + else: + self.qsize_diff_limit = 0 + self.qsize_diff = 0 + + def qsize(self): + try: + return self.queue.qsize() + except ChannelError: + return 0 + + def empty(self): + if self.qsize() == 0: + return True + else: + return False + + def full(self): + if self.maxsize and self.qsize() >= self.maxsize: + return True + else: + return False + + def put(self, obj, block=True, timeout=None): + if not block: + return self.put_nowait() + + start_time = time.time() + while True: + try: + return self.put_nowait(obj) + except BaseQueue.Full: + if timeout: + lasted = time.time() - start_time + if timeout > lasted: + time.sleep(min(self.max_timeout, timeout - lasted)) + else: + raise + else: + time.sleep(self.max_timeout) + + def put_nowait(self, obj): + if self.lazy_limit and self.qsize_diff < self.qsize_diff_limit: + pass + elif self.full(): + raise BaseQueue.Full + else: + self.qsize_diff = 0 + return self.queue.put(obj) + + def get(self, block=True, timeout=None): + try: + ret = self.queue.get(block, timeout) + return ret.payload + except self.queue.Empty: + raise BaseQueue.Empty + + def get_nowait(self): + try: + ret = self.queue.get_nowait() + return ret.payload + except self.queue.Empty: + raise BaseQueue.Empty + + def delete(self): + self.queue.queue.delete() + + def __del__(self): + self.queue.close() + + +Queue = KombuQueue diff --git a/pyspider/message_queue/redis_queue.py b/pyspider/message_queue/redis_queue.py index 6426632e5..3744b1ef6 100644 --- a/pyspider/message_queue/redis_queue.py +++ b/pyspider/message_queue/redis_queue.py @@ -54,7 +54,6 @@ def full(self): def put_nowait(self, obj): if self.lazy_limit and self.last_qsize < self.maxsize: - print(self.name, self.last_qsize) pass elif self.full(): raise self.Full diff --git a/requirements.txt b/requirements.txt index adf38fbe1..f053d6b9d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,3 +18,4 @@ SQLAlchemy>=0.9.7 six amqp>=1.3.0 redis +kombu diff --git a/setup.py b/setup.py index 838b0bf3e..11e189e2d 100644 --- a/setup.py +++ b/setup.py @@ -78,6 +78,7 @@ 'unittest2>=0.5.1', 'SQLAlchemy>=0.9.7', 'redis', + 'kombu', ], }, diff --git a/tests/test_message_queue.py b/tests/test_message_queue.py index ab23093df..84e07207c 100644 --- a/tests/test_message_queue.py +++ b/tests/test_message_queue.py @@ -169,3 +169,51 @@ def tearDownClass(self): self.q2.get() while not self.q3.empty(): self.q3.get() + +class TestKombuQueue(TestMessageQueue, unittest.TestCase): + kombu_url = 'kombu+memory://' + + @classmethod + def setUpClass(self): + from pyspider.message_queue import connect_message_queue + with utils.timeout(3): + self.q1 = connect_message_queue('test_queue', self.kombu_url, maxsize=5) + self.q2 = connect_message_queue('test_queue', self.kombu_url, maxsize=5) + self.q3 = connect_message_queue('test_queue_for_threading_test', self.kombu_url) + while not self.q1.empty(): + self.q1.get() + while not self.q2.empty(): + self.q2.get() + while not self.q3.empty(): + self.q3.get() + + @classmethod + def tearDownClass(self): + while not self.q1.empty(): + self.q1.get() + self.q1.delete() + while not self.q2.empty(): + self.q2.get() + self.q2.delete() + while not self.q3.empty(): + self.q3.get() + self.q3.delete() + +@unittest.skip('test cannot pass, get is buffered') +@unittest.skipIf(os.environ.get('IGNORE_RABBITMQ'), 'no rabbitmq server for test.') +class TestKombuAmpqQueue(TestKombuQueue): + kombu_url = 'kombu+amqp://' + +@unittest.skip('test cannot pass, put is buffered') +@unittest.skipIf(os.environ.get('IGNORE_REDIS'), 'no redis server for test.') +class TestKombuRedisQueue(TestKombuQueue): + kombu_url = 'kombu+redis://' + +@unittest.skip('test cannot pass, get is buffered') +@unittest.skipIf(os.environ.get('IGNORE_BEANSTALK'), 'no beanstalk server for test.') +class TestKombuBeanstalkQueue(TestKombuQueue): + kombu_url = 'kombu+beanstalk://' + +@unittest.skipIf(os.environ.get('IGNORE_MONGODB'), 'no rabbitmq server for test.') +class TestKombuMongoDBQueue(TestKombuQueue): + kombu_url = 'kombu+mongodb://'