From 6c227500010a6a1c1321343ecb6b03f7c4b61a18 Mon Sep 17 00:00:00 2001 From: Corey Oordt Date: Sun, 12 Nov 2023 07:50:05 -0600 Subject: [PATCH] Initial commit --- .changelog-config.yaml | 128 +++++++ .composition.yaml | 128 +++++++ .editorconfig | 23 ++ .github/ISSUE_TEMPLATE.md | 15 + .github/changelog_templates/commit.md.jinja | 8 + .../version_heading.md.jinja | 4 + .gitignore | 174 +++++++++ .pre-commit-config.yaml | 74 ++++ .secrets.baseline | 129 +++++++ CHANGELOG.md | 5 + CONTRIBUTING.md | 40 ++ LICENSE | 29 ++ MANIFEST.in | 10 + README.md | 194 ++++++++++ RELEASING.md | 50 +++ mkdocs.yml | 78 ++++ psmq/__init__.py | 5 + psmq/config.py | 113 ++++++ psmq/connection.py | 137 +++++++ psmq/exceptions.py | 88 +++++ psmq/manager.py | 61 +++ psmq/message.py | 45 +++ psmq/psmq_library.lua | 362 ++++++++++++++++++ psmq/queue.py | 203 ++++++++++ psmq/serialize.py | 16 + psmq/validation.py | 134 +++++++ pyproject.toml | 218 +++++++++++ tests/__init__.py | 1 + tests/test_psmq.py | 4 + tests/test_queue_lua.py | 274 +++++++++++++ tools/gen-codeowners.sh | 21 + 31 files changed, 2771 insertions(+) create mode 100644 .changelog-config.yaml create mode 100644 .composition.yaml create mode 100644 .editorconfig create mode 100644 .github/ISSUE_TEMPLATE.md create mode 100644 .github/changelog_templates/commit.md.jinja create mode 100644 .github/changelog_templates/version_heading.md.jinja create mode 100644 .gitignore create mode 100644 .pre-commit-config.yaml create mode 100644 .secrets.baseline create mode 100644 CHANGELOG.md create mode 100644 CONTRIBUTING.md create mode 100644 LICENSE create mode 100644 MANIFEST.in create mode 100644 README.md create mode 100644 RELEASING.md create mode 100644 mkdocs.yml create mode 100644 psmq/__init__.py create mode 100644 psmq/config.py create mode 100644 psmq/connection.py create mode 100644 psmq/exceptions.py create mode 100644 psmq/manager.py create mode 100644 psmq/message.py create mode 100644 psmq/psmq_library.lua create mode 
100644 psmq/queue.py create mode 100644 psmq/serialize.py create mode 100644 psmq/validation.py create mode 100644 pyproject.toml create mode 100644 tests/__init__.py create mode 100644 tests/test_psmq.py create mode 100644 tests/test_queue_lua.py create mode 100755 tools/gen-codeowners.sh diff --git a/.changelog-config.yaml b/.changelog-config.yaml new file mode 100644 index 0000000..46cce22 --- /dev/null +++ b/.changelog-config.yaml @@ -0,0 +1,128 @@ +# For more configuration information, please see https://coordt.github.io/generate-changelog/ + +# User variables for reference in other parts of the configuration. +variables: + repo_url: https://github.com/callowayproject/psmq + changelog_filename: CHANGELOG.md + +# Pipeline to find the most recent tag for incremental changelog generation. +# Leave empty to always start at first commit. +starting_tag_pipeline: + - action: ReadFile + kwargs: + filename: "{{ changelog_filename }}" + - action: FirstRegExMatch + kwargs: + pattern: (?im)^## (?P<rev>\d+\.\d+(?:\.\d+)?)\s+\(\d+-\d{2}-\d{2}\)$ + named_subgroup: rev + +# Used as the version title of the changes since the last valid tag. +unreleased_label: Unreleased + +# Process the commit's first line for use in the changelog. +summary_pipeline: + - action: strip_spaces + - action: Strip + comment: Get rid of any periods so we don't get double periods + kwargs: + chars: . + - action: SetDefault + args: + - no commit message + - action: capitalize + - action: append_dot + +# Process the commit's body for use in the changelog. +body_pipeline: + - action: ParseTrailers + comment: Parse the trailers into metadata. + kwargs: + commit_metadata: save_commit_metadata + +# Process and store the full or partial changelog. +output_pipeline: + - action: IncrementalFileInsert + kwargs: + filename: "{{ changelog_filename }}" + last_heading_pattern: (?im)^## \d+\.\d+(?:\.\d+)?\s+\([0-9]+-[0-9]{2}-[0-9]{2}\)$ + +# Full or relative paths to look for output generation templates. 
+template_dirs: + - ".github/changelog_templates/" + +# Group the commits within a version by these commit attributes. +group_by: + - metadata.category + +# Only tags matching this regular expression are used for the changelog. +tag_pattern: ^[0-9]+\.[0-9]+(?:\.[0-9]+)?$ + +# Tells ``git-log`` whether to include merge commits in the log. +include_merges: false + +# Ignore commits whose summary line matches any of these regular expression patterns. +ignore_patterns: + - '[@!]minor' + - '[@!]cosmetic' + - '[@!]refactor' + - '[@!]wip' + - ^$ + - ^Merge branch + - ^Merge pull + - ^Version updated + +# Set the commit's category metadata to the first classifier that returns ``True``. +commit_classifiers: + - action: SummaryRegexMatch + category: New + kwargs: + pattern: (?i)^(?:new|add)[^\n]*$ + - action: SummaryRegexMatch + category: Updates + kwargs: + pattern: (?i)^(?:update|change|rename|remove|delete|improve|refactor|chg|modif)[^\n]*$ + - action: SummaryRegexMatch + category: Fixes + kwargs: + pattern: (?i)^(?:fix)[^\n]*$ + - action: + category: Other + +# Tokens in git commit trailers that indicate authorship. +valid_author_tokens: + - author + - based-on-a-patch-by + - based-on-patch-by + - co-authored-by + - co-committed-by + - contributions-by + - from + - helped-by + - improved-by + - original-patch-by + +# Rules applied to commits to determine the type of release to suggest. 
+release_hint_rules: + - match_result: dev + no_match_result: no-release + branch: ^((?!master|main).)*$ + - match_result: patch + no_match_result: no-release + grouping: Other + branch: master|main + - match_result: patch + no_match_result: no-release + grouping: Fixes + branch: master|main + - match_result: minor + no_match_result: no-release + grouping: Updates + branch: master|main + - match_result: minor + no_match_result: no-release + grouping: New + branch: master|main + - match_result: major + no_match_result: no-release + grouping: Breaking Changes + branch: master|main diff --git a/.composition.yaml b/.composition.yaml new file mode 100644 index 0000000..127b490 --- /dev/null +++ b/.composition.yaml @@ -0,0 +1,128 @@ +checkout: null +commit: ea986dafce834eade60512e88fa1d1280b1d7ece +context: + _copy_without_render: + - .github/**/*.yaml + _docs_requirements: + black: '>=23.3.0' + mkdocs: '>=1.4.3' + mkdocs-gen-files: '>=0.5.0' + mkdocs-literate-nav: '>=0.6.0' + mkdocs-markdownextradata-plugin: '>=0.2.5' + mkdocs-material: '>=9.1.0' + mkdocs-section-index: '>=0.3.5' + mkdocs-techdocs-core: '>=1.2.0' + mkdocstrings: '>=0.21.2' + mkdocstrings-python: '>=1.0.0' + friendly_name: Python Simple Message Queue + github_user: callowayproject + packaging_tool: pip/setuptools + project_name: psmq + project_short_description: Python Simple Message Queue + project_slug: psmq + site_name: Python Simple Message Queue + version: 0.1.0 +directory: cookiecutter-package +merge_strategies: + '*.json': comprehensive + '*.toml': comprehensive + '*.yaml': comprehensive + '*.yml': comprehensive +no_input: false +overwrite: [] +overwrite_exclude: [] +password: null +skip_generation: [] +skip_hooks: false +skip_if_file_exists: false +template: datascience-cookiecomposer-templates/ +--- +checkout: null +commit: ea986dafce834eade60512e88fa1d1280b1d7ece +context: + _copy_without_render: + - .github/**/*.jinja + _dev_requirements: + bump2version: '>=1.0.1' + generate-changelog: 
'>=0.7.6' + git-fame: '>=1.12.2' + pip-tools: '' + poetry: '' + _docs_requirements: {} + _prod_requirements: + environs: '>=9.3.5' + _test_requirements: + coverage: '>=6.1.2' + poetry: '>=1.5.1' + pre-commit: '>=2.15.0' + pytest: '>=6.0.0' + pytest-cov: '>=3.0.0' + author: Corey Oordt + email: coreyoordt@gmail.com + friendly_name: Python Simple Message Queue + github_user: callowayproject + packaging_tool: pip/setuptools + project_name: psmq + project_short_description: Python Simple Message Queue + project_slug: psmq + site_name: Python Simple Message Queue + version: 0.1.0 +directory: cookiecutter-mkdocs +merge_strategies: + '*.json': comprehensive + '*.toml': comprehensive + '*.yaml': do-not-merge + '*.yml': do-not-merge +no_input: false +overwrite: [] +overwrite_exclude: [] +password: null +skip_generation: [] +skip_hooks: false +skip_if_file_exists: false +template: datascience-cookiecomposer-templates/ +--- +checkout: null +commit: ea986dafce834eade60512e88fa1d1280b1d7ece +context: + _copy_without_render: + - .github/**/*.jinja + _dev_requirements: + bump2version: '>=1.0.1' + generate-changelog: '>=0.7.6' + git-fame: '>=1.12.2' + pip-tools: '' + poetry: '' + _docs_requirements: {} + _prod_requirements: + environs: '>=9.3.5' + _test_requirements: + coverage: '>=6.1.2' + poetry: '>=1.5.1' + pre-commit: '>=2.15.0' + pytest: '>=6.0.0' + pytest-cov: '>=3.0.0' + author: Corey Oordt + email: coreyoordt@gmail.com + friendly_name: Python Simple Message Queue + github_user: callowayproject + packaging_tool: pip/setuptools + project_name: psmq + project_short_description: Python Simple Message Queue + project_slug: psmq + site_name: Python Simple Message Queue + version: 0.1.0 +directory: cookiecutter-boilerplate +merge_strategies: + '*.json': comprehensive + '*.toml': comprehensive + '*.yaml': comprehensive + '*.yml': comprehensive +no_input: false +overwrite: [] +overwrite_exclude: [] +password: null +skip_generation: [] +skip_hooks: false +skip_if_file_exists: false 
+template: datascience-cookiecomposer-templates/ diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..585e2ab --- /dev/null +++ b/.editorconfig @@ -0,0 +1,23 @@ +# http://editorconfig.org + +root = true + +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true + +[*.{py,rst,ini}] +indent_style = space +indent_size = 4 + +[*.{html,css,scss,json,yml}] +indent_style = space +indent_size = 2 + +[*.md] +trim_trailing_whitespace = false + +[Makefile] +indent_style = tab diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..7ef5034 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,15 @@ +* psmq version: +* Python version: +* Operating System: + +### Description + +Describe what you were trying to get done. +Tell us what happened, what went wrong, and what you expected to happen. + +### What I Did + +``` +Paste the command(s) you ran and the output. +If there was a crash, please include the traceback here. 
+``` diff --git a/.github/changelog_templates/commit.md.jinja b/.github/changelog_templates/commit.md.jinja new file mode 100644 index 0000000..19c9dde --- /dev/null +++ b/.github/changelog_templates/commit.md.jinja @@ -0,0 +1,8 @@ +- {{ commit.summary }} [{{ commit.short_sha }}]({{ repo_url }}/commit/{{ commit.sha }}) + {{ commit.body|indent(2, first=True) }} + {% for key, val in commit.metadata["trailers"].items() %} + {% if key not in VALID_AUTHOR_TOKENS %} + **{{ key }}:** {{ val|join(", ") }} + + {% endif %} +{% endfor %} diff --git a/.github/changelog_templates/version_heading.md.jinja b/.github/changelog_templates/version_heading.md.jinja new file mode 100644 index 0000000..b8e3b07 --- /dev/null +++ b/.github/changelog_templates/version_heading.md.jinja @@ -0,0 +1,4 @@ +## {{ version.label }} ({{ version.date_time.strftime("%Y-%m-%d") }}) +{% if version.previous_tag %} +[Compare the full difference.]({{ repo_url }}/compare/{{ version.previous_tag }}...{{ version.tag }}) +{% endif %} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..61569bc --- /dev/null +++ b/.gitignore @@ -0,0 +1,174 @@ +### macOS ### +# General +.DS_Store +.AppleDouble +.LSOverride + +### Python ### +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs +docsrc/_build/ +docsrc/_autosummary + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints +*/.ipynb_checkpoints/* + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# PEP 582; used by e.g. 
github.com/David-OConnor/pyflow +__pypackages__/ + +# celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +okta.env + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# Mr Developer +.mr.developer.cfg +.project +.pydevproject + +# Pycharm/Intellij +.idea + +# Complexity +output/*.html +output/*/index.html + +# Testing artifacts +junit-*.xml +flake8-errors.txt + +RELEASE.txt +site-packages +reports +*.env +todo.txt + +# VSCode +.vscode/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..fba9feb --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,74 @@ +repos: + - repo: https://github.com/charliermarsh/ruff-pre-commit + # Ruff version. 
+ rev: 'v0.0.285' + hooks: + - id: ruff + args: [--fix, --exit-non-zero-on-fix] + - repo: https://github.com/python/black + rev: 23.7.0 + hooks: + - id: black + - repo: https://github.com/pre-commit/pre-commit-hooks + rev: v4.4.0 + hooks: + - id: check-added-large-files + - id: check-case-conflict + - id: check-executables-have-shebangs + - id: check-json + exclude: test.* + - id: check-merge-conflict + - id: check-shebang-scripts-are-executable + - id: check-symlinks + - id: check-toml + - id: check-yaml + exclude: | + (?x)^( + test.*| + mkdocs.yml + )$ + args: [--allow-multiple-documents] + - id: debug-statements + - id: end-of-file-fixer + exclude: "^tests/resources/" + - id: fix-byte-order-marker + - id: fix-encoding-pragma + args: ["--remove"] + - id: requirements-txt-fixer + - repo: https://github.com/Yelp/detect-secrets + rev: v1.4.0 + hooks: + - id: detect-secrets + args: ['--baseline', '.secrets.baseline'] + additional_dependencies: ["gibberish-detector"] + - repo: https://github.com/pre-commit/mirrors-mypy + rev: v1.0.0 + hooks: + - id: mypy + args: [--no-strict-optional, --ignore-missing-imports] + additional_dependencies: ["pydantic", "toml", "types-all"] + - repo: https://github.com/jsh9/pydoclint + rev: 0.2.4 + hooks: + - id: pydoclint + args: + - "--config=pyproject.toml" + - repo: https://github.com/econchick/interrogate + rev: 1.5.0 # or master if you're bold + hooks: + - id: interrogate + exclude: test.* + - repo: local + hooks: + - id: check-dependencies + name: check-dependencies + language: python + entry: make -C requirements all + files: ^requirements.*?\.(in|txt)$ + pass_filenames: false + additional_dependencies: + - pip-tools + - repo: https://github.com/python-jsonschema/check-jsonschema + rev: 0.21.0 + hooks: + - id: check-azure-pipelines diff --git a/.secrets.baseline b/.secrets.baseline new file mode 100644 index 0000000..bee178d --- /dev/null +++ b/.secrets.baseline @@ -0,0 +1,129 @@ +{ + "version": "1.4.0", + "plugins_used": [ + { + 
"name": "ArtifactoryDetector" + }, + { + "name": "AWSKeyDetector" + }, + { + "name": "AzureStorageKeyDetector" + }, + { + "name": "Base64HighEntropyString", + "limit": 4.5 + }, + { + "name": "BasicAuthDetector" + }, + { + "name": "CloudantDetector" + }, + { + "name": "DiscordBotTokenDetector" + }, + { + "name": "GitHubTokenDetector" + }, + { + "name": "HexHighEntropyString", + "limit": 3.0 + }, + { + "name": "IbmCloudIamDetector" + }, + { + "name": "IbmCosHmacDetector" + }, + { + "name": "JwtTokenDetector" + }, + { + "name": "KeywordDetector", + "keyword_exclude": "" + }, + { + "name": "MailchimpDetector" + }, + { + "name": "NpmDetector" + }, + { + "name": "PrivateKeyDetector" + }, + { + "name": "SendGridDetector" + }, + { + "name": "SlackDetector" + }, + { + "name": "SoftlayerDetector" + }, + { + "name": "SquareOAuthDetector" + }, + { + "name": "StripeDetector" + }, + { + "name": "TwilioKeyDetector" + } + ], + "filters_used": [ + { + "path": "detect_secrets.filters.allowlist.is_line_allowlisted" + }, + { + "path": "detect_secrets.filters.common.is_ignored_due_to_verification_policies", + "min_level": 2 + }, + { + "path": "detect_secrets.filters.heuristic.is_indirect_reference" + }, + { + "path": "detect_secrets.filters.heuristic.is_likely_id_string" + }, + { + "path": "detect_secrets.filters.heuristic.is_lock_file" + }, + { + "path": "detect_secrets.filters.heuristic.is_not_alphanumeric_string" + }, + { + "path": "detect_secrets.filters.heuristic.is_potential_uuid" + }, + { + "path": "detect_secrets.filters.heuristic.is_prefixed_with_dollar_sign" + }, + { + "path": "detect_secrets.filters.heuristic.is_sequential_string" + }, + { + "path": "detect_secrets.filters.heuristic.is_swagger_file" + }, + { + "path": "detect_secrets.filters.heuristic.is_templated_secret" + } + ], + "results": { + ".secrets.baseline": [ + { + "type": "Hex High Entropy String", + "filename": ".secrets.baseline", + "hashed_secret": "7c57cfdd241f019222b465b023af098ebd9007cd", + "is_verified": 
false, + "line_number": 115 + }, + { + "type": "Secret Keyword", + "filename": ".secrets.baseline", + "hashed_secret": "7c57cfdd241f019222b465b023af098ebd9007cd", + "is_verified": false, + "line_number": 115 + } + ] + }, + "generated_at": "2023-01-03T19:33:38Z" +} diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..8d3fb7c --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,5 @@ +# Changelog + +## 0.1.0 (2023-08-27) + +* Initial creation diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..bd9a953 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,40 @@ +# Contributing + +## Getting started with development + +### Setup + +There are several ways to create your isolated environment. This is the default method. + +Run the following in a terminal: + +``` +# Clone the repository +git clone https://github.com/callowayproject/psmq.git + +# Enter the repository +cd psmq + +# Create then activate a virtual environment +python -m venv venv +source venv/bin/activate + +# Install the development requirements +python -m pip install -r requirements/dev.txt +``` + +### Run tests + +Once setup, you should be able to run tests: +``` +pytest +``` + +## Install Pre-commit Hooks + + +Pre-commit hooks are scripts that run every time you make a commit. If any of the scripts fail, it stops the commit. You can see a listing of the checks in the ``.pre-commit-config.yaml`` file. + +``` +pre-commit install +``` diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..1c867c5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,29 @@ +BSD License + +Copyright (c) 2023, Corey Oordt +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. 
+ +* Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + +* Neither the name of psmq nor the names of its + contributors may be used to endorse or promote products derived from this + software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. +IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, +INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE +OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED +OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/MANIFEST.in b/MANIFEST.in new file mode 100644 index 0000000..7274818 --- /dev/null +++ b/MANIFEST.in @@ -0,0 +1,10 @@ +include AUTHORS.md +include CODE_OF_CONDUCT.md +include CHANGELOG.md +include CONTRIBUTING.md +include LICENSE +include README.md + +graft tests +prune __pycache__ +global-exclude *.py[cod] diff --git a/README.md b/README.md new file mode 100644 index 0000000..ce53399 --- /dev/null +++ b/README.md @@ -0,0 +1,194 @@ +# Python Simple Message Queue + +Python Simple Message Queue + +QueueManager: + +- Configured to set up a daemonized Redis server with persistent storage +- Sentinel process to monitor queues +- creates queues +- deletes queues + +Queue: + +- Each queue is independent within a single Redis instance. + +- The underlying queue is run by the `psmq_library.lua` script. 
+ +- Queue ops: + + - pop: Get the message with the highest score. The message is deleted from the queue table and the message table. + + - push: Add a message to the queue. The message is added to the message table and the queue table. Message data must be binary. + + - peek: Get a copy of the message with the highest score. The message is not deleted from the queue table or the message table. + + - delete: Delete a message from the queue. The message is deleted from the queue table and the message table. + + - get: Get the message with the highest score from the queue. The message is not deleted from the queue table or the message table. The "visibility timeout" is added to the message_id's score in the queue table. + + - Messages retrieved with `get` _must_ be deleted with `delete` or they will be returned to the queue after the visibility timeout expires. + +Message: + +The result of a pop, peek, or get operation is a Message object. The Message object has the following fields: + +- message_id: A unique identifier for the message. This is a sortable string based on the message's timestamp and a random string. +- sent: The timestamp when the message was sent. +- data: The message data. This is a binary string, unless the deserializer was set when getting the queue. +- first_retrieved: the timestamp when the message was first retrieved by a client. Peek operations do not update this field. +- retrieval_count: The number of times a client has retrieved the message. Peek operations do not update this field. This field is updated when the message is retrieved with `get` or `pop`. + +## Basic usage + +### Get or create a queue + +```python +from psmq import QueueManager +from pathlib import Path + +file_queues = QueueManager(db_dir=Path("~/.config/psmq").expanduser().resolve()) +file_test_queue = file_queues.get_queue("test_queue") +``` + +### Push a message to a queue + +This example manually encodes the message to binary using msgpack and json. 
The message data must be binary. + +```python +from psmq import QueueManager +import umsgpack +import json + +memory_queues = QueueManager() +test_queue = memory_queues.get_queue("test_queue") + +msgpack_data = umsgpack.packb({"foo": "bar"}) +json_data = json.dumps({"foo": "bar"}).encode("utf-8") +test_queue.push(msgpack_data) +test_queue.push(json_data) +``` + +This example sets the default serializers and deserializers to msgpack and json. The message data can be any python object. +```python +from psmq import QueueManager +import umsgpack +import json + +json_serializer = lambda x: json.dumps(x).encode("utf-8") +json_deserializer = lambda x: json.loads(x.decode("utf-8")) + +memory_queues = QueueManager() +msgpack_test_queue = memory_queues.get_queue( + "msgpack_test_queue", + serializer=umsgpack.packb, + deserializer=umsgpack.unpackb +) +json_test_queue = memory_queues.get_queue( + "json_test_queue", + serializer=json_serializer, + deserializer=json_deserializer +) + +message_data = {"foo": "bar"} +msgpack_test_queue.push(message_data) +json_test_queue.push(message_data) +``` + +### Pop a message from a queue + +This example receives and deletes a message from the queue. If the processing of this message fails, the message is lost. + +```python +from psmq import QueueManager +import umsgpack + +memory_queues = QueueManager() +test_queue = memory_queues.get_queue( + "msgpack_test_queue", + serializer=umsgpack.packb, + deserializer=umsgpack.unpackb +) + +message = test_queue.pop() +if message: + print(message) +else: + print("No messages in queue.") +``` + +### Get a message from a queue + +This example receives a message from the queue. The message is not deleted from the queue. The message is deleted after successful processing. If this process crashes, the message appears in the queue after the visibility timeout expires. 
+ +```python +from psmq import QueueManager +import umsgpack + +memory_queues = QueueManager() +test_queue = memory_queues.get_queue( + "msgpack_test_queue", + serializer=umsgpack.packb, + deserializer=umsgpack.unpackb +) + +message = test_queue.get() +if message: + print(message) + message.delete() +else: + print("No messages in queue.") +``` + + + + +*Put a meaningful, short, plain-language description of:* + +- *what this project is trying to accomplish.* +- *why it matters.* +- *the problem(s) this project solves.* +- *how this software can improve the lives of its audience.* +- *what sets this apart from related-projects. Linking to another doc or page is OK if this can't be expressed in a sentence or two.* + +**Technology stack:** *Indicate the technological nature of the software, including primary programming language(s) and whether the software is intended as standalone or as a module in a framework or other ecosystem.* + +**Status:** *Alpha, Beta, 1.1, etc. It's OK to write a sentence, too. The goal is to let interested people know where this project is at. This is also a good place to link to the CHANGELOG.md.* + +**Links:** to production or demo instances + + +## Dependencies + +*Describe any dependencies that must be installed for this software to work. This includes programming languages, databases or other storage mechanisms, build tools, frameworks, and so forth. If specific versions of other software are required, or known not to work, call that out.* + +## Installation + +_Detailed instructions on how to install, configure, and get the project running. This should be frequently tested to ensure reliability. Alternatively, link to a separate INSTALL document._ + +## Configuration + +_If the software is configurable, describe it in detail, either here or in other documentation to which you link._ + +## Usage + +_Show users how to use the software. Be specific. 
Use appropriate formatting when showing code snippets._ + +## How to test the software + +_If the software includes automated tests, detail how to run those tests._ + +## Known issues + +_Document any known significant shortcomings with the software._ + +## Getting help + +_Instruct users how to get help with this software; this might include links to an issue tracker, wiki, mailing list, etc._ + + +## Getting involved + +_This section should detail why people should get involved and describe key areas you are currently focusing on; e.g., trying to get feedback on features, fixing certain bugs, building important pieces, etc._ + +_General instructions on how to contribute should be stated with a link to CONTRIBUTING.md._ diff --git a/RELEASING.md b/RELEASING.md new file mode 100644 index 0000000..3a1b04a --- /dev/null +++ b/RELEASING.md @@ -0,0 +1,50 @@ +# Releasing + +## Setting Requirements + +Your requirements are split into parts: dev, prod, and test. They exist in the directory ``requirements``. ``prod`` requirements are required for your app to work properly. ``dev`` requirements are packages used to help develop this package, which include things for building documentation, packaging the app and generating the changelog. ``test`` requirements are the libraries required to run tests. ``docs`` requirements are required to build documentation. + +As you develop, you will likely only modify ``requirements/prod.in``. + + +## Releasing your app + +Once you have developed and tested your app, or revisions to it, you need to release new version. + +### Deciding the version + +First decide how to increase the version. Using `semantic versioning`_: + +> Given a version number MAJOR.MINOR.PATCH, increment the: +> +> 1. MAJOR version when you make incompatible API changes, +> 2. MINOR version when you add functionality in a backwards-compatible manner, and +> 3. PATCH version when you make backwards-compatible bug fixes. 
+ +This is a judgement call, but here are some guidelines: + +1. A database change should be a MINOR version bump at least. +2. If the PATCH version is getting above ``10``\ , as in ``1.0.14``\ , it is acceptable to do a MINOR version. +3. Dropping or adding support of a version of Python or another dependency should be at least a MINOR version. + +.. _semantic versioning: https://semver.org/ + +### Versioning and releasing + + +Once you've decided how much of a version bump you are going to do, you will run one of three commands: + +``make release-patch`` will automatically change the patch version, e.g. ``1.1.1`` to ``1.1.2``\ . + +``make release-minor`` will automatically change the minor version, e.g. ``1.1.1`` to ``1.2.0``\ . + +``make release-major`` will automatically change the major version, e.g. ``1.1.1`` to ``2.0.0``\ . + +Each of these commands do several things: + +1. Update the ``CHANGELOG.md`` file with the changes made since the last version, using the Git commit messages. +2. Increments the appropriate version number in the appropriate way. +3. Commits all the changes. +4. Creates a Git tag with the version number. +5. Pushes the repository and tags to the GitHub server. +6. 
Jenkins recognizes the new tag and publishes the package on PyPI diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..b4780d8 --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,78 @@ +site_name: Python Simple Message Queue +repo_url: https://github.com/callowayproject/psmq +edit_uri: edit/main/docs/ +theme: + name: material + logo: assets/chr_logo.png + favicon: assets/favicon.png + features: + - navigation.tabs + - navigation.sections + palette: + - media: "(prefers-color-scheme: light)" + scheme: default + toggle: + icon: material/toggle-switch-off-outline + name: Switch to dark mode + - media: "(prefers-color-scheme: dark)" + scheme: slate + toggle: + icon: material/toggle-switch + name: Switch to light mode +use_directory_urls: false +markdown_extensions: + - abbr + - admonition + - attr_list + - codehilite + - footnotes + - pymdownx.details + - pymdownx.emoji: + emoji_index: !!python/name:materialx.emoji.twemoji + emoji_generator: !!python/name:materialx.emoji.to_svg + - pymdownx.highlight + - pymdownx.snippets + - pymdownx.superfences: + custom_fences: + - name: mermaid + class: mermaid + format: !!python/name:pymdownx.superfences.fence_code_format + - pymdownx.tabbed + - toc: + permalink: true + - tables + +plugins: + - markdownextradata: {} + - search + - techdocs-core + - gen-files: + scripts: + - docs/gen_doc_stubs.py + - literate-nav: + nav_file: SUMMARY.md + - section-index + - mkdocstrings: + handlers: + python: + import: + - https://docs.python.org/3/objects.inv + options: + docstring_options: + ignore_init_summary: true + docstring_section_style: spacy + separate_signature: true + line_length: 60 + docstring_style: google + members_order: alphabetical + merge_init_into_class: true + group_by_category: true + show_category_heading: false + show_submodules: no + show_root_members_full_path: false + +extra_javascript: + - "https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.0/MathJax.js?config=TeX-MML-AM_CHTML" + +extra_css: + - 
assets/extra.css diff --git a/psmq/__init__.py b/psmq/__init__.py new file mode 100644 index 0000000..65215c2 --- /dev/null +++ b/psmq/__init__.py @@ -0,0 +1,5 @@ +"""Top-level package for psmq.""" + +__version__: str = "0.1.0" + +from .manager import QueueManager diff --git a/psmq/config.py b/psmq/config.py new file mode 100644 index 0000000..f01e72d --- /dev/null +++ b/psmq/config.py @@ -0,0 +1,113 @@ +""" +Definitions of data structures and constants. +""" +from dataclasses import dataclass + +DEFAULT_VT = 60 +"""The default visibility timeout for new queues.""" + +DEFAULT_DELAY = 0 +"""The default delivery delay for new queues.""" + +DEFAULT_MAX_SIZE = 65565 +"""The default maximum size of a message for new queues.""" + +DEFAULT_RETRIES = 5 +"""The default number of message retries for new queues.""" + + +@dataclass +class QueueOrder: + """The order in which to return messages from a queue.""" + + +@dataclass +class QueueAttributes: + """ + The attributes, counter and stats of a queue. + """ + + viz_timeout: int + """The length of time, in seconds, that a message received from a queue will + be invisible to other receiving components when they ask to receive messages.""" + + initial_delay: int + "The time in seconds that the delivery of all new messages in the queue will be delayed" + + maxsize: int + "The maximum size of a message in bytes" + + totalrecv: int + "Total number of messages received from (taken off of) the queue" + + totalsent: int + "Total number of messages sent to this queue" + + created: int + "Timestamp (epoch in seconds) when the queue was created" + + modified: int + "Timestamp (epoch in seconds) when the queue was last modified with :meth:`.Queue.set_attributes`" + + msgs: int + "Current number of messages in the queue" + + hiddenmsgs: int + """ + Current number of hidden / not visible messages. + + A message typically is hidden while "in flight". This number can be a good measurement for + how many messages are currently being processed. 
+ """ + + +@dataclass +class QueueDefinition: + """ + A representation of RSMQ's internal :class:`.Queue` definition. + """ + + qname: str + "The name of the queue" + + vt: int + """The length of time, in seconds, that a message received from a queue will + be invisible to other receiving components when they ask to receive messages.""" + + delay: int + "The time in seconds that the delivery of all new messages in the queue will be delayed" + + maxsize: int + "The maximum size of a message in bytes" + + ts: int + """The current Redis server timestamp in seconds since epoch. + + This value is updated each time :attr:`.Queue.definition` is accessed.""" + + ts_usec: int + """The current Redis server timestamp in microseconds since epoch. + + This value is updated each time :attr:`.Queue.definition` is accessed.""" + + +@dataclass +class Message: + """ + Receive the next message from the queue. + """ + + message_id: str + "The internal message id." + + message: str + "The message's contents." + + sent: int + "Timestamp of when this message was sent/created." + + fr: int + "Timestamp of when this message was first received." + + rc: int + "Number of times this message was received." 
def list_to_dict(l: list) -> dict:
    """
    Convert a flat list of alternating key-value pairs into a dict.

    Redis replies arrive as flat ``[key1, val1, key2, val2, ...]`` lists whose
    elements may be ``bytes``; each ``bytes`` element is decoded as UTF-8
    before pairing.

    Args:
        l: A flat sequence of alternating keys and values.

    Returns:
        A dict mapping each decoded key to its decoded value.
    """

    def batched(iterable, n):
        # Yield successive n-tuples from the iterable; the final tuple may be
        # shorter.  (Fixed: this is a generator, so the previous ``-> tuple``
        # annotation was wrong.)
        it = iter(iterable)
        while batch := tuple(islice(it, n)):
            yield batch

    decoded = [item.decode("utf8") if isinstance(item, bytes) else item for item in l]
    return dict(batched(decoded, 2))
