Skip to content

Commit

Permalink
fix: Support PyMongo 4+
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkLark86 committed Jun 18, 2024
1 parent 3f34c72 commit 84796a9
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 41 deletions.
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@
"requests>=2.7.0,<3.0",
"boto3>=1.26,<2.0",
"websockets>=10.3,<10.4",
"mongolock>=1.3.4,<1.4",
"PyYAML>=6.0.1",
"lxml>=5.2.2,<5.3",
"lxml_html_clean>=0.1.1,<0.2",
"python-twitter>=3.5,<3.6",
"chardet<6.0",
"pymongo>=3.8,<3.12",
"pymongo>=4.7.3,<4.8",
"croniter<2.1",
"python-dateutil<2.10",
"unidecode>=0.04.21,<=1.3.8",
Expand All @@ -67,6 +66,8 @@
"itsdangerous>=1.1,<2.0",
"pymemcache>=4.0,<4.1",
"xmlsec>=1.3.13,<1.3.15",
# Async libraries
"motor>=3.4.0,<4.0",
]

package_data = {
Expand Down
2 changes: 1 addition & 1 deletion superdesk/emails/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def send_activity_emails(activity, recipients):
subject = render_template("notification_subject.txt", notification=notification)

send_email.delay(subject=subject, sender=admins[0], recipients=recipients, text_body=text_body, html_body=html_body)
email_timestamps.update({"_id": message_id}, {"_id": message_id, "_created": now}, upsert=True)
email_timestamps.update_one({"_id": message_id}, {"$set": {"_id": message_id, "_created": now}}, upsert=True)


def send_article_killed_email(article, recipients, transmitted_at):
Expand Down
42 changes: 35 additions & 7 deletions superdesk/eve_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,14 @@ def get(self, endpoint_name, req, lookup, **kwargs):
backend = self._lookup_backend(endpoint_name, fallback=True)
is_mongo = self._backend(endpoint_name) == backend

cursor, count = backend.find(endpoint_name, req, lookup, perform_count=req.if_modified_since)
cursor, _ = backend.find(endpoint_name, req, lookup, perform_count=False)

if req.if_modified_since and count:
try:
has_items = cursor[0] is not None
except IndexError:
has_items = False

if req.if_modified_since and has_items:
# fetch all items, not just updated
req.if_modified_since = None
cursor, count = backend.find(endpoint_name, req, lookup, perform_count=False)
Expand All @@ -137,7 +142,7 @@ def get(self, endpoint_name, req, lookup, **kwargs):
if is_mongo and source_config.get("collation"):
cursor.collation(Collation(locale=app.config.get("MONGO_LOCALE", "en_US")))

self._cursor_hook(cursor=cursor, req=req)
self._cursor_hook(cursor=cursor, endpoint_name=endpoint_name, req=req, lookup=lookup)
return cursor

def get_from_mongo(self, endpoint_name, req, lookup, perform_count=False):
Expand All @@ -151,8 +156,8 @@ def get_from_mongo(self, endpoint_name, req, lookup, perform_count=False):
"""
req.if_modified_since = None
backend = self._backend(endpoint_name)
cursor, _ = backend.find(endpoint_name, req, lookup, perform_count=perform_count)
self._cursor_hook(cursor=cursor, req=req)
cursor, _ = backend.find(endpoint_name, req, lookup, perform_count=False)
self._cursor_hook(cursor=cursor, endpoint_name=endpoint_name, req=req, lookup=lookup)
return cursor

def find_and_modify(self, endpoint_name, **kwargs):
Expand All @@ -166,7 +171,7 @@ def find_and_modify(self, endpoint_name, **kwargs):
if kwargs.get("query"):
kwargs["query"] = backend._mongotize(kwargs["query"], endpoint_name)

result = backend.driver.db[endpoint_name].find_and_modify(**kwargs)
result = backend.driver.db[endpoint_name].find_one_and_update(**kwargs)
cache.clean([endpoint_name])
return result

Expand Down Expand Up @@ -464,9 +469,32 @@ def _set_parent(self, endpoint_name, doc, lookup):
if parent:
lookup["parent"] = parent

def _cursor_hook(self, cursor, req):
def construct_count_function(self, resource, req, lookup):
backend = self._backend(resource)

client_sort = backend._convert_sort_request_to_dict(req)
spec = backend._convert_where_request_to_dict(resource, req)

if lookup:
spec = backend.combine_queries(spec, lookup)

spec = backend._mongotize(spec, resource)
client_projection = backend._client_projection(req)

datasource, spec, projection, sort = backend._datasource_ex(resource, spec, client_projection, client_sort)
target = backend.pymongo(resource).db[datasource]

def count_function():
return target.count_documents(spec)

return count_function

def _cursor_hook(self, cursor, endpoint_name, req, lookup):
"""Apply additional methods for cursor"""

if not hasattr(cursor, "count"):
setattr(cursor, "count", self.construct_count_function(endpoint_name, req, lookup))

if not req or not req.args:
return

Expand Down
4 changes: 3 additions & 1 deletion superdesk/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import logging

from datetime import datetime
from mongolock import MongoLock, MongoLockException

# from mongolock import MongoLock, MongoLockException
from superdesk.mongolock import MongoLock, MongoLockException
from werkzeug.local import LocalProxy
from flask import current_app as app
from superdesk.logging import logger
Expand Down
129 changes: 129 additions & 0 deletions superdesk/mongolock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# Copied from https://github.com/lorehov/mongolock, to support newer PyMongo lib

import time
import contextlib
from datetime import datetime, timedelta

from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from pymongo.collection import Collection


class MongoLockException(Exception):
pass


class MongoLockLocked(Exception):
pass


class MongoLock(object):
def __init__(self, host="localhost", db="mongolock", collection="lock", client=None, acquire_retry_step=0.1):
"""Create a new instance of MongoLock.
:Parameters:
- `host` (optional) - use it to manually specify mongodb connection string
- `db` (optional) - db name
- `collection` (optional) - collection name or :class:`pymongo.Collection` instance
- `client` - instance of :class:`MongoClient` or :class:`MongoReplicaSetClient`,
- `acquire_retry_step` (optional)- time in seconds between retries while trying to acquire the lock,
if specified - `host` parameter will be skipped
"""
self.acquire_retry_step = acquire_retry_step
if isinstance(collection, Collection):
self.collection = collection
else:
if client is None:
self.client = client
else:
self.client = MongoClient(host)
self.collection = self.client[db][collection]

@contextlib.contextmanager
def __call__(self, key, owner, timeout=None, expire=None):
"""See `lock` method."""
if not self.lock(key, owner, timeout, expire):
status = self.get_lock_info(key)
raise MongoLockLocked(
"Timeout, lock owned by {owner} since {ts}, expire time is {expire}".format(
owner=status["owner"], ts=status["created"], expire=status["expire"]
)
)
try:
yield
finally:
self.release(key, owner)

def lock(self, key, owner, timeout=None, expire=None):
"""Lock given `key` to `owner`.
:Parameters:
- `key` - lock name
- `owner` - name of application/component/whatever which asks for lock
- `timeout` (optional) - how long to wait if `key` is locked
- `expire` (optional) - when given, lock will be released after that number of seconds.
Raises `MongoLockTimeout` if can't achieve a lock before timeout.
"""
expire = datetime.utcnow() + timedelta(seconds=expire) if expire else None
try:
self.collection.insert_one(
{"_id": key, "locked": True, "owner": owner, "created": datetime.utcnow(), "expire": expire}
)
return True
except DuplicateKeyError:
start_time = datetime.utcnow()
while True:
if self._try_get_lock(key, owner, expire):
return True

if not timeout or datetime.utcnow() >= start_time + timedelta(seconds=timeout):
return False
time.sleep(self.acquire_retry_step)

def release(self, key, owner):
"""Release lock with given name.
`key` - lock name
`owner` - name of application/component/whatever which held a lock
Raises `MongoLockException` if no such a lock.
"""
status = self.collection.find_and_modify(
{"_id": key, "owner": owner}, {"locked": False, "owner": None, "created": None, "expire": None}
)

def get_lock_info(self, key):
"""Get lock status."""
return self.collection.find_one({"_id": key})

def is_locked(self, key):
lock_info = self.get_lock_info(key)
return not (
not lock_info
or not lock_info["locked"]
or (lock_info["expire"] is not None and lock_info["expire"] < datetime.utcnow())
)

def touch(self, key, owner, expire=None):
"""Renew lock to avoid expiration."""
lock = self.collection.find_one({"_id": key, "owner": owner})
if not lock:
raise MongoLockException("Can't find lock for {key}: {owner}".format(key=key, owner=owner))
if not lock["expire"]:
return
if not expire:
raise MongoLockException("Can't touch lock without expire for {0}: {1}".format(key, owner))
expire = datetime.utcnow() + timedelta(seconds=expire)
self.collection.update_one({"_id": key, "owner": owner}, {"$set": {"expire": expire}})

def _try_get_lock(self, key, owner, expire):
dtnow = datetime.utcnow()
result = self.collection.update_one(
{
"$or": [
{"_id": key, "locked": False},
{"_id": key, "expire": {"$lt": dtnow}},
]
},
{"$set": {"locked": True, "owner": owner, "created": dtnow, "expire": expire}},
)
return result and result.acknowledged and result.modified_count == 1
3 changes: 1 addition & 2 deletions superdesk/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,7 @@ def get_all(self):
return self.get_from_mongo(None, {}).sort("_id")

def find_and_modify(self, query, update, **kwargs):
res = self.backend.find_and_modify(self.datasource, query=query, update=update, **kwargs)
return res
return self.backend.find_and_modify(self.datasource, filter=query, update=update, **kwargs)

def get_all_batch(self, size=500, max_iterations=10000, lookup=None):
"""Gets all items using multiple queries.
Expand Down
24 changes: 23 additions & 1 deletion superdesk/storage/desk_media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@

from typing import Optional
from flask import current_app as app
from flask_babel import _
import logging
import json
import mimetypes
import bson
import bson.errors
import gridfs
import os.path
import hashlib

from eve.io.mongo.media import GridFSMediaStorage

from superdesk.errors import SuperdeskApiError
from . import SuperdeskMediaStorage


Expand Down Expand Up @@ -108,10 +111,29 @@ def put(self, content, filename=None, content_type=None, metadata=None, resource
if filename:
filename = "{}/{}".format(folder, filename)

if hasattr(content, "read"):
data = content.read()
if hasattr(data, "encode"):
data = data.encode("utf-8")
hash_data = hashlib.md5(data)
if hasattr(content, "seek"):
content.seek(0)
elif isinstance(content, bytes):
hash_data = hashlib.md5(content)
elif isinstance(content, str):
hash_data = hashlib.md5(content.encode("utf-8"))
else:
raise SuperdeskApiError.badRequestError(_("Unsupported content type"))

try:
logger.info("Adding file {} to the GridFS".format(filename))
return self.fs(resource).put(
content, content_type=content_type, filename=filename, metadata=metadata, **kwargs
content,
content_type=content_type,
filename=filename,
metadata=metadata,
md5=hash_data.hexdigest(),
**kwargs,
)
except gridfs.errors.FileExists:
logger.info("File exists filename=%s id=%s" % (filename, kwargs["_id"]))
Expand Down
9 changes: 8 additions & 1 deletion superdesk/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ def drop_mongo(app):
dbname = app.config[name]
dbconn = app.data.mongo.pymongo(prefix=prefix).cx
dbconn.drop_database(dbname)
dbconn.close()


def setup_config(config):
Expand Down Expand Up @@ -356,6 +355,14 @@ def inner(*a, **kw):

def setup(context=None, config=None, app_factory=get_app, reset=False):
if not hasattr(setup, "app") or setup.reset or config:
if hasattr(setup, "app"):
# Close all PyMongo Connections (new ones will be created with ``app_factory`` call)
for key, val in setup.app.extensions["pymongo"].items():
val[0].close()

if hasattr(setup.app, "async_app"):
setup.app.async_app.stop()

cfg = setup_config(config)
setup.app = app_factory(cfg)
setup.reset = reset
Expand Down
14 changes: 7 additions & 7 deletions tests/commands/data_updates_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def number_of_data_updates_applied(self):

def test_dry_data_update(self):
superdesk.commands.data_updates.DEFAULT_DATA_UPDATE_FW_IMPLEMENTATION = """
count = mongodb_collection.find({}).count()
count = mongodb_collection.count_documents({})
assert count == 0, count
"""
self.assertEqual(self.number_of_data_updates_applied(), 0)
Expand All @@ -79,18 +79,18 @@ def test_data_update(self):
# create migrations
for index in range(40):
superdesk.commands.data_updates.DEFAULT_DATA_UPDATE_FW_IMPLEMENTATION = """
assert mongodb_collection
count = mongodb_collection.find({}).count()
assert mongodb_collection is not None
count = mongodb_collection.count_documents({})
assert count == %d, count
assert mongodb_database
assert mongodb_database is not None
""" % (
index
)
superdesk.commands.data_updates.DEFAULT_DATA_UPDATE_BW_IMPLEMENTATION = """
assert mongodb_collection
count = mongodb_collection.find({}).count()
assert mongodb_collection is not None
count = mongodb_collection.count_documents({})
assert count == %d, count
assert mongodb_database
assert mongodb_database is not None
""" % (
index + 1
)
Expand Down
Loading

0 comments on commit 84796a9

Please sign in to comment.