Skip to content

Commit

Permalink
Merge pull request #114 from openeduhub/develop
Browse files Browse the repository at this point in the history
Merge recent HTTPX-related fixes into `master`
  • Loading branch information
Criamos authored Sep 26, 2024
2 parents a8013ff + eda6ea4 commit 8e43c33
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 89 deletions.
26 changes: 26 additions & 0 deletions .run/merlin_spider.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="merlin_spider" type="PythonConfigurationType" factoryName="Python">
<output_file path="$PROJECT_DIR$/logs/merlin_spider_console.log" is_save="true" />
<module name="oeh-search-etl" />
<option name="ENV_FILES" value="" />
<option name="INTERPRETER_OPTIONS" value="" />
<option name="PARENT_ENVS" value="true" />
<envs>
<env name="PYTHONUNBUFFERED" value="1" />
</envs>
<option name="SDK_HOME" value="" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/" />
<option name="IS_MODULE_SDK" value="true" />
<option name="ADD_CONTENT_ROOTS" value="true" />
<option name="ADD_SOURCE_ROOTS" value="true" />
<EXTENSION ID="PythonCoverageRunConfigurationExtension" runner="coverage.py" />
<option name="SCRIPT_NAME" value="./.venv/bin/scrapy" />
<option name="PARAMETERS" value="crawl oersi_spider -O &quot;../../logs/merlin_spider.json&quot;" />
<option name="SHOW_COMMAND_LINE" value="false" />
<option name="EMULATE_TERMINAL" value="false" />
<option name="MODULE_MODE" value="false" />
<option name="REDIRECT_INPUT" value="false" />
<option name="INPUT_FILE" value="" />
<method v="2" />
</configuration>
</component>
143 changes: 65 additions & 78 deletions converter/es_connector.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import asyncio
import base64
import json
import logging
import pprint
import time
import uuid
from asyncio import Semaphore
from enum import Enum
from typing import List, Optional

import httpx
import requests
import vobject
from requests.auth import HTTPBasicAuth
Expand Down Expand Up @@ -113,8 +110,8 @@ class CreateGroupType(Enum):
nodeApi: NODEV1Api
groupCache: List[str]
enabled: bool
_client_async = httpx.AsyncClient()
_sem: Semaphore = asyncio.Semaphore(25)
r_session: requests.Session = requests.Session()
# see: https://requests.readthedocs.io/en/latest/user/advanced/#session-objects

def __init__(self):
cookie_threshold = env.get("EDU_SHARING_COOKIE_REBUILD_THRESHOLD", True)
Expand All @@ -135,16 +132,16 @@ def get_headers(self, content_type: str | None = "application/json"):
return header_dict

def sync_node(self, spider, type, properties):
groupBy = []
group_by = []
if "ccm:replicationsourceorigin" in properties:
groupBy = ["ccm:replicationsourceorigin"]
group_by = ["ccm:replicationsourceorigin"]
try:
response = EduSharing.bulkApi.sync(
request_body=properties,
match=["ccm:replicationsource", "ccm:replicationsourceid"],
type=type,
group=spider.name,
group_by=groupBy,
group_by=group_by,
reset_version=EduSharing.resetVersion,
)
except ApiException as e:
Expand All @@ -167,16 +164,14 @@ def sync_node(self, spider, type, properties):
raise e
return response["node"]

async def set_node_text(self, uuid, item) -> bool:
def set_node_text(self, uuid, item) -> bool:
if "fulltext" in item:
response = await self._client_async.post(
get_project_settings().get("EDU_SHARING_BASE_URL")
+ "rest/node/v1/nodes/-home-/"
+ uuid
+ "/textContent?mimetype=text/plain",
headers=self.get_headers("multipart/form-data"),
response = self.r_session.post(
url=f"{get_project_settings().get("EDU_SHARING_BASE_URL")}"
f"rest/node/v1/nodes/-home-/{uuid}"
"/textContent?mimetype=text/plain",
data=item["fulltext"].encode("utf-8"),
timeout=None,
headers=self.get_headers("multipart/form-data"),
)
return response.status_code == 200
# does currently not store data
Expand All @@ -197,47 +192,39 @@ def set_permissions(self, uuid, permissions) -> bool:
send_copy=False,
)
return True
except ApiException as e:
except ApiException:
return False