+ + Returns: + The sqlite connection + """ + if location is None and raise_if_missing: + # Ephemeral DBs are always missing + raise QueueDoesNotExist(name) + elif location and raise_if_missing: + queue_file = location / f"{name}.db" + if not queue_file.exists(): + raise QueueDoesNotExist(name) + db_name = location / f"{name}.db" if location else ":memory:" + return sqlite3.connect(db_name) + + +class RedisLiteConnection: + """Queues backed by RedisLite.""" + + def __init__(self, db_location: Path): + self.psmq_library_file = Path(__file__).parent.joinpath("psmq_library.lua").read_text() + self._db_location = db_location + self.queue_set_key = "QUEUES" + self._connection = Redis(self._db_location / "queue.rdb") + self._connection.function_load(self.psmq_library_file) + + def list_queues(self) -> set: + """List all queues.""" + return self._connection.smembers(self.queue_set_key) + + def create_queue(self, name: str, vt: int = 60, delay: int = 0, max_size: int = 65565) -> bool: + """ + Create a queue. 
+ + Args: + name: the name of the queue + vt: the visibility timeout + delay: the initial delay + max_size: the maximum size of a message + + Returns: + True if the queue was created, False if it already exists + """ + result = self._connection.fcall("create_queue", 4, name, vt, delay, max_size) + return bool(result) + + def get_queue_info(self, queue_name: str) -> dict: + """Get the config for a queue.""" + from .queue import QueueConfiguration, QueueMetadata + + results = self._connection.fcall("get_queue_info", 1, queue_name) + config = QueueConfiguration(int(results[0]), int(results[1]), int(results[2])) + metadata = QueueMetadata( + **{ + "created": int(results[3]), + "modified": int(results[4]), + "totalrecv": int(results[5]), + "totalsent": int(results[6]), + "msgs": int(results[7]), + "hiddenmsgs": int(results[8]), + } + ) + return {"config": config, "metadata": metadata} + + def set_queue_visibility_timeout(self, queue_name: str, vt: int) -> None: + """Set the visibility timeout for a queue.""" + self._connection.fcall("set_queue_viz_timeout", 2, queue_name, vt) + + def set_queue_initial_delay(self, queue_name: str, delay: int) -> None: + """Set the initial delay for a queue.""" + self._connection.fcall("set_queue_initial_delay", 2, queue_name, delay) + + def set_queue_max_size(self, queue_name: str, max_size: int) -> None: + """Set the max size for a queue.""" + self._connection.fcall("set_queue_max_size", 2, queue_name, max_size) + + def push_message(self, queue_name: str, message: bytes, delay: int = 0, ttl: int = 0) -> str: + """Send a message to a queue.""" + return self._connection.fcall("push_message", 3, queue_name, message, delay).decode("utf8") + + def get_message(self, queue_name: str): + """Get a message from a queue.""" + result = self._connection.fcall("get_message", 1, queue_name) diff --git a/psmq/exceptions.py b/psmq/exceptions.py new file mode 100644 index 0000000..f964258 --- /dev/null +++ b/psmq/exceptions.py @@ -0,0 +1,88 @@ 
+"""Exceptions raised.""" + + +class PSMQError(Exception): + """ + Base class for all PSMQ exceptions. + """ + + pass + + +class QueueAlreadyExists(PSMQError): + """ + A queue already exists with that name. + + Raised when attempting to create a :class:`~psmq.queue.Queue` with the same + name as an existing :class:`~psmq.queue.Queue` on the Redis server + """ + + def __init__(self, qname: str): + super().__init__(f"Queue '{qname}' already exists.") + + +class QueueDoesNotExist(PSMQError): + """ + Raised when accessing a :class:`~psmq.queue.Queue` that does not exist. + """ + + def __init__(self, qname: str): + super().__init__(f"Queue '{qname}' does not exist.") + + +class NoMessageInQueue(PSMQError): + """ + Raised when a call does not revceive a message. + + """ + + def __init__(self, qname: str): + super().__init__(f"There are no messages in queue '{qname}'.") + + +class InvalidQueueName(PSMQError): + """ + The base exception for errors relating to :class:`~psmq.queue.Queue` names. + """ + + pass + + +class InvalidCharacter(InvalidQueueName): + """ + Raised when creating a :class:`~psmq.queue.Queue` with invalid characters in its name. + + The valid characters are defined in :attr:`psmq.validation.QNAME_INVALID_CHARS_RE` + """ + + def __init__(self, character: str): + super().__init__(f"The '{character}' character is not allowed in queue names.") + + +class QueueNameTooLong(InvalidQueueName): + """ + Raised when creating a :class:`~psmq.queue.Queue` with too many characters in its name. + + The maximum length is defined in :attr:`psmq.validation.QNAME_MAX_LENGTH` + """ + + def __init__(self, max_length: int): + super().__init__(f"The queue name must be shorter than {max_length} characters.") + + +class ValueTooLow(ValueError): + """ + Raised when a numerical value is lower than a specified minumum. 
class QueueManager:
    """A manager for queues backed by a RedisLite database directory."""

    def __init__(self, db_dir: Path):
        """
        Set up the manager and its backing RedisLite connection.

        Args:
            db_dir: The path to the directory containing the queue db files
        """
        self.db_dir = db_dir
        self.redis_queues = RedisLiteConnection(db_location=db_dir)

    def get_queue(
        self,
        name: str,
        default_config: Optional[QueueConfiguration] = None,
        serializer: Optional[SerializerFunc] = None,
        deserializer: Optional[DeserializerFunc] = None,
    ) -> Queue:
        """
        Get or create a queue by name.

        Args:
            name: The name of the queue
            default_config: The configuration to use if the queue does not
                already exist. Defaults to a ``QueueConfiguration()`` with its
                standard values.
            serializer: Optional method to serialize messages. Defaults to
                :func:`~psmq.serialize.default_serializer`.
            deserializer: Optional method to deserialize messages. Defaults to
                :func:`~psmq.serialize.default_deserializer`.

        Raises:
            InvalidQueueName: If ``name`` is invalid
                (NOTE(review): no validation is performed in this method yet —
                confirm where it is enforced)

        Returns:
            The queue
        """
        # BUG FIX: ``default_config`` defaults to ``None``, but its attributes
        # were dereferenced unconditionally, so the common no-config call
        # raised AttributeError.
        config = default_config or QueueConfiguration()
        self.redis_queues.create_queue(
            name, config.visibility_timeout, config.initial_delay, config.max_size
        )

        return Queue(
            connection=self.redis_queues,
            name=name,
            serializer=serializer or default_serializer,
            deserializer=deserializer or default_deserializer,
        )
+ """ + return None if self.ttl is None else self.sent + self.ttl diff --git a/psmq/psmq_library.lua b/psmq/psmq_library.lua new file mode 100644 index 0000000..4a018bd --- /dev/null +++ b/psmq/psmq_library.lua @@ -0,0 +1,362 @@ +#!lua name=psmq +-- This is the library of functions that are used by the psmq scripts. + +-- "QUEUES" (queue_set_key) is a set of all the queues. +-- The global SET, which stores all used queue names. When a queue is created the name is added to this set as a +-- member. When a queue is deleted the member will be removed from this set. + +-- :Q (queue_info_key) is a hash of queue info. +-- This hash keeps all data for a single queue. +-- +-- FIELDS +-- +-- {msgid}: The message +-- {msgid}:rc: The receive counter for a single message. Will be incremented on each receive. +-- {msgid}:fr: The timestamp when this message was received for the first time. Will be created on the first receive. +-- totalsent: The total number of messages sent to this queue. +-- totalrecv: The total number of messages received from this queue. +-- vt: The visibility timeout for this queue. +-- initial_delay: The initial delay for messages in this queue. +-- maxsize: The maximum size of this queue. +-- created: The timestamp when this queue was created. +-- modified: The timestamp when this queue was last modified. + +-- (queue_key) A sorted set (ZSET) of all messages of a single queue +-- +-- SCORE Next possible timestamp (epoch time in ms) this message can be received. 
+-- +-- MEMBER The `{msgid}' + + + +-- +-- Utility functions +-- + +local alphabet = { + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, + "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L", "M", + "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z" +} + +local queue_defaults = { viz_timeout = 60, initial_delay = 0, max_size = 65565, retries = 5 } + + +-- convert the number to base36 +local function b36encode(keys) + local num = tonumber(unpack(keys)) + + -- Check for number + if type(num) == "nil" then + return redis.error_reply("ERR Number must be a number, not a string. Silly user.") + end + + -- We can only accept positive numbers + if num < 0 then + return redis.error_reply("ERR Number must be a positive value.") + end + + -- Special case for numbers less than 36 + if num < 36 then + return alphabet[num + 1] + end + + -- Process large numbers now + local result = "" + while num ~= 0 do + local i = num % 36 + result = alphabet[i + 1] .. result + num = math.floor(num / 36) + end + return result +end + +redis.register_function("b36encode", b36encode) + + +-- convert the base36 string to a number +local function b36decode(keys) + local b36 = keys[1] + return tonumber(b36, 36) +end + +redis.register_function("b36decode", b36decode) + + +-- get time in microseconds and milliseconds +local function get_time() + local time = redis.call("TIME") + local timestamp_microsec = time[1] * 1000000 + time[2] + local timestamp_millisec = math.floor(timestamp_microsec / 1000) + return { microsec = timestamp_microsec, millisec = timestamp_millisec } +end + +redis.register_function("get_time", get_time) + + +-- make a message id +local function make_message_id(keys) + local timestamp_microsec = keys[1] + local message_id = { b36encode({ timestamp_microsec }) } + for i = 2, 23 do + message_id[i] = alphabet[math.random(1, 36)] + end + return table.concat(message_id) +end + +redis.register_function("make_message_id", make_message_id) + +-- +-- Queue functions +-- + +-- Create a 
queue. +-- Returns 1 if the queue was created, 0 if it already existed. +local function create_queue(keys) + local queues_set_key = "QUEUES" + local queue_name = keys[1] + local queue_info_key = queue_name .. ":Q" + local viz_timeout = keys[2] or queue_defaults.viz_timeout + local initial_delay = keys[3] or queue_defaults.initial_delay + local max_size = keys[4] or queue_defaults.max_size + assert(type(viz_timeout) ~= "nil", "Visibility timeout is nil") + assert(type(initial_delay) ~= "nil", "initial delay is nil") + assert(type(max_size) ~= "nil", "max size is nil") + + -- Create the queue. + local already_existed = redis.call("SADD", queues_set_key, queue_name) + if already_existed == 0 then + return 0 + end + + -- Create the queue info hash since it doesn't exist. + local time = get_time() + local queue_info = { + "vt", viz_timeout, + "delay", initial_delay, + "maxsize", max_size, + "created", time.millisec, + "modified", time.millisec, + "totalrecv", 0, + "totalsent", 0, + } + redis.call("HMSET", queue_info_key, unpack(queue_info)) + return 1 +end + +redis.register_function("create_queue", create_queue) + +-- Get queue info. +local function get_queue_info(keys) + local queue_name = keys[1] + + create_queue({ queue_name }) + + local queue_info_key = queue_name .. 
":Q" + local queue_info_keys = { + "vt", + "delay", + "maxsize", + "created", + "modified", + "totalrecv", + "totalsent" + } + + local raw_info = redis.call("HMGET", queue_info_key, unpack(queue_info_keys)) + local queue_info = {} + for i, v in ipairs(raw_info) do + queue_info[queue_info_keys[i]] = v + end + + local num_msgs = redis.call("ZCARD", queue_name) + local time_info = get_time() + local num_hidden = redis.call("ZCOUNT", queue_name, time_info.millisec, "+inf") + + queue_info.num_msgs = num_msgs + queue_info.num_hidden = num_hidden + + return queue_info +end + +local function get_queue_info_for_redis(keys) + local queue_info = get_queue_info(keys) + local unp = {} + for k, v in pairs(queue_info) do + unp[#unp + 1] = k + unp[#unp + 1] = v + end + return unp +end + +redis.register_function("get_queue_info", get_queue_info_for_redis) + +-- Set queue visibility timeout +local function set_queue_viz_timeout(keys) + local queue_name = keys[1] + local viz_timeout = keys[2] + + create_queue({ queue_name }) + + local queue_info_key = queue_name .. ":Q" + redis.call("HSET", queue_info_key, "vt", viz_timeout) +end + +redis.register_function("set_queue_viz_timeout", set_queue_viz_timeout) + + +-- Set queue initial delay +local function set_queue_initial_delay(keys) + local queue_name = keys[1] + local initial_delay = keys[2] + + create_queue({ queue_name }) + + local queue_info_key = queue_name .. ":Q" + redis.call("HSET", queue_info_key, "delay", initial_delay) +end + +redis.register_function("set_queue_initial_delay", set_queue_initial_delay) + +-- Set queue max size +local function set_queue_max_size(keys) + local queue_name = keys[1] + local max_size = keys[2] + + create_queue({ queue_name }) + + local queue_info_key = queue_name .. ":Q" + redis.call("HSET", queue_info_key, "maxsize", max_size) +end + +redis.register_function("set_queue_max_size", set_queue_max_size) + +-- +-- Message functions +-- + +-- Send a message to a queue. 
local function push_message(keys)
    local queue_key = keys[1]
    local message = keys[2]
    local delay = keys[3]

    -- Create the queue in case it doesn't exist.
    create_queue({ queue_key })
    local queue_info = get_queue_info({ queue_key })
    assert(type(queue_info.delay) ~= "nil", "Queue delay is nil")
    local queue_info_key = queue_key .. ":Q"
    local time = get_time()
    local message_id = make_message_id({ time.microsec })
    -- The score is the earliest time (epoch ms) the message may be received.
    local message_score = time.millisec + ((delay or queue_info.delay) * 1000)

    -- Add the message to the queue.
    redis.call("ZADD", queue_key, message_score, message_id)

    -- Store the message body in the queue info hash.
    redis.call("HSET", queue_info_key, message_id, message)

    -- Increase the total message count for the queue.
    redis.call("HINCRBY", queue_info_key, "totalsent", 1)

    return message_id
end

redis.register_function("push_message", push_message)


-- Get the next message from the queue.
local function get_message(keys)
    local queue_key = keys[1]
    local time = get_time()
    local queue_info = get_queue_info({ queue_key })
    -- NOTE(review): pop_message passes a timestamp as keys[2]; the resulting
    -- oversized timeout is harmless because pop deletes the message right
    -- after, but confirm that callers pass a visibility timeout here.
    local viz_timeout = (keys[2] or queue_info.vt) * 1000
    local queue_info_key = queue_key .. ":Q"
    local msg = redis.call("ZRANGE", queue_key, "-inf", time.millisec, "BYSCORE", "LIMIT", "0", "1")

    if #msg == 0 then
        return {}
    end

    local message_id = msg[1]
    local message_rc_key = message_id .. ":rc" -- rc = receive count
    local message_fr_key = message_id .. ":fr" -- fr = first received

    -- Hide the message until now + visibility timeout.
    -- BUG FIX: this previously used `ZADD ... "INCR" viz_timeout`, which added
    -- the timeout to the message's OLD score. For a message that became
    -- visible a while ago, the incremented score could still be in the past,
    -- leaving the message visible to other consumers while "in flight".
    redis.call("ZADD", queue_key, time.millisec + viz_timeout, message_id)

    -- increment the total received count for the queue
    redis.call("HINCRBY", queue_info_key, "totalrecv", 1)

    -- get the message body and increment its receive count
    local msg_body = redis.call("HGET", queue_info_key, message_id)
    local rc = redis.call("HINCRBY", queue_info_key, message_rc_key, 1)

    local output = { msg_id = message_id, msg_body = msg_body, rc = rc }

    -- if this is the first time receiving the message, record the timestamp as the first received
    if rc == 1 then
        redis.call("HSET", queue_info_key, message_fr_key, time.millisec)
        output["fr"] = time.millisec
    else
        output["fr"] = redis.call("HGET", queue_info_key, message_fr_key)
    end

    return output
end

-- Flatten get_message's table into an alternating key/value array, because
-- Redis functions cannot return Lua tables with string keys directly.
local function get_message_for_redis(keys)
    local message = get_message(keys)
    local flat = {}
    for k, v in pairs(message) do
        flat[#flat + 1] = k
        flat[#flat + 1] = v
    end
    return flat
end

redis.register_function("get_message", get_message_for_redis)


-- Delete a message from the queue.
local function delete_message(keys)
    local queue_key = keys[1]
    local message_id = keys[2]
    local queue_info_key = queue_key .. ":Q"
    local message_rc_key = message_id .. ":rc" -- rc = receive count
    local message_fr_key = message_id .. ":fr" -- fr = first received

    -- remove the message from the sorted set and all of its hash fields
    redis.call("ZREM", queue_key, message_id)
    redis.call("HDEL", queue_info_key, message_id, message_rc_key, message_fr_key)
end

redis.register_function("delete_message", delete_message)


-- Pop a message from the queue.
+local function pop_message(keys) + local queue_key = keys[1] + local time = get_time() + + local message = get_message({ queue_key, time.millisec, 0 }) + + local message_id = message.msg_id + delete_message({ queue_key, message_id }) + + return message +end + +local function pop_message_for_redis(keys) + local queue_info = pop_message(keys) + local unp = {} + for k, v in pairs(queue_info) do + unp[#unp + 1] = k + unp[#unp + 1] = v + end + return unp +end + +redis.register_function("pop_message", pop_message_for_redis) diff --git a/psmq/queue.py b/psmq/queue.py new file mode 100644 index 0000000..db939b8 --- /dev/null +++ b/psmq/queue.py @@ -0,0 +1,203 @@ +"""Queues and message handling.""" +from dataclasses import dataclass +from typing import Optional, Any, List, TYPE_CHECKING + +from .connection import RedisLiteConnection +from .message import Message + +if TYPE_CHECKING: + from .manager import SerializerFunc, DeserializerFunc + + +@dataclass +class QueueConfiguration: + """Configuration for a queue.""" + + visibility_timeout: int = 60 + """The length of time, in seconds, that a message received from a queue will + be invisible to other receiving components when they ask to receive messages.""" + + initial_delay: int = 0 + "The time in seconds that the delivery of all new messages in the queue will be delayed" + + max_size: int = 65565 + "The maximum size of a message in bytes" + + retries: int = 5 + "The number of times to retry a message before giving up." + + ttl: Optional[int] = None + "The optional time to live for a message in milliseconds." 
+ + +@dataclass +class QueueMetadata: + """Metadata for a queue.""" + + totalrecv: int + "Total number of messages received from (taken off of) the queue" + + totalsent: int + "Total number of messages sent to this queue" + + created: int + "Timestamp (epoch in seconds) when the queue was created" + + modified: int + "Timestamp (epoch in seconds) when the queue was last modified with :meth:`.Queue.set_attributes`" + + msgs: int + "Current number of messages in the queue" + + hiddenmsgs: int + """ + Current number of hidden / not visible messages. + + A message typically is hidden while "in flight". This number can be a good measurement for + how many messages are currently being processed. + """ + + +class Queue: + """ + Representation of a specific Queue in Redis. + """ + + def __init__( + self, + connection: RedisLiteConnection, + name: str, + serializer: Optional["SerializerFunc"] = None, + deserializer: Optional["DeserializerFunc"] = None, + ): + """ + Construct a connection to a Queue in Redis. + + Args: + connection: The root connection object + name: The name of the queue + serializer: Optional method to serialize messages + deserializer: Optional method to deserialize messages + """ + self.connection = connection + self.name = name + q_info = self.connection.get_queue_info(name) + self._configuration = q_info["config"] + self._metadata = q_info["metadata"] + self.serializer = serializer + self.deserializer = deserializer + + def push(self, message: Any, delay: Optional[int] = None, ttl: Optional[int] = None) -> str: + """ + Send a message to the queue. + + Args: + message: The message to send + delay: The time in seconds that + the delivery of the message will be delayed. Allowed values: 0-9999999 + (around 115 days) + ttl: The time to live for the message in milliseconds. 
Allowed values: 0-9999999 + + Returns: + The message id + """ + pass + + def push_many(self, messages: List[Any], delay: Optional[int] = None, ttl: Optional[int] = None) -> list: + """ + Send multiple messages, all pipelined together. + + Args: + messages: The messages to send + delay: The time in seconds that + the delivery of the message will be delayed. Allowed values: 0-9999999 + (around 115 days) + ttl: The time to live for the message in milliseconds. Allowed values: 0-9999999 + + Returns: + All message ids + """ + pass + + def delete(self, msg_id: str) -> None: + """ + Delete a message if it exists. + + Args: + msg_id: The ID of the message to delete + """ + pass + + def get(self, visibility_timeout: Optional[int] = None, raise_on_empty: bool = False) -> Optional[Message]: + """ + Receive a message. + + Args: + visibility_timeout: optional (Default: queue settings) The length of time, in seconds, + that the received message will be invisible to others. Allowed values: + 0-9999999 (around 115 days) + raise_on_empty: optional (Default: False) Raise an exception if there is no message. + + Raises: + NoMessageInQueue: If the queue was empty and ``raise_on_empty`` is ``True`` + + Returns: + The message if available, or ``None`` + """ + pass + + def pop(self, raise_on_empty: bool = False) -> Optional[Message]: + """ + Receive a message and delete it immediately. + + Args: + raise_on_empty: optional (Default: False) Raise an exception if there is no message. + + Raises: + NoMessageInQueue: If the queue was empty and ``raise_on_empty`` is ``True`` + + Returns: + The message if available, or ``None`` + """ + msg = self.get(raise_on_empty=raise_on_empty) + if msg: + self.delete(msg.message_id) + return msg + + +def handle_message_result(result: tuple) -> Message: + """ + Handle a message received from the queue and format it properly. 
+ + Args: + result: The raw message data received from Redis + + Returns: + A populated Message object + """ + message_id, message, rc, fr = result + message = decode_message(message) + sent = base36decode(message_id[:10]) + return Message(message_id, message, sent, fr, rc) + + +def make_message_id(usec: int) -> str: + """ + Need to create a unique id based on the redis timestamp and a random number. + + The first part is the Redis time base-36 encoded which lets redis order the messages correctly + even when they are in the same millisecond. + + Args: + usec: The Redis timestamp in microseconds as an integer + + Returns: + A time-sortable, unique message id + """ + import random + + charset = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789" + + msg_id = [random.choice(charset) for _ in range(22)] # nosec + msg_id.insert(0, base36encode(usec)) + return "".join(msg_id) diff --git a/psmq/serialize.py b/psmq/serialize.py new file mode 100644 index 0000000..20ffe0e --- /dev/null +++ b/psmq/serialize.py @@ -0,0 +1,16 @@ +"""Serialize and deserialize messages.""" +from typing import Callable, Any +import json + +SerializerFunc = Callable[[Any], bytes] +DeserializerFunc = Callable[[bytes], Any] + + +def default_serializer(message: Any) -> bytes: + """Serialize a message using JSON.""" + return json.dumps(message).encode() + + +def default_deserializer(message: bytes) -> Any: + """Deserialize a message using JSON.""" + return json.loads(message.decode()) diff --git a/psmq/validation.py b/psmq/validation.py new file mode 100644 index 0000000..30f6bdc --- /dev/null +++ b/psmq/validation.py @@ -0,0 +1,134 @@ +"""Validation functions.""" +import re +from typing import Optional + +from .exceptions import ( + InvalidCharacter, + QueueNameTooLong, + ValueTooHigh, + ValueTooLow, + InvalidQueueName, +) + +#: :obj:`re.Pattern`: A compiled regular expression that detects all invalid characters +QNAME_INVALID_CHARS_RE = re.compile(r"[^A-Za-z0-9._-]") + +#: int: The 
#: int: The maximum number of characters allowed in a :class:`~psmq.queue.Queue` name
QNAME_MAX_LENGTH = 160


def validate_queue_name(qname: str, raise_on_error: bool = False) -> bool:
    """
    Verify that the passed-in queue name is valid.

    Args:
        qname: The name of the queue to validate
        raise_on_error: If ``True``, raise a descriptive exception on errors
            instead of returning ``False``

    Raises:
        InvalidQueueName: If ``qname`` is empty and ``raise_on_error`` is ``True``
        QueueNameTooLong: If ``qname`` is longer than :data:`~.QNAME_MAX_LENGTH` and
            ``raise_on_error`` is ``True``
        InvalidCharacter: If ``qname`` contains characters found in
            :data:`~.QNAME_INVALID_CHARS_RE` and ``raise_on_error`` is ``True``

    Returns:
        ``True`` if valid, ``False`` otherwise when ``raise_on_error`` is ``False``
    """
    # BUG FIX: the raise/return branches were inverted — with the default
    # raise_on_error=False an invalid name RAISED, and with raise_on_error=True
    # it silently returned False, contradicting the documented contract and the
    # ``quiet`` convention used by validate_int/validate_float in this module.
    if not qname:
        if raise_on_error:
            raise InvalidQueueName("Queue name cannot be empty.")
        return False

    if len(qname) > QNAME_MAX_LENGTH:
        if raise_on_error:
            raise QueueNameTooLong(QNAME_MAX_LENGTH)
        return False

    invalid_chars = QNAME_INVALID_CHARS_RE.search(qname)
    if invalid_chars:
        if raise_on_error:
            raise InvalidCharacter(qname[invalid_chars.start()])
        return False
    return True
+ + Raises: + TypeError: If ``value`` is not an ``int`` + ValueTooLow: If ``value`` is lower than a specified ``min_value`` + ValueTooHigh: If ``value`` is greater than a specified ``max_value`` + + Args: + value: The value to validate + min_value: If specified, the integer must be greater than or equal to this value + max_value: If specified, the integer must be less than or equal to this value + quiet: If True, return a ``bool`` instead of raising exceptions on errors + + Returns: + ``True`` if valid, ``False`` otherwise if ``quiet`` is ``True`` + """ + if value is None or not isinstance(value, int): + if quiet: + return False + raise TypeError("An integer value is required.") + + if min_value is not None and value < min_value: + if quiet: + return False + raise ValueTooLow(min_value) + + if max_value is not None and value > max_value: + if quiet: + return False + raise ValueTooHigh(max_value) + + return True + + +def validate_float( + value, + min_value: Optional[float] = None, + max_value: Optional[float] = None, + quiet: bool = False, +) -> bool: + """ + Validate value is integer and between min and max values (if specified). 
+
+    Raises:
+        TypeError: If ``value`` is not an ``int`` or ``float``
+        ValueTooLow: If ``value`` is lower than a specified ``min_value``
+        ValueTooHigh: If ``value`` is greater than a specified ``max_value``
+
+    Args:
+        value: The value to validate
+        min_value: If specified, the float must be greater than or equal to this value
+        max_value: If specified, the float must be less than or equal to this value
+        quiet: If True, return a ``bool`` instead of raising exceptions on errors
+
+    Returns:
+        ``True`` if valid, ``False`` otherwise if ``quiet`` is ``True``
+    """
+    if value is None or not isinstance(value, (int, float)):
+        if quiet:
+            return False
+        raise TypeError("An integer or float value is required.")
+
+    if min_value is not None and value < min_value:
+        if quiet:
+            return False
+        raise ValueTooLow(min_value)
+
+    if max_value is not None and value > max_value:
+        if quiet:
+            return False
+        raise ValueTooHigh(max_value)
+
+    return True
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..7db6056
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,218 @@
+[build-system]
+
+requires = ["setuptools", ]
+build-backend = "setuptools.build_meta"
+
+[project]
+name = "psmq"
+description = "Python Simple Message Queue"
+authors = [
+    {name = "Corey Oordt", email = "coreyoordt@gmail.com" }
+]
+classifiers =[
+    "Development Status :: 3 - Alpha",
+    "Intended Audience :: Developers",
+    "Natural Language :: English",
+    "Programming Language :: Python :: 3.9",
+    "Programming Language :: Python :: 3.10",
+    "Programming Language :: Python :: 3.11",
+]
+requires-python = ">=3.9"
+readme = "README.md"
+keywords = ["psmq", ]
+dynamic = ["version"]
+dependencies = [
+    "redislite",
+]
+
+#[project.scripts]
+#psmq = "psmq.cli:cli"
+
+[project.urls]
+homepage = "https://github.com/callowayproject/psmq"
+repository = "https://github.com/callowayproject/psmq"
+documentation = "https://callowayproject.github.io/psmq"
+
+[project.optional-dependencies]
+dev = [
+    
"bump-my-version",
+    "git-fame",
+    "generate-changelog",
+]
+test = [
+    "coverage>=6.1.2",
+    "pre-commit>=2.15.0",
+    "pytest-cov>=3.0.0",
+    "pytest>=6.0.0",
+    "u-msgpack-python",
+]
+docs =[
+    "black>=23.3.0",
+    "mkdocs>=1.4.3",
+    "mkdocs-gen-files>=0.5.0",
+    "mkdocs-literate-nav>=0.6.0",
+    "mkdocs-material>=9.1.0",
+    "mkdocs-markdownextradata-plugin>=0.2.5",
+    "mkdocs-section-index>=0.3.5",
+    "mkdocstrings>=0.21.2",
+    "mkdocstrings-python>=1.0.0",
+]
+
+[tool.setuptools.dynamic]
+version = { attr = "psmq.__version__" }
+
+[tool.setuptools]
+zip-safe = false
+include-package-data = true
+
+[tool.setuptools.packages.find]
+exclude = ["example*", "tests*", "docs*", "build"]
+
+[tool.bumpversion]
+current_version = "0.1.0"
+commit = true
+commit_args = "--no-verify"
+tag = true
+tag_name = "{new_version}"
+allow_dirty = true
+parse = "(?P<major>\\d+)\\.(?P<minor>\\d+)\\.(?P<patch>\\d+)(\\.(?P<dev>dev)\\d+\\+[-_a-zA-Z0-9]+)?"
+serialize = [
+    "{major}.{minor}.{patch}.{dev}{distance_to_latest_tag}+{short_branch_name}",
+    "{major}.{minor}.{patch}"
+]
+message = "Version updated from {current_version} to {new_version}"
+
+[tool.bumpversion.parts.dev]
+values = ["release", "dev"]
+
+[[tool.bumpversion.files]]
+filename = "psmq/__init__.py"
+
+[[tool.bumpversion.files]]
+filename = "CHANGELOG.md"
+search = "Unreleased"
+
+[[tool.bumpversion.files]]
+filename = "CHANGELOG.md"
+search = "{current_version}...HEAD"
+replace = "{current_version}...{new_version}"
+
+
+[tool.coverage.run]
+branch = true
+omit = ["**/test_*.py"]
+
+
+[tool.coverage.report]
+omit = [
+    "*site-packages*",
+    "*tests*",
+    "*.tox*",
+]
+show_missing = true
+exclude_lines = [
+    "raise NotImplementedError",
+    "pragma: no-coverage",
+]
+
+[tool.pytest.ini_options]
+norecursedirs = [
+    ".*",
+    "build",
+    "dist",
+    "{arch}",
+    "*.egg",
+    "venv",
+    "requirements*",
+    "lib",
+]
+python_files = "test*.py"
+
+[tool.interrogate]
+ignore-init-method = true
+ignore-init-module = false
+ignore-magic = true
+ignore-semiprivate = false
+ignore-private = false +ignore-property-decorators = false +ignore-module = false +ignore-nested-functions = true +ignore-nested-classes = true +ignore-setters = false +fail-under = 95 +exclude = ["setup.py", "docs", "build"] +ignore-regex = ["^get$", "^mock_.*", ".*BaseClass.*"] +verbose = 0 +quiet = false +whitelist-regex = [] +color = true + +[tool.black] +line-length = 119 + +[tool.ruff] +# Enable pycodestyle (`E`) and Pyflakes (`F`) codes by default. +# "UP" "TRY" "PLR" +select = ["E", "W", "F", "I", "N", "B", "BLE", "C", "D", "E", "F", "I", "N", "S", "T", "W", "RUF", "NPY", "PD", "PGH", "ANN", "C90", "PLC", "PLE", "PLW", "TCH"] +ignore = [ + "ANN002", "ANN003", "ANN101", "ANN102", "ANN204", "ANN401", + "S101", "S104", + "D106", "D107", "D200", "D212", +] + +# Allow autofix for all enabled rules (when `--fix`) is provided. +fixable = ["E", "W", "F", "I", "N", "B", "BLE", "C", "D", "E", "F", "I", "N", "S", "T", "W", "RUF", "NPY", "PD", "PGH", "ANN", "C90", "PL", "PLC", "PLE", "PLW", "TCH"] +unfixable = [] + +# Exclude a variety of commonly ignored directories. +exclude = [ + ".bzr", + ".direnv", + ".eggs", + ".git", + ".hg", + ".mypy_cache", + ".nox", + ".pants.d", + ".pytype", + ".ruff_cache", + ".svn", + ".tox", + ".venv", + "__pypackages__", + "_build", + "buck-out", + "build", + "dist", + "node_modules", + "venv", +] + +# Same as Black. +line-length = 119 + +# Allow unused variables when underscore-prefixed. +dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" + +typing-modules = ["typing", "types", "typing_extensions", "mypy", "mypy_extensions"] + +[tool.ruff.per-file-ignores] +"tests/*"=["S101", "PLR0913", "PLR0915", "PGH003", "ANN001", "ANN202", "ANN201", "PLR0912", "TRY301", "PLW0603", "PLR2004", "ANN101", "S106", "TRY201", "ANN003", "ANN002", "S105", "TRY003", "D103", "D104"] + +[tool.ruff.mccabe] +# Unlike Flake8, default to a complexity level of 10. 
+max-complexity = 10
+
+[tool.ruff.isort]
+order-by-type = true
+
+[tool.ruff.pydocstyle]
+convention = "google"
+
+[tool.pydoclint]
+style = "google"
+exclude = '\.git|tests'
+require-return-section-when-returning-none = false
+arg-type-hints-in-docstring = false
+check-return-types = false
+quiet = true
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..3409006
--- /dev/null
+++ b/tests/__init__.py
@@ -0,0 +1 @@
+"""psmq tests."""
diff --git a/tests/test_psmq.py b/tests/test_psmq.py
new file mode 100644
index 0000000..b7065a9
--- /dev/null
+++ b/tests/test_psmq.py
@@ -0,0 +1,4 @@
+"""Tests for `psmq` package."""
+
+import pytest
+import psmq  # noqa: F401 -- smoke-test: `psmq.psmq` does not exist in this commit
diff --git a/tests/test_queue_lua.py b/tests/test_queue_lua.py
new file mode 100644
index 0000000..fa1fbed
--- /dev/null
+++ b/tests/test_queue_lua.py
@@ -0,0 +1,274 @@
+"""Test the queue functions in lua."""
+from pathlib import Path
+import datetime
+import pytest
+import redis
+
+from psmq.connection import RedisLiteConnection, list_to_dict
+
+
+@pytest.fixture
+def conn(tmp_path: Path):
+    """Get a RedisLite connection."""
+    return RedisLiteConnection(tmp_path)
+
+
+def test_b36_encode(conn: RedisLiteConnection):
+    """The b36encode function should properly encode numbers to base36."""
+    r = conn._connection.fcall("b36encode", 1, "35")
+    assert r.decode("utf8") == "Z"
+    r = conn._connection.fcall("b36encode", 1, "36")
+    assert r.decode("utf8") == "10"
+
+    with pytest.raises(redis.ResponseError):
+        conn._connection.fcall("b36encode", 1, "foo")
+
+    with pytest.raises(redis.ResponseError):
+        conn._connection.fcall("b36encode", 1, -1)
+
+
+def test_b36_decode(conn: RedisLiteConnection):
+    """The b36decode function should properly decode base36-encoded strings."""
+    r = conn._connection.fcall("b36decode", 1, "Z")
+    assert r == 35
+
+    r = conn._connection.fcall("b36decode", 1, "10")
+    assert r == 36
+
+    r = conn._connection.fcall("b36decode", 1, 10)
+    assert r == 36
+
+
+def 
test_make_message_id(conn: RedisLiteConnection):
+    """A message id should be sortable and unique."""
+    ts_usec = int(datetime.datetime.now().timestamp() * 1_000_000)
+    r = conn._connection.fcall("make_message_id", 1, ts_usec).decode("utf8")
+    assert len(r) > 22
+    ts_encoding = r[:-22]
+    assert int(ts_encoding, 36) == ts_usec
+
+
+def test_create_queue(conn: RedisLiteConnection):
+    """A queue is created."""
+    r = conn._connection.fcall("create_queue", 4, "test_queue", 10, 0, 0)
+    assert r == 1
+    assert conn._connection.sismember("QUEUES", "test_queue")
+    r = conn._connection.fcall("create_queue", 4, "test_queue", 10, 0, 0)
+    assert r == 0
+
+
+def test_get_queue_info(conn: RedisLiteConnection):
+    """We get information about a queue."""
+    ts = int(datetime.datetime.now().timestamp())
+    r = list_to_dict(conn._connection.fcall("get_queue_info", 1, "test_queue"))  # NOTE(review): appears to auto-create the queue -- confirm vs psmq_library.lua
+
+    assert int(r["vt"]) == 60  # vt
+    assert int(r["delay"]) == 0  # delay
+    assert int(r["maxsize"]) == 65565  # maxsize
+    assert int(r["created"]) >= ts  # created
+    assert int(r["modified"]) >= ts  # modified
+    assert int(r["totalrecv"]) == 0  # totalrecv
+    assert int(r["totalsent"]) == 0  # totalsent
+    assert int(r["num_msgs"]) == 0  # nummsgs
+    assert int(r["num_hidden"]) == 0  # hiddenmsgs
+    # assert int(r[7]) >= ts  # ts
+    # assert int(r[8]) >= ts * 1_000  # ts_msec
+    assert conn._connection.sismember("QUEUES", "test_queue")
+
+    r = conn._connection.fcall("create_queue", 4, "test_queue2", 10, 10, 10)
+    assert r == 1
+    r = list_to_dict(conn._connection.fcall("get_queue_info", 1, "test_queue2"))
+    assert int(r["vt"]) == 10  # vt
+    assert int(r["delay"]) == 10  # delay
+    assert int(r["maxsize"]) == 10  # maxsize
+
+
+def test_set_queue_vt(conn: RedisLiteConnection):
+    """Can set the visibility timeout."""
+    r = conn._connection.fcall("create_queue", 4, "test_queue", 10, 10, 10)
+    assert r == 1
+    conn._connection.fcall("set_queue_viz_timeout", 2, "test_queue", 20)
+    r = list_to_dict(conn._connection.fcall("get_queue_info", 1, "test_queue"))
+    assert int(r["vt"]) == 20  # vt
+
+
+def test_set_queue_initial_delay(conn: RedisLiteConnection):
+    """Can set the initial delay."""
+    r = conn._connection.fcall("create_queue", 4, "test_queue", 10, 10, 10)
+    assert r == 1
+    conn._connection.fcall("set_queue_initial_delay", 2, "test_queue", 20)
+    r = list_to_dict(conn._connection.fcall("get_queue_info", 1, "test_queue"))
+    assert int(r["delay"]) == 20  # delay
+
+
+def test_set_queue_max_size(conn: RedisLiteConnection):
+    """Can set the max size for a queue."""
+    r = conn._connection.fcall("create_queue", 4, "test_queue", 10, 10, 10)
+    assert r == 1
+    conn._connection.fcall("set_queue_max_size", 2, "test_queue", 20)
+    r = list_to_dict(conn._connection.fcall("get_queue_info", 1, "test_queue"))
+    assert int(r["maxsize"]) == 20  # maxsize
+
+
+def test_create_queue_defaults(conn: RedisLiteConnection):
+    """You can create a queue with default values."""
+    r = conn._connection.fcall("create_queue", 1, "test_queue")
+    assert r == 1
+    assert conn._connection.sismember("QUEUES", "test_queue")
+    r = conn._connection.hgetall("test_queue:Q")
+    assert set(r.keys()) == {b"created", b"delay", b"maxsize", b"modified", b"totalrecv", b"totalsent", b"vt"}
+    assert int(r[b"delay"]) == 0
+    assert int(r[b"maxsize"]) == 65565  # NOTE(review): looks like a typo for 65535, but it matches the lua default -- confirm both
+    assert int(r[b"vt"]) == 60
+
+
+def test_push_message(conn: RedisLiteConnection):
+    """You can send a message to an existing queue."""
+    ts_msec = int((datetime.datetime.now().timestamp()) * 1_000)
+    r = conn._connection.fcall("create_queue", 1, "test_queue")
+    assert r == 1
+    msg_id = conn._connection.fcall("push_message", 3, "test_queue", "foo", 0).decode("utf8")
+    msg = conn._connection.hget("test_queue:Q", msg_id).decode("utf8")
+    assert msg == "foo"
+    messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(messages) == 1
+    assert messages[0][0].decode("utf8") == msg_id
+    assert int(messages[0][1]) >= ts_msec
+
+
+def test_push_message_missing_queue(conn: RedisLiteConnection):
+    """You can send a message to a non-existing queue."""
+    ts_msec = int((datetime.datetime.now().timestamp()) * 1_000)
+    msg_id = conn._connection.fcall("push_message", 3, "test_queue", "foo", 0).decode("utf8")
+    msg = conn._connection.hget("test_queue:Q", msg_id).decode("utf8")
+    assert msg == "foo"
+
+    messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(messages) == 1
+    assert messages[0][0].decode("utf8") == msg_id
+    assert int(messages[0][1]) >= ts_msec
+
+
+def test_get_message(conn: RedisLiteConnection):
+    """You can get a message from an existing queue."""
+    viz_timeout = 10
+    msg_id = conn._connection.fcall("push_message", 3, "test_queue", "foo", 0).decode("utf8")
+
+    # Get the sorted messages before the get_message call
+    pre_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(pre_messages) == 1
+
+    # Get and verify the message
+    msg = list_to_dict(conn._connection.fcall("get_message", 2, "test_queue", viz_timeout))
+    assert msg["msg_id"] == msg_id
+    assert msg["msg_body"] == "foo"
+    assert msg["rc"] == 1
+
+    # Get the sorted messages after the get_message call
+    post_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(post_messages) == 1
+
+    # Verify the score was updated by the viz_timeout * 1,000
+    sent_ts = int(pre_messages[0][1])
+    delayed_ts = int(post_messages[0][1])
+    assert delayed_ts - sent_ts == viz_timeout * 1_000
+
+    # verify the queue stats
+    queue_stats = conn._connection.hgetall("test_queue:Q")
+    assert queue_stats[b"totalrecv"] == b"1"
+    assert queue_stats[b"totalsent"] == b"1"
+
+
+def test_get_message_uses_default_vt(conn: RedisLiteConnection):
+    """You can get a message from an existing queue and it uses the queue's default visibility timeout."""
+    viz_timeout = 10
+    conn._connection.fcall("create_queue", 4, "test_queue", viz_timeout, 0, 0)
+    conn._connection.fcall("push_message", 2, "test_queue", "foo").decode("utf8")
+
+    # Get the sorted messages before the get_message call
+    pre_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(pre_messages) == 1
+
+    # Get and verify the message
+    conn._connection.fcall("get_message", 1, "test_queue")
+
+    # Get the sorted messages after the get_message call
+    post_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(post_messages) == 1
+
+    # Verify the score was updated by the queue's viz_timeout * 1,000
+    sent_ts = int(pre_messages[0][1])
+    delayed_ts = int(post_messages[0][1])
+    assert delayed_ts - sent_ts == viz_timeout * 1_000
+
+    # verify the queue stats
+    queue_stats = conn._connection.hgetall("test_queue:Q")
+    assert queue_stats[b"totalrecv"] == b"1"
+    assert queue_stats[b"totalsent"] == b"1"
+
+
+def test_delete_message(conn: RedisLiteConnection):
+    """Deleting a message should remove it from the queue."""
+    msg_id = conn._connection.fcall("push_message", 3, "test_queue", "foo", 0).decode("utf8")
+
+    # Get the sorted messages before the delete_message call
+    pre_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(pre_messages) == 1
+
+    # Delete the message
+    conn._connection.fcall("delete_message", 2, "test_queue", msg_id)
+
+    # Get the sorted messages after the delete_message call
+    post_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(post_messages) == 0
+
+    # verify the queue stats
+    queue_stats = conn._connection.hgetall("test_queue:Q")
+    assert queue_stats[b"totalrecv"] == b"0"
+    assert queue_stats[b"totalsent"] == b"1"
+    assert msg_id.encode("utf8") not in queue_stats
+
+
+def test_delete_missing_message(conn: RedisLiteConnection):
+    """Deleting a non-existing message should do nothing."""
+    conn._connection.fcall("create_queue", 1, "test_queue")
+
+    # Get the sorted messages before the delete_message call
+    pre_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(pre_messages) == 0
+
+    # Delete the message
+    conn._connection.fcall("delete_message", 2, "test_queue", "foo")
+
+    # Get the sorted messages after the delete_message call
+    post_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(post_messages) == 0
+
+    # verify the queue stats
+    queue_stats = conn._connection.hgetall("test_queue:Q")
+    assert queue_stats[b"totalrecv"] == b"0"
+    assert queue_stats[b"totalsent"] == b"0"
+
+
+def test_pop_message(conn: RedisLiteConnection):
+    """Popping a message should remove it from the queue."""
+    msg_id = conn._connection.fcall("push_message", 3, "test_queue", "foo", 0).decode("utf8")
+
+    # Get the sorted messages before the pop_message call
+    pre_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(pre_messages) == 1
+
+    # Get and verify the message
+    msg = list_to_dict(conn._connection.fcall("pop_message", 1, "test_queue"))
+    assert msg["msg_id"] == msg_id
+    assert msg["msg_body"] == "foo"
+    assert msg["rc"] == 1
+
+    # Get the sorted messages after the pop_message call
+    post_messages = conn._connection.zrange("test_queue", 0, -1, withscores=True)
+    assert len(post_messages) == 0
+
+    # verify the queue stats
+    queue_stats = conn._connection.hgetall("test_queue:Q")
+    assert queue_stats[b"totalrecv"] == b"1"
+    assert queue_stats[b"totalsent"] == b"1"
diff --git a/tools/gen-codeowners.sh b/tools/gen-codeowners.sh
new file mode 100755
index 0000000..af8e556
--- /dev/null
+++ b/tools/gen-codeowners.sh
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+# Generate the CODEOWNERS file for all unempty files
+#
+# Requires git-fame: pip install git-fame
+#
+# Usage: gen-codeowners.sh path1 [path2 ...]
+
+owners(){
+    for f in $(git ls-files "$@"); do
+        LINECOUNT=$(wc -l "$f" | awk '{print $1}')
+        if [[ $LINECOUNT -gt 0 ]]; then
+            echo -n "$f "
+            # author emails if loc distribution >= 30%
+            git fame -esnwMC --incl "$f" | tr '/' '|' \
+                | awk -v filename="$f" -F '|' '(NR>6 && $6>=30) {print $2}' \
+                | xargs echo
+        fi
+    done
+}
+owners "$@" | tee CODEOWNERS