-
Notifications
You must be signed in to change notification settings - Fork 53
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix load scopus csv authors affiliations #59
base: master
Are you sure you want to change the base?
Changes from all commits
74cccdb
bf2d4e5
657ff7b
941f20e
6acad51
19db43f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,17 +17,17 @@ def name(self): | |
|
||
|
||
class ScopusCsvAuthor(Author): | ||
def __init__(self, name, affiliation): | ||
def __init__(self, name, affiliations): | ||
self._name = name | ||
self._affiliation = affiliation | ||
self._affiliations = affiliations | ||
|
||
@property | ||
def name(self): | ||
return self._name | ||
|
||
@property | ||
def affiliations(self): | ||
return [ScopusCsvAffiliation(self._affiliation)] | ||
return self._affiliations | ||
|
||
|
||
class ScopusCsvDocument(Document): | ||
|
@@ -42,23 +42,122 @@ def __init__(self, entry): | |
|
||
@property | ||
def title(self) -> Optional[str]: | ||
return self.entry.get("Title") or None | ||
return self.entry.get("Title") | ||
|
||
def _parse_authors(self, auths: str) -> List[str]: | ||
""" | ||
helper method. parse two formats of Authors field. | ||
Formats: | ||
|
||
1. deliminated by , or ; | ||
|
||
if either deliminator isn't present, will assume | ||
only one author in field | ||
""" | ||
if ";" in auths: | ||
author_list = auths.split("; ") | ||
elif "," in auths: | ||
author_list = auths.split(", ") | ||
# add comma between auth last, first to match format | ||
# in authors with affiliations | ||
author_list = [", ".join(auth.rsplit(" ", 1)) for auth in author_list] | ||
|
||
else: # single author... | ||
author_list = [auths] | ||
return author_list | ||
|
||
def _get_authors_ids(self) -> List[str]: | ||
""" | ||
helper method to parse two formats of | ||
'Author(s) ID' field | ||
|
||
1. AUTHOR_ID;AUTHOR_ID_2;LAST_AUTHOR_ID; | ||
2. AUTHOR_ID; AUTHOR_ID_2; LAST AUTHOR_ID | ||
""" | ||
auths_id = self.entry.get("Author(s) ID") | ||
|
||
if auths_id == "": | ||
return [] | ||
|
||
if auths_id[-1] == ";": | ||
auths_id = auths_id[:-1] | ||
|
||
auths_ids = auths_id.split(";") | ||
auths_ids = [auth_id.lstrip().rstrip() for auth_id in auths_ids] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
return auths_ids | ||
|
||
def _try_to_add_ids_to_authors(self, auths: List[str]) -> List[str]: | ||
auths_ids = self._get_authors_ids() | ||
|
||
if len(auths_ids) == len(auths) and len(auths) > 0: | ||
auths_w_ids = [f"{name} (ID: {auth_id})" for name, auth_id in zip(auths, auths_ids)] | ||
return auths_w_ids | ||
return auths | ||
|
||
def _parse_affiliations(self, affs) -> List[str]: | ||
if affs == "": | ||
return [] | ||
return [aff.lstrip().rstrip() for aff in affs.split(";")] | ||
|
||
@property | ||
def authors(self) -> List[ScopusCsvAuthor]: | ||
def authors(self) -> Optional[List[ScopusCsvAuthor]]: | ||
auths = self.entry.get("Authors") | ||
no_authors_formats = ["[No Authors Found]", "[No author name available]"] | ||
|
||
if auths == "" or auths in no_authors_formats: | ||
return None | ||
|
||
# use auths to search in auths_with_affs string. | ||
auths = self._parse_authors(auths) | ||
# use auths_with_ids for unique field. | ||
authors_with_ids = self._try_to_add_ids_to_authors(auths) | ||
|
||
affs = self.entry.get("Affiliations") | ||
affs = self._parse_affiliations(affs) | ||
# if single author, no way to know if ',' in author name | ||
# within auths_affs field (can't search string), | ||
# use 'Affiliations' field. | ||
if len(auths) == 1: | ||
return [ | ||
ScopusCsvAuthor(authors_with_ids[0], [ScopusCsvAffiliation(aff) for aff in affs]) | ||
] | ||
|
||
auths_affs = self.entry.get("Authors with affiliations") | ||
auths_id = self.entry.get("Author(s) ID", "") | ||
# author_last, first initial, affiliation; ..... | ||
if not auths_affs: | ||
return [] | ||
auths_affs = auths_affs.split("; ") | ||
auths = [", ".join(auth_aff.split(", ")[0:2]) for auth_aff in auths_affs] | ||
affs = [", ".join(auth_aff.split(", ")[2:]) for auth_aff in auths_affs] | ||
# try to add id to author name | ||
auths_id = auths_id.split(";")[:-1] # remove empty string last el | ||
if len(auths) == len(auths_id): | ||
auths = [f"{name} (ID: {auth_id})" for name, auth_id in zip(auths, auths_id)] | ||
return [ScopusCsvAuthor(a, b) for a, b in zip(auths, affs)] | ||
if auths_affs == "": # can't map affiliations to authors | ||
return [ScopusCsvAuthor(auth, None) for auth in authors_with_ids] | ||
|
||
indexes_of_authors = [auths_affs.index(auth) for auth in auths] | ||
auth_to_affs_mapping = {} | ||
|
||
for num, index in enumerate(indexes_of_authors): | ||
# auth = auths[num] | ||
|
||
if num < len(indexes_of_authors) - 1: | ||
next_index = indexes_of_authors[num + 1] | ||
cur_auth_affils = auths_affs[index:next_index] | ||
else: | ||
cur_auth_affils = auths_affs[index:] | ||
# cur_auth_affils = substring.replace(f"{auth}, ", "") | ||
# could be multiple affiliates, but no clear deliminator | ||
|
||
affs_filtered = [a for a in affs if a in cur_auth_affils] | ||
affs_filtered = sorted(affs_filtered, key=lambda x: len(x)) | ||
# edge case is str in affs is substr of aff in cur_auth_affs | ||
|
||
# removes edge case where aff is substring of other aff | ||
disclude = [] | ||
short_string = affs_filtered[0] | ||
for j in range(0, len(affs_filtered) - 1): | ||
long_strings = affs_filtered[j + 1 :] | ||
for ls in long_strings: | ||
if short_string in ls: | ||
disclude.append(short_string) | ||
short_string = affs_filtered[j + 1] | ||
|
||
auth_to_affs_mapping[authors_with_ids[num]] = [ | ||
ScopusCsvAffiliation(a) for a in affs_filtered if a not in disclude | ||
] | ||
return [ScopusCsvAuthor(a, b) for a, b in auth_to_affs_mapping.items()] | ||
|
||
@property | ||
def publisher(self) -> Optional[str]: | ||
|
@@ -79,7 +178,7 @@ def publication_year(self) -> Optional[int]: | |
def keywords(self) -> Optional[List[str]]: | ||
keywords = self.entry.get("Author Keywords") | ||
if not keywords: | ||
return None | ||
return [] | ||
return keywords.split("; ") | ||
|
||
@property | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
import os | ||
from litstudy.sources.scopus_csv import load_scopus_csv | ||
|
||
|
||
def pytest_generate_tests(metafunc): | ||
path = os.path.dirname(__file__) + "/resources/scopus.csv" | ||
docs = load_scopus_csv(path) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this call |
||
if "doc" in metafunc.fixturenames: | ||
metafunc.parametrize("doc", docs) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would it be possible to rename this from Also, should it not be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can just become
if auths_ids:
. If, in the future, the scopus format changes again thenself.entry.get("Author(s) ID")
could returnNone
. Theif auths_ids:
will catch both empty strings""
andNone
s.