diff --git a/.gitignore b/.gitignore index 07ec55e..cf81f45 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ config.ini .DS_Store test.zip +iron.json diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 0000000..e6cd97c --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,22 @@ +Copyright (c) 2012, Iron.io, Inc. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, +this list of conditions and the following disclaimer. +* Redistributions in binary form must reproduce the above copyright notice, +this list of conditions and the following disclaimer in the documentation +and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; +OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR +OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF +ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..42eb410 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1 @@ +include LICENSE.txt diff --git a/README.md b/README.md new file mode 100644 index 0000000..b38ac90 --- /dev/null +++ b/README.md @@ -0,0 +1,222 @@ +iron_worker_python is Python language binding for IronWorker. + +IronWorker is a massively scalable background processing system. +[See How It Works](http://www.iron.io/products/worker/how). + +# Getting Started + + +## Get credentials +To start using iron_worker_python, you need to sign up and get an OAuth token. + +1. Go to http://iron.io/ and sign up. +2. Get an OAuth Token at http://hud.iron.io/tokens + +## Install iron_worker_python + +The recommended way to install iron_worker_python is through `pip` or `easy_install`. The package name is `iron_worker`: + +``` +$ easy_install iron_worker +``` + +For `pip`: + +``` +$ pip install iron_worker +``` + +If you don't want to use `pip` or `easy_install`, you can always install from source. First, you'll need [iron_core_python](https://github.com/iron-io/iron_core_python). Download that; the file you're after is named iron_core.py. Then, download the [iron_worker_python](https://github.com/iron-io/iron_worker_python) library. The file you're after is named iron_worker.py. As long as both iron_core.py and iron_worker.py are in a directory in the import path, you're all set. + +Including the library is easy: + +```python +from iron_worker import * +``` +## Configure + +iron_worker_python follows the [standard configuration](http://dev.iron.io/worker/reference/configuration) convention followed by the other official libraries. + +Create a file in the root of your project named "iron.json". You'll need your project ID and OAuth token. You can get them from [the HUD](https://hud.iron.io). Include them in the iron.json file as follows: + +``` +{ + "project_id": "Your_Project_ID", + "token": "Your_OAuth_Token" +} +``` + +## Creating a Worker + +Workers are just Python scripts that are run in the IronWorker cloud. Write them the way you would write any Python script. + +Here's an example worker: + +```python +print "Hello Python World!\n" +``` + +## Upload code to IronWorker + +Uploading code to the server is done by creating a code package and uploading it. Creating a code package is simple: + +```python +code = CodePackage(name="WorkerName") +code.merge("/path/to/file") +code.executable = "/path/to/file" +``` + +Every code package needs a name (which we specified above by passing it to the constructor, but we could have just as easily set it with `code.name = "WorkerName"`) and an executable (which we set using `code.executable` above, but could have just as easily passed it to the constructor). The executable is just the file you want the worker to run; it is the entry point for your code, the file you would execute if you were going to run the worker on your own machine. + +iron_worker_python tries to react intelligently to your input; in the example above, it would have noticed that there is only one file in the CodePackage, and would have set it to be the executable. You should not rely on this, however; it's recommended that you always set the executable manually. + +Once you have a CodePackage, you need to upload it using the API. + +```python +worker = IronWorker() # Instantiate the API +worker.upload(code) # upload the CodePackage +``` + +Note that, for brevity, you can build simple CodePackages and upload them all in one step: + +```python +worker = IronWorker() # Instantiate the API +worker.upload(target="/path/to/file/or/dir", name="WorkerName", executable="/path/to/executable") +``` + +This will create a CodePackage, merge in the target, set the name to "WorkerName", and set the executable to "/path/to/executable". + +## Queueing a Task + +To run your code, you need to queue a task against it. + +```python +worker = IronWorker() +task = worker.queue(code_name="HelloWorld") +``` + +That will queue a task against the CodePackage with the name "HelloWorld". To pass a [payload](http://dev.iron.io/worker/payloads), just pass the data to `worker.queue`. It will be JSON-serialised and passed into your worker at runtime: + +```python +worker = IronWorker() +task = worker.queue(code_name="HelloWorld", payload={"fruits": ["apples", "oranges", "bananas"], "best_song_ever": "Call Me Maybe"}) +``` + +If you'd like to reuse Tasks or do more complex things with them, you can also instantiate them as instances of the `Task` class, then pass them to `worker.queue` method (this is actually what `worker.queue` is doing, transparently): + +```python +worker = IronWorker() +task = Task(code_name="HelloWorld") +task.payload = { + "fruits": ["apples", "oranges", "bananas"], + "best_song_ever": "Call Me Maybe" +} +response = worker.queue(task) +``` + +If you'd like to, you can even set your task to run after a delay: + +```python +task = Task(code_name="HelloWorld") +task.payload = { + "fruits": ["apples", "oranges", "bananas"], + "best_song_ever": "Call Me Maybe" +} +task.delay = 300 # start this task in 300 seconds (5 minutes) +response = worker.queue(task) +``` + +## Scheduling a Task + +If you'd like to run a task at a specific time, or set a task to be run repeatedly, you want to create a [scheduled task](http://dev.iron.io/worker/scheduling). Unlike previous versions of iron_worker_python, we've unified tasks and scheduled tasks into the same interface. iron_worker_python will automatically detect when you want to create a scheduled task and react accordingly. + +```python +task = Task(code_name="HelloWorldRepeating") +task.payload = { + "fruits": ["apples", "oranges", "bananas"], + "best_song_ever": "Call Me Maybe" +} +task.run_every = 300 # The task will run every 300 seconds (5 minutes) +response = worker.queue(task) +``` + +Likewise, if you'd like to run a task at a specific time, doing so is easy. Just pass a `datetime.datetime` object: + +```python +task = Task(code_name="HelloFuture") +task.start_at = datetime.now() + timedelta(hours=1) # start tomorrow +response = worker.queue(task) +``` + +## Status of a Worker +To get the status of a worker, you can use the `worker.task` method. + +```python +task = worker.queue('HelloWorld') +details = worker.task(task) + +print details.status # prints 'queued', 'complete', 'error' etc. +``` + +If you don't have an instance of `Task`, you can also pass in the task ID. Note that if you do this, however, and you are attempting to retrieve that status of a scheduled task, you need to declare that as well: + +```python +task = worker.queue("HelloWorld") +details = worker.task(id=task.id) + +print details.status + +scheduled_task = worker.queue("HelloWorld", run_every=60, run_count=3) # run this task 3 times, once a minute +scheduled_details = worker.task(scheduled_task.id, scheduled=True) +print scheduled_details.status +``` + +## Get Worker Log + +Use any function that prints text inside your worker to insert messages into you worker's log. To retrieve a worker's log, use the `worker.log` method. + +```python +task = worker.queue('HelloWorld') +time.sleep(10) +print worker.log(task) +``` + +If you don't have an instance of the `Task` object handy, you can also just use the ID of the task: + +```python +task = worker.queue('HelloWorld') +time.sleep(10) +print worker.log(id=task.id) +``` + +## Loading the Task Data Payload + +When your code is executed, it will be passed three program arguments: + +* **-id** - The task id. +* **-payload** - the filename containing the data payload for this particular task. +* **-d** - the user writable directory that can be used while running your job. + +Simply open the filename passed by `-payload`, read its contents, and (if you used iron_worker_python to queue the task), decode the string as JSON: + +```python +payload = None +payload_file = None +for i in range(len(sys.argv)): + if sys.argv[i] == "-payload" and (i + 1) < len(sys.argv): + payload_file = sys.argv[i] + break + +f = open(payload_file, "r") +contents = f.read() +f.close() + +payload = json.loads(contents) +``` + +# Full Documentation + +You can find more documentation here: + +* [Iron.io Dev Center](http://dev.iron.io): Full documentation for Iron.io products. +* [Example Workers](https://github.com/iron-io/iron_worker_examples) diff --git a/iron_worker.py b/iron_worker.py new file mode 100644 index 0000000..749820d --- /dev/null +++ b/iron_worker.py @@ -0,0 +1,470 @@ +import os +import httplib +import mimetypes +import zipfile +import time +from dateutil.tz import * +import iron_core +try: + import json +except ImportError: + import simplejson as json + + +def file_exists(file): + """Check if a file exists.""" + if not os.path.exists(file): + return False + try: + open(file) + except IOError: + return False + return True + + +class Task: + id = None + project = None + code_id = None + code_history_id = None + status = None + code_name = None + code_rev = None + created_at = None + updated_at = None + start_time = None + end_time = None + duration = None + timeout = 3600 + message = None + delay = 0 + start_at = None + end_at = None + next_start = None + last_run_time = None + run_times = None + run_count = None + run_every = None + percent = None + payload = None + priority = 0 + + scheduled = False + repeating = False + + __json_attrs = ["payload"] + __rfc3339_attrs = ["created_at", "updated_at", "start_at", "end_at", + "next_start", "last_run_time"] + __timestamp_attrs = ["start_time", "end_time"] + __schedule_attrs = ["start_at", "end_at", "next_start", "last_run_time", + "run_count", "run_every"] + __repeating_attrs = ["end_at", "next_start", "run_every"] + __aliases = { + "project": "project_id", + "msg": "message" + } + __ignore = ["message"] + + def __str__(self): + if self.id is not None and self.scheduled: + return "IronWorker Scheduled Task #%s" % self.id + elif self.id is not None: + return "IronWorker Task #%s" % self.id + else: + return "IronWorker Task" + + def __repr__(self): + return "<%s>" % str(self) + + def __set(self, attr, value): + if attr in self.__rfc3339_attrs: + if isinstance(value, basestring): + value = iron_core.IronClient.fromRfc3339(value) + if attr in self.__schedule_attrs: + self.scheduled = True + if attr in self.__repeating_attrs: + self.repeating = True + if attr in self.__json_attrs: + if isinstance(value, basestring): + try: + value = json.loads(value) + except: + pass + setattr(self, attr, value) + + def __init__(self, values=None, **kwargs): + if values is None: + values = {} + + self.payload = {} + + attrs = [x for x in vars(self.__class__).keys() + if not x.startswith("__")] + + for k in kwargs.keys(): + values[k] = kwargs[k] + + for prop in values.keys(): + if prop in attrs and prop not in self.__ignore: + self.__set(prop, values[prop]) + elif prop in self.__aliases: + self.__set(self.__aliases[prop], values[prop]) + + +class CodePackage: + id = None + project = None + name = None + runtime = None + latest_checksum = None + revision = None + latest_history_id = None + latest_change = None + files = None + executable = None + zip_path = None + __rfc3339_attrs = ["latest_change"] + __aliases = { + "project_id": "project", + "rev": "revision", + "exec": "executable" + } + + def __str__(self): + if self.name is not None: + return "%s Code Package" % self.name + elif self.id is not None: + return "Code Package #%s" % self.id + else: + return "IronWorker Code Package" + + def __repr__(self): + return "<%s>" % str(self) + + def __set(self, attr, value): + if attr in self.__rfc3339_attrs: + value = iron_core.IronClient.fromRfc3339(value) + setattr(self, attr, value) + + def __init__(self, values=None, **kwargs): + if values is None: + values = {} + + self.files = {} + + for k in kwargs.keys(): + values[k] = kwargs[k] + + attrs = [x for x in vars(self.__class__).keys() + if not x.startswith("__")] + + for prop in values.keys(): + if prop in attrs: + self.__set(prop, values[prop]) + elif prop in self.__aliases: + self.__set(self.__aliases[prop], values[prop]) + + def merge(self, target, ignoreRootDir=False): + if os.path.isfile(target): + self.files[os.path.basename(target)] = target + elif os.path.isdir(target): + for dirname, dirnames, filenames in os.walk(target): + for filename in filenames: + path = os.path.join(dirname, filename) + if ignoreRootDir: + ziploc = path.lstrip(target).lstrip("/") + else: + ziploc = path + self.files[ziploc] = path + else: + raise ValueError("'%s' is not a file or directory." % target) + if len(self.files) == 1: + for dest, loc in self.files.iteritems(): + self.executable = dest + + def zip(self, destination=None, overwrite=True): + if destination is None: + if self.name is not None: + destination = "%s.zip" % self.name + else: + raise ValueError("Package name or destination is required.") + if file_exists(destination) and not overwrite: + raise ValueError("Destination '%s' already exists." % destination) + filelist = self.files.copy() + for dest, loc in filelist.items(): + if not file_exists(loc): + del(self.files[dest]) + if len(self.files) > 0: + z = zipfile.ZipFile(destination, "w") + for dest, loc in self.files.items(): + z.write(loc, dest) + z.close() + self.zip_path = destination + return file_exists(destination) + + +class IronWorker: + NAME = "iron_worker_python" + VERSION = "0.1.0" + + def __init__(self, **kwargs): + """Prepare a configured instance of the API wrapper and return it. + + Keyword arguments are passed directly to iron_core_python; consult its + documentation for a full list and possible values.""" + self.client = iron_core.IronClient(name=IronWorker.NAME, + version=IronWorker.VERSION, product="iron_worker", **kwargs) + + ############################################################# + ####################### CODE PACKAGES ####################### + ############################################################# + def codes(self): + packages = [] + resp = self.client.get("codes") + raw_packages = resp["body"]["codes"] + for package in raw_packages: + packages.append(CodePackage(package)) + return packages + + def code(self, id): + if isinstance(id, CodePackage): + id = id.id + resp = self.client.get("codes/%s" % id) + raw_package = resp["body"] + return CodePackage(raw_package) + + def postCode(self, code, zipFilename=None): + zip_loc = code.zip_path + if zipFilename is not None: + zip_loc = zipFilename + if zip_loc is None: + raise ValueError("Need to set the zip file to upload.") + if not file_exists(zip_loc): + raise ValueError("File doesn't exist: %s" % zip_loc) + if code.name is None: + raise ValueError("Code needs a name.") + if code.executable is None: + raise ValueError("Code's executable file needs to be set.") + if code.runtime is None: + code.runtime = "python" + file = open(zip_loc, "rb") + file_contents = file.read() + file.close() + + data = [("data", json.dumps({ + "name": code.name, + "runtime": code.runtime, + "file_name": code.executable + }))] + + files = [("file", zip_loc, file_contents)] + + content_type, body = IronWorker.encode_multipart_formdata(data, files) + headers = { + "Content-Type": content_type + } + resp = self.client.post(url="codes", body=body, headers=headers) + return CodePackage(resp["body"]) + + def upload(self, target, name=None, executable=None, overwrite=True): + if isinstance(target, CodePackage): + code = target + else: + code = CodePackage() + code.merge(target) + if name is not None: + code.name = name + if executable is not None: + code.executable = executable + if code.name is None: + raise ValueError("Need to set a name for the package.") + if code.executable is None: + raise ValueError("Need to set a file as the executable.") + clean_up = not file_exists("%s.zip" % code.name) or overwrite + if code.zip_path is None or not file_exists(code.zip_path): + code.zip(overwrite=overwrite) + result = self.postCode(code) + if clean_up: + os.remove(code.zip_path) + return result + + def deleteCode(self, id): + if isinstance(id, CodePackage): + id = id.id + resp = self.client.delete("codes/%s" % id) + return True + + def revisions(self, id): + revisions = [] + if isinstance(id, CodePackage): + id = id.id + resp = self.client.get("codes/%s/revisions" % id) + raw_revs = resp["body"]["revisions"] + for rev in raw_revs: + revisions.append(CodePackage(rev)) + return revisions + + def download(self, id, rev=None, destination=None): + if isinstance(id, CodePackage): + if rev is None and id.revision is not None: + rev = id.revision + id = id.id + url = "codes/%s/download" % id + if rev is not None: + url = "%s?revision=%s" % (url, rev) + resp = self.client.get(url) + dest = resp["resp"].getheader("Content-Disposition") + dest = dest.lstrip("filename=") + if destination is not None: + if os.path.isdir(destination): + dest = os.path.join(destination, dest) + else: + dest = destination + dup_dest = dest + iteration = 1 + while file_exists(dup_dest) and destination is None: + iteration += 1 + dup_dest = dest.rstrip(".zip") + " (" + str(iteration) + ").zip" + f = open(dup_dest, "wb") + f.write(resp["body"]) + f.close() + return file_exists(dup_dest) + + ############################################################# + ########################## TASKS ############################ + ############################################################# + def tasks(self, scheduled=False): + tasks = [] + if not scheduled: + resp = self.client.get("tasks") + raw_tasks = resp["body"] + raw_tasks = raw_tasks["tasks"] + else: + resp = self.client.get("schedules") + raw_tasks = resp["body"] + raw_tasks = raw_tasks["schedules"] + + for raw_task in raw_tasks: + tasks.append(Task(raw_task)) + return tasks + + def queue(self, task=None, tasks=None, **kwargs): + tasks_data = [] + if task is None: + task = Task(**kwargs) + if tasks is None: + tasks = [task] + for task in tasks: + payload = task.payload + if not isinstance(payload, basestring): + payload = json.dumps(payload) + if task.code_name is None: + raise ValueError("task.code_name is required.") + task_data = { + "name": task.code_name, + "code_name": task.code_name, + "payload": payload, + "priority": task.priority, + "delay": task.delay + } + if not task.scheduled: + type_str = "tasks" + task_data["timeout"] = task.timeout + else: + type_str = "schedules" + if task.run_every is not None: + task_data["run_every"] = task.run_every + if task.end_at is not None: + if task.end_at.tzinfo is None: + task.end_at = task.end_at.replace(tzinfo=tzlocal()) + task_data["end_at"] = iron_core.IronClient.toRfc3339( + task.end_at) + if task.run_times is not None: + task_data["run_times"] = task.run_times + if task.start_at is not None: + if task.start_at.tzinfo is None: + task.start_at = task.start_at.replace(tzinfo=tzlocal()) + task_data["start_at"] = iron_core.IronClient.toRfc3339( + task.start_at) + tasks_data.append(task_data) + data = json.dumps({type_str: tasks_data}) + headers = {"Content-Type": "application/json"} + + resp = self.client.post(type_str, body=data, headers=headers) + tasks = resp["body"] + if len(tasks[type_str]) > 1: + return [Task(task, scheduled=(type_str == "schedules")) + for task in tasks[type_str]] + else: + return Task(tasks[type_str][0], + scheduled=(type_str == "schedules")) + + def task(self, id, scheduled=False): + if isinstance(id, Task): + scheduled = id.scheduled + id = id.id + if not scheduled: + url = "tasks/%s" % id + else: + url = "schedules/%s" % id + resp = self.client.get(url) + raw_task = resp["body"] + return Task(raw_task) + + def log(self, id): + if isinstance(id, Task): + if id.scheduled: + raise ValueError("Cannot retrieve a scheduled task's log.") + id = id.id + url = "tasks/%s/log" % id + headers = {"Accept": "text/plain"} + resp = self.client.get(url, headers=headers) + return resp["body"] + + def cancel(self, id, scheduled=False): + if isinstance(id, Task): + scheduled = id.scheduled + id = id.id + if not scheduled: + url = "tasks/%s/cancel" % id + else: + url = "schedules/%s/cancel" % id + resp = self.client.post(url) + return True + + ############################################################# + ######################### HELPERS ########################### + ############################################################# + @staticmethod + def encode_multipart_formdata(fields, files): + """ + fields is a sequence of (name, value) elements for regular form fields. + files is a sequence of (name, filename, value) elements for data to be + uploaded as files + Return (content_type, body) ready for httplib.HTTP instance + """ + BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$' + CRLF = '\r\n' + L = [] + for (key, value) in fields: + L.append('--' + BOUNDARY) + L.append('Content-Disposition: form-data; name="%s"' % key) + L.append('') + L.append(value) + for (key, filename, value) in files: + L.append('--' + BOUNDARY) + L.append('Content-Disposition: form-data; name="%s"; filename="%s"' + % (key, filename)) + L.append('Content-Type: %s' + % IronWorker.get_content_type(filename)) + L.append('') + L.append(value) + L.append('--' + BOUNDARY + '--') + L.append('') + body = CRLF.join(L) + content_type = 'multipart/form-data; boundary=%s' % BOUNDARY + return content_type, str(body) + + @staticmethod + def get_content_type(filename): + return mimetypes.guess_type(filename)[0] or 'application/octet-stream' diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..11d64d3 --- /dev/null +++ b/setup.py @@ -0,0 +1,39 @@ +from setuptools import setup + +setup( + name='iron-worker', + py_modules=["iron_worker"], + packages=["testDir"], + version='0.1.0', + install_requires=["iron_core"], + description='The Python client for IronWorker, a cloud service for background processing.', + author='Iron.io', + author_email="support@iron.io", + url='https://www.github.com/iron-io/iron_worker_python', + keywords=['iron', 'ironio', 'iron.io', 'iron-io', 'ironworker', 'iron-worker', 'iron_worker', 'worker', 'cloud', 'task queue', 'background processing'], + classifiers = [ + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Intended Audience :: Developers", + "Operating System :: OS Independent", + "Development Status :: 2 - Pre-Alpha", + "License :: OSI Approved :: BSD License", + "Natural Language :: English", + "Topic :: Internet", + "Topic :: Internet :: WWW/HTTP", + "Topic :: Software Development :: Libraries :: Python Modules", + + ], + long_description="""\ +IronWorker Python Library +------------------------- + +This package offers a client interface to the Iron.io IronWorker service. It +offers a full, native interface to the IronWorker API, including creating +and uploading code package, queuing and scheduling tasks, viewing task logs, +and more. + +IronWorker is a background processing and task queuing system that lets your +applications use the cloud to do their heavy lifting. Find out more at +http://www.iron.io/products/worker.""" +) diff --git a/test.py b/test.py new file mode 100644 index 0000000..29ec108 --- /dev/null +++ b/test.py @@ -0,0 +1,142 @@ +from iron_worker import * +import unittest +from datetime import datetime +from datetime import timedelta + + +class TestIronWorker(unittest.TestCase): + + def setUp(self): + self.code_name = "test_code" + + self.worker = IronWorker() + + self.package = CodePackage() + self.package.merge("testDir", True) + self.package.executable = "hello.py" + self.worker.upload(self.package, name=self.code_name) + + def test_getCodeDetails(self): + codes = self.worker.codes() + + code = self.worker.code(codes[0].id) + self.assertEqual(codes[0].id, code.id) + + def test_postTask(self): + payload = { + "dict": {"a": 1, "b": 2}, + "var": "alpha", + "list": ['apples', 'oranges', 'bananas'] + } + resp = self.worker.queue(code_name=self.code_name, payload=payload) + + task_id = resp.id + + tasks = self.worker.tasks() + task_ids = [] + for task in tasks: + task_ids.append(task.id) + + self.assertIn(task_id, task_ids) + + def test_getTaskDetails(self): + payload = { + "dict": {"a": 1, "b": 2}, + "var": "alpha", + "list": ['apples', 'oranges', 'bananas'] + } + resp = self.worker.queue(code_name=self.code_name, payload=payload) + + task_id = resp.id + + tasks = self.worker.tasks() + task_ids = [] + for task in tasks: + task_ids.append(task.id) + + self.assertIn(task_id, task_ids) + + task = self.worker.task(resp) + + self.assertEqual(task_id, task.id) + + def test_zcancelTask(self): + tasks = self.worker.tasks() + + for task in tasks: + self.worker.cancel(task) + + new_tasks = self.worker.tasks() + real_tasks = [] + for task in new_tasks: + if task.status not in ['cancelled', 'error']: + real_tasks.append(task) + self.assertEqual(len(real_tasks), 0) + + def test_postSchedule(self): + resp = self.worker.queue(code_name=self.code_name, delay=120) + + schedules = self.worker.tasks() + schedule_ids = [] + for schedule in schedules: + schedule_ids.append(schedule.id) + + time.sleep(2) + + self.assertIn(resp.id, schedule_ids) + + def test_postScheduleAndPayload(self): + resp = self.worker.queue(code_name=self.code_name, delay=120, + payload={"foo": "bar"}) + + schedules = self.worker.tasks() + schedule_ids = [] + for schedule in schedules: + schedule_ids.append(schedule.id) + + time.sleep(2) + + self.assertIn(resp.id, schedule_ids) + + def test_postAdvancedSchedule(self): + delta = timedelta(hours=1) + start_at = datetime.now() + delta # one hour from now + resp = self.worker.queue( + code_name=self.code_name, + payload={"schedule": "AWESOME SCHEDULE!"}, + start_at=start_at, run_every=3600, run_times=8) + + schedules = self.worker.tasks(scheduled=True) + schedule_ids = [] + for schedule in schedules: + schedule_ids.append(schedule.id) + + time.sleep(2) + + self.assertIn(resp.id, schedule_ids) + + def test_zcancelSchedule(self): + schedules = self.worker.tasks(scheduled=True) + + for schedule in schedules: + self.worker.cancel(schedule) + + new_schedules = self.worker.tasks(scheduled=True) + real_schedules = [] + for schedule in new_schedules: + if schedule.status not in ['cancelled', 'error']: + real_schedules.append(schedule) + + self.assertEqual(len(real_schedules), 0) + + def test_zdeleteCode(self): + codes = self.worker.codes() + + for code in codes: + self.worker.deleteCode(code) + + new_codes = self.worker.codes() + self.assertEqual(len(new_codes), 0) + +if __name__ == '__main__': + unittest.main() diff --git a/testDir/hello.py b/testDir/hello.py new file mode 100644 index 0000000..92c61b1 --- /dev/null +++ b/testDir/hello.py @@ -0,0 +1 @@ +print "HELLO WORLD"