async def set_node_binary_data(self, uuid, item) -> bool:
def set_node_binary_data(self, uuid, item) -> bool:
if "binary" in item:
log.info(
get_project_settings().get("EDU_SHARING_BASE_URL")
+ "rest/node/v1/nodes/-home-/"
+ uuid
+ "/content?mimetype="
+ item["lom"]["technical"]["format"]
f"{get_project_settings().get("EDU_SHARING_BASE_URL")}"
f"rest/node/v1/nodes/-home-/{uuid}"
f"/content?mimetype={item["lom"]["technical"]["format"]}"
)
files = {"file": item["binary"]}
response = await self._client_async.post(
get_project_settings().get("EDU_SHARING_BASE_URL")
+ "rest/node/v1/nodes/-home-/"
+ uuid
+ "/content?mimetype="
+ item["lom"]["technical"]["format"],
response = self.r_session.post(
url=f"{get_project_settings().get("EDU_SHARING_BASE_URL")}"
f"rest/node/v1/nodes/-home-/{uuid}"
f"/content?mimetype={item['lom']['technical']['format']}",
headers=self.get_headers(None),
files=files,
timeout=None,
)
return response.status_code == 200
else:
return False

async def set_node_preview(self, uuid, item) -> bool:
def set_node_preview(self, uuid, item) -> bool:
if "thumbnail" in item:
key = "large" if "large" in item["thumbnail"] else "small" if "small" in item["thumbnail"] else None
if key:
files = {"image": base64.b64decode(item["thumbnail"][key])}
response = await self._client_async.post(
get_project_settings().get("EDU_SHARING_BASE_URL")
+ "rest/node/v1/nodes/-home-/"
+ uuid
+ "/preview?mimetype="
+ item["thumbnail"]["mimetype"],
response = self.r_session.post(
url=f"{get_project_settings().get("EDU_SHARING_BASE_URL")}"
f"rest/node/v1/nodes/-home-/{uuid}"
f"/preview?mimetype={item["thumbnail"]["mimetype"]}",
headers=self.get_headers(None),
files=files,
timeout=None,
)
return response.status_code == 200
else:
Expand Down Expand Up @@ -429,7 +416,7 @@ def transform_item(self, uuid, spider, item):
try:
# edusharing requires milliseconds
duration = int(float(duration) * 1000)
except:
except ValueError:
log.debug(
f"The supplied 'technical.duration'-value {duration} could not be converted from "
f"seconds to milliseconds. ('cclom:duration' expects ms)"
Expand Down Expand Up @@ -457,8 +444,8 @@ def transform_item(self, uuid, spider, item):
continue
mapping = EduSharingConstants.LIFECYCLE_ROLES_MAPPING[person["role"].lower()]
# convert to a vcard string
firstName = person["firstName"] if "firstName" in person else ""
lastName = person["lastName"] if "lastName" in person else ""
first_name = person["firstName"] if "firstName" in person else ""
last_name = person["lastName"] if "lastName" in person else ""
title: str = person["title"] if "title" in person else ""
organization = person["organization"] if "organization" in person else ""
url = person["url"] if "url" in person else ""
Expand All @@ -476,8 +463,8 @@ def transform_item(self, uuid, spider, item):
address_type: str = person["address_type"] if "address_type" in person else ""
# create the vCard object first, then add attributes on-demand / if available
vcard = vobject.vCard()
vcard.add("n").value = vobject.vcard.Name(family=lastName, given=firstName)
vcard.add("fn").value = organization if organization else (firstName + " " + lastName).strip()
vcard.add("n").value = vobject.vcard.Name(family=last_name, given=first_name)
vcard.add("fn").value = organization if organization else (first_name + " " + last_name).strip()
# only the "fn"-attribute is required to serialize the vCard. (all other properties are optional)
if address_city or address_country or address_postal_code or address_region or address_street:
# The vCard v3 "ADR" property is used for physical addresses
Expand Down Expand Up @@ -543,7 +530,7 @@ def transform_item(self, uuid, spider, item):
else:
spaces[mapping] = [vcard.serialize(lineLength=10000)]

valuespaceMapping = {
valuespace_mapping = {
"accessibilitySummary": "ccm:accessibilitySummary",
"conditionsOfAccess": "ccm:conditionsOfAccess",
"containsAdvertisement": "ccm:containsAdvertisement",
Expand All @@ -562,11 +549,11 @@ def transform_item(self, uuid, spider, item):
"toolCategory": "ccm:toolCategory",
}
for key in item["valuespaces"]:
spaces[valuespaceMapping[key]] = item["valuespaces"][key]
spaces[valuespace_mapping[key]] = item["valuespaces"][key]
# add raw values if the api supports it
if EduSharing.version["major"] >= 1 and EduSharing.version["minor"] >= 1:
for key in item["valuespaces_raw"]:
splitted = valuespaceMapping[key].split(":")
splitted = valuespace_mapping[key].split(":")
splitted[0] = "virtual"
spaces[":".join(splitted)] = item["valuespaces_raw"][key]

Expand Down Expand Up @@ -624,11 +611,11 @@ def transform_item(self, uuid, spider, item):
pass
pass

mdsId = env.get("EDU_SHARING_METADATASET", allow_null=True, default="mds_oeh")
if mdsId != "default":
spaces["cm:edu_metadataset"] = mdsId
mds_id = env.get("EDU_SHARING_METADATASET", allow_null=True, default="mds_oeh")
if mds_id != "default":
spaces["cm:edu_metadataset"] = mds_id
spaces["cm:edu_forcemetadataset"] = "true"
log.debug("Using metadataset " + mdsId)
log.debug("Using metadataset " + mds_id)
else:
log.debug("Using default metadataset")

Expand Down Expand Up @@ -656,7 +643,7 @@ def create_groups_if_not_exists(self, groups, type: CreateGroupType):
log.info("Group " + uuid + " was found in edu-sharing (cache inconsistency), no need to create")
EduSharing.groupCache.append(uuid)
continue
except ApiException as e:
except ApiException:
log.info("Group " + uuid + " was not found in edu-sharing, creating it")
pass

Expand Down Expand Up @@ -704,14 +691,14 @@ def set_node_permissions(self, uuid, item):
# if not 'groups' in item['permissions'] and not 'mediacenters' in item['permissions']:
# log.error('Invalid state detected: Permissions public is set to false but neither groups or mediacenters are set. Please use either public = true without groups/mediacenters or public = false and set group/mediacenters. No permissions will be set!')
# return
mergedGroups = []
merged_groups = []
if "groups" in item["permissions"]:
if "autoCreateGroups" in item["permissions"] and item["permissions"]["autoCreateGroups"] is True:
self.create_groups_if_not_exists(
item["permissions"]["groups"],
EduSharing.CreateGroupType.Regular,
)
mergedGroups += list(
merged_groups += list(
map(
lambda x: EduSharingConstants.GROUP_PREFIX + x,
item["permissions"]["groups"],
Expand All @@ -726,15 +713,15 @@ def set_node_permissions(self, uuid, item):
item["permissions"]["mediacenters"],
EduSharing.CreateGroupType.MediaCenter,
)
mergedGroups += list(
merged_groups += list(
map(
lambda x: EduSharingConstants.GROUP_PREFIX
+ EduSharingConstants.MEDIACENTER_PROXY_PREFIX
+ x,
item["permissions"]["mediacenters"],
)
)
for group in mergedGroups:
for group in merged_groups:
permissions["permissions"].append(
{
"authority": {
Expand All @@ -753,46 +740,47 @@ def set_node_permissions(self, uuid, item):
)
log.error(item["permissions"])

async def insert_item(self, spider, uuid, item):
async with self._sem:
# inserting items is controlled with a Semaphore, otherwise we'd get PoolTimeout Exceptions when there's a
# temporary burst of items that need to be inserted
node = self.sync_node(spider, "ccm:io", self.transform_item(uuid, spider, item))
self.set_node_permissions(node["ref"]["id"], item)
await self.set_node_preview(node["ref"]["id"], item)
if not await self.set_node_binary_data(node["ref"]["id"], item):
await self.set_node_text(node["ref"]["id"], item)
def insert_item(self, spider, uuid, item):
node = self.sync_node(spider, "ccm:io", self.transform_item(uuid, spider, item))
self.set_node_permissions(node["ref"]["id"], item)
self.set_node_preview(node["ref"]["id"], item)
if not self.set_node_binary_data(node["ref"]["id"], item):
self.set_node_text(node["ref"]["id"], item)

async def update_item(self, spider, uuid, item):
await self.insert_item(spider, uuid, item)
def update_item(self, spider, uuid, item):
self.insert_item(spider, uuid, item)

@staticmethod
def init_cookie():
log.debug("Init edu sharing cookie...")
log.debug("Init edu-sharing cookie...")
settings = get_project_settings()
auth = requests.get(
settings.get("EDU_SHARING_BASE_URL") + "rest/authentication/v1/validateSession",
url=f"{settings.get("EDU_SHARING_BASE_URL")}rest/authentication/v1/validateSession",
auth=HTTPBasicAuth(
settings.get("EDU_SHARING_USERNAME"),
settings.get("EDU_SHARING_PASSWORD"),
username=f"{settings.get("EDU_SHARING_USERNAME")}",
password=f"{settings.get("EDU_SHARING_PASSWORD")}"
),
headers={"Accept": "application/json"},
)
isAdmin = json.loads(auth.text)["isAdmin"]
log.info("Got edu sharing cookie, admin status: " + str(isAdmin))
if isAdmin:
is_admin = json.loads(auth.text)["isAdmin"]
log.info(f"Got edu-sharing cookie, admin status: {is_admin}")
if is_admin:
# --- setting cookies for the (openAPI generated) API client:
cookies = []
for cookie in auth.headers["SET-COOKIE"].split(","):
cookies.append(cookie.split(";")[0])
EduSharing.cookie = ";".join(cookies)
# --- setting cookies for the requests.Session object:
cookie_dict: dict = requests.utils.dict_from_cookiejar(auth.cookies)
EduSharing.r_session.cookies.update(cookie_dict)
return auth

def init_api_client(self):
if EduSharing.cookie is None:
settings = get_project_settings()
auth = self.init_cookie()
isAdmin = json.loads(auth.text)["isAdmin"]
if isAdmin:
is_admin = json.loads(auth.text)["isAdmin"]
if is_admin:
configuration = Configuration()
configuration.host = settings.get("EDU_SHARING_BASE_URL") + "rest"
EduSharing.apiClient = ESApiClient(
Expand Down Expand Up @@ -849,9 +837,8 @@ def init_api_client(self):
return
log.warning(auth.text)
raise Exception(
"Could not authentify as admin at edu-sharing. Please check your settings for repository "
+ settings.get("EDU_SHARING_BASE_URL")
)
f"Could not authenticate as admin at edu-sharing. Please check your settings for repository "
f"{settings.get("EDU_SHARING_BASE_URL")}")

@staticmethod
def build_uuid(url):
Expand Down
19 changes: 12 additions & 7 deletions converter/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,13 +939,18 @@ def process_item(self, raw_item, spider):
db_item = self.find_item(item["sourceId"], spider)
if db_item:
if item["hash"] != db_item[1]:
log.debug(f"hash has changed, continuing pipelines for item {item['sourceId']}")
log.debug(f"EduSharingCheckPipeline: hash has changed. Continuing pipelines for item {item['sourceId']}")
else:
log.debug(f"hash unchanged, skipping item {item['sourceId']}")
# self.update(item['sourceId'], spider)
# for tests, we update everything for now
# activate this later
# raise DropItem()
if "EDU_SHARING_FORCE_UPDATE" in spider.custom_settings and spider.custom_settings["EDU_SHARING_FORCE_UPDATE"]:
log.debug(f"EduSharingCheckPipeline: hash unchanged for item {item['sourceId']}, "
f"but detected active 'force item update'-setting (resetVersion / forceUpdate). "
f"Continuing pipelines ...")
else:
log.debug(f"EduSharingCheckPipeline: hash unchanged, skipping item {item['sourceId']}")
# self.update(item['sourceId'], spider)
# for tests, we update everything for now
# activate this later
# raise DropItem()
return raw_item

class EduSharingTypeValidationPipeline(BasicPipeline):
Expand Down Expand Up @@ -1103,7 +1108,7 @@ async def process_item(self, raw_item, spider):
if "title" in item["lom"]["general"]:
title = str(item["lom"]["general"]["title"])
entryUUID = EduSharing.build_uuid(item["response"]["url"] if "url" in item["response"] else item["hash"])
await self.insert_item(spider, entryUUID, item)
self.insert_item(spider, entryUUID, item)
log.info("item " + entryUUID + " inserted/updated")

# @TODO: We may need to handle Collections
Expand Down
3 changes: 3 additions & 0 deletions converter/spiders/base_classes/lom_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ def __init__(self, **kwargs):
logging.info(
f"resetVersion requested, will force update + reset versions for crawler {self.name}"
)
# populate the custom_settings so we can read the value more comfortably
# when an item passes through the pipeline
self.custom_settings.update({"EDU_SHARING_FORCE_UPDATE": True})
# EduSharing().deleteAll(self)
EduSharing.resetVersion = True
self.forceUpdate = True
Expand Down
Loading

0 comments on commit 8e43c33

Please sign in to comment.