Commit: improvements

quadrismegistus committed Dec 10, 2023
1 parent 8fa7e69 commit 780e525
Showing 2 changed files with 118 additions and 84 deletions.
196 changes: 115 additions & 81 deletions ppa/archive/management/commands/generate_textcorpus.py
@@ -4,6 +4,8 @@
into Solr via the **index** manage command.
"""

DEFAULT_BATCH_SIZE=10000

import os
import json
from django.core.management.base import BaseCommand
@@ -14,11 +16,25 @@
import gzip
import orjsonl
from functools import cached_property
from collections import deque

class Command(BaseCommand):
"""Custom manage command to generate a text corpus from text indexed in Solr"""
"""
Custom manage command to generate a text corpus from text indexed in Solr.
Examples:
python manage.py generate_textcorpus --path ~/ppa_solr_corpus
python manage.py generate_textcorpus --path ~/ppa_solr_corpus --dry-run
python manage.py generate_textcorpus --path ~/ppa_solr_corpus --doc-limit 100000
python manage.py generate_textcorpus --path ~/ppa_solr_corpus --batch-size 1000
# fields we want from pages
"""

# fields we want from pages, in solr syntax: newfieldname:oldfieldname
PAGE_FIELDLIST = [
'page_id:id',
'work_id:group_id_s',
@@ -29,125 +45,143 @@ class Command(BaseCommand):
'page_text:content'
]
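The newfieldname:oldfieldname entries use Solr's field-alias syntax for the fl (field list) parameter, so each page document comes back from Solr with the renamed keys already in place. A minimal sketch of the same renaming as a raw Solr query; the URL and core name here are hypothetical, not taken from this repo:

import requests

# hypothetical local Solr core; the real URL comes from django settings
SOLR_SELECT = "http://localhost:8983/solr/ppa/select"

params = {
    "q": "item_type:page",
    # fl accepts alias:field pairs; "id" is returned under "page_id", etc.
    "fl": "page_id:id,work_id:group_id_s,page_text:content",
    "rows": 5,
    "wt": "json",
}
docs = requests.get(SOLR_SELECT, params=params).json()["response"]["docs"]
# each doc now has keys page_id, work_id, page_text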


# Argument parsing
def add_arguments(self, parser):
"""Build CLI arguments: --path and --doc-limit"""
"""
Build CLI arguments for the command.
"""

# add --path argument for output storage
parser.add_argument(
"--path", required=True, help="Directory path to save corpus file(s)."
"--path",
required=True,
help="Directory path to save corpus file(s)."
)

# add --doc-limit argument (determines how many results to retrieve from solr)
parser.add_argument(
"--doc-limit",
type=int,
default=-1,
help="Limit on the number of documents for corpus generation."
"The default of -1 considers ALL documents.",
)

# add --batch-size argument (for solr querying)
parser.add_argument(
"--batch-size",
type=int,
default=1000,
help="Number of docs to save at one time",
default=DEFAULT_BATCH_SIZE,
help="Number of docs to query from solr at one time",
)

# add --dry-run argument (don't save anything, just iterate)
parser.add_argument(
"--dry-run",
action='store_true',
help="Do not save anything, just iterate over solr",
)



#### SOLR ####

@cached_property
def query_set(self):
"""
A cached representation of the query set; avoids opening multiple connections to the database.
"""
return SolrQuerySet()
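For context, functools.cached_property runs the method once on first access and stores the result on the instance, so later accesses reuse the same object; here, one SolrQuerySet per command run. A tiny illustration:

from functools import cached_property

class Conn:
    @cached_property
    def handle(self):
        print("opened once")
        return object()

c = Conn()
assert c.handle is c.handle  # prints "opened once" a single time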

def iter_solr(self, batch_size=1000, item_type='page', lim=None, progress=True):

def iter_solr(self, item_type='page'):
"""
Iterate over solr documents of a certain `item_type`
Iterate over solr documents of a certain `item_type`.
"""

def query(order=True):
q=self.query_set.search(item_type=item_type)
if order: q=q.order_by('id')
if item_type=='page': q=q.only(*self.PAGE_FIELDLIST)
return q

q=query(order=True)
total = q.count()
# get query
batch_size,lim = self.batch_size,self.doclimit
qset=self.query_set.search(item_type=item_type).order_by('id')
if item_type=='page': qset=qset.only(*self.PAGE_FIELDLIST)
total = qset.count()
if lim and int(total)>lim: total=lim
if batch_size>total: batch_size=total

def iter_func():
steps = range(0, total, batch_size)
for step in steps:
yield from q[step:step+batch_size]
# if '_result_cache' in q.__dict__: del q.__dict__['_result_cache']

iterr = progressbar(iter_func(), max_value=total) if progress else iter_func()
# iterate with progress bar
iterr = (
result
for step in range(0, total, batch_size)
for result in qset[step:step+batch_size]
)
if self.progress:
iterr = progressbar(iterr, max_value=total)
yield from iterr
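The rewritten iterator walks the result set in batch_size windows, issuing one Solr request per slice instead of materializing everything at once. The same pattern in isolation (names here are illustrative):

from typing import Iterator, Sequence

def iter_batches(source: Sequence, total: int, batch_size: int) -> Iterator:
    # each source[step:step+batch_size] slice corresponds to one
    # paged Solr request in the real command
    for step in range(0, total, batch_size):
        yield from source[step:step + batch_size]

# usage: 25 items fetched in windows of 10 (3 slices)
assert len(list(iter_batches(range(25), total=25, batch_size=10))) == 25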


def iter_works(self):
"""
Simply calls iter_solr() with item_type="work"
"""
yield from self.iter_solr(item_type='work')

# save pages
def iter_pages(self):
"""
Simply calls iter_solr() with item_type="page"
"""
yield from self.iter_solr(item_type='page')


### saving to file
def save_metadata(self):
"""
Save the work-level metadata as a json file
"""
# get the data from solr
data = list(self.iter_works())

# save if not a dry run
if not self.is_dry_run:
with open(self.path_meta,'w') as of:
json.dump(data, of, indent=2)

def save_pages(self):
"""
Save the page-level metadata as a jsonl file
"""
### save pages
if self.is_dry_run:
# iterate in place
deque(self.iter_pages(), maxlen=0)
else:
# save to jsonl or jsonl.gz
orjsonl.save(self.path_texts, self.iter_pages())
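deque(iterator, maxlen=0) is the stdlib idiom for exhausting an iterator at C speed while discarding every item: on a dry run the Solr queries and progress bar still execute, but nothing is written or retained. For example:

from collections import deque

def fake_pages():
    for i in range(3):
        print("fetched page", i)
        yield {"page": i}

deque(fake_pages(), maxlen=0)  # prints three lines, stores nothing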




### running script

def handle(self, *args, **options):
"""
Run the command, generating metadata.json and pages.jsonl.gz
"""
# options
path = options['path']
is_dry_run = options['dry_run']
doclimit = options['doc_limit'] if options['doc_limit']>0 else None
progress = options['verbosity']>0
batch_size = options['batch_size']
by_batch = batch_size > 1

# paths
if not is_dry_run:
os.makedirs(path, exist_ok=True)
path_meta = os.path.join(path,'metadata.json')
path_texts = os.path.join(path,'pages.jsonl.gz')
self.path = options['path']
self.path_meta = os.path.join(self.path,'metadata.json')
self.path_texts = os.path.join(self.path,'pages.jsonl.gz')
self.is_dry_run = options['dry_run']
self.doclimit = options['doc_limit'] if options['doc_limit']>0 else None
self.progress = options['verbosity']>0
self.batch_size = options['batch_size']

# ensure path
if not self.is_dry_run:
os.makedirs(self.path, exist_ok=True)

# save metadata
def iter_works():
yield from self.iter_solr(
item_type='work',
lim=doclimit,
progress=progress,
batch_size=batch_size if by_batch else 1000
)

output_ld = list(iter_works())
if not is_dry_run:
with open(path_meta,'w') as of:
json.dump(output_ld, of, indent=2)

self.save_metadata()

# save pages
def iter_pages():
yield from self.iter_solr(
item_type='page',
lim=doclimit,
progress=progress,
batch_size=batch_size if by_batch else 1000
)
### save pages
if is_dry_run:
print('dry run')
for x in iter_pages():
pass
elif not by_batch:
orjsonl.save(path_texts, iter_pages())
# with gzip.open(path_texts,'wt',encoding='utf-8') as of:
# for d in iter_pages():
# of.write(json.dumps(d)+'\n')
else:
with gzip.open(path_texts,'wt',encoding='utf-8') as of:
batch=[]

def save_batch():
outstr='\n'.join(json.dumps(d) for d in batch) + '\n'
of.write(outstr)

for i,d in enumerate(iter_pages()):
if i and not i%batch_size:
save_batch()
batch=[]
batch.append(d)

if batch:
save_batch()
self.save_pages()
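After a real (non-dry) run, the --path directory holds metadata.json with the work-level records and pages.jsonl.gz with one page per line. A sketch of reading the corpus back, assuming orjsonl's stream() reader and using the example path from the docstring:

import os
import json
import orjsonl

corpus_dir = os.path.expanduser("~/ppa_solr_corpus")

with open(os.path.join(corpus_dir, "metadata.json")) as f:
    works = json.load(f)  # list of work-level dicts
print(len(works), "works")

# orjsonl infers gzip from the .gz extension; stream() yields one
# parsed line at a time instead of loading the whole file
for page in orjsonl.stream(os.path.join(corpus_dir, "pages.jsonl.gz")):
    print(page["page_id"], len(page["page_text"]))
    break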
6 changes: 3 additions & 3 deletions ppa/archive/tests/test_generate_textcorpus.py
@@ -56,7 +56,7 @@ def patched_solr_queryset(mock_solr_queryset):
) as mock_queryset_cls:
mock_qs = mock_queryset_cls.return_value
mock_qs.only.return_value.count.return_value = len(mock_solr_docs)
mock_qs.only.return_value.get_results.return_value = mock_solr_docs
mock_qs.only.return_value.__getitem__.return_value = mock_solr_docs
yield mock_qs


@@ -68,13 +68,13 @@ def test_save(tmpdir, patched_solr_queryset):
pages_file = tmpdir.dirpath("pages.jsonl")
assert metadata_file.check()
with open(metadata_file) as f: meta=json.load(f)
assert len(meta) == 10
assert len(meta) == 2

def numlines(fngz):
with gzip.open(fngz,'rt',encoding='utf-8') as f:
return sum(1 for ln in f)

assert numlines(pages_file) > 10
assert numlines(pages_file) == 3
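The fixture change above mirrors the code change: iter_solr now reads results by slicing the queryset (qset[step:step+batch_size]), and slicing a MagicMock is routed through __getitem__, not get_results. An illustration of why the old mock would no longer be hit:

from unittest.mock import MagicMock

qs = MagicMock()
qs.__getitem__.return_value = [{"id": "a"}, {"id": "b"}]

# a slice expression calls __getitem__ with a slice object
assert qs[0:1000] == [{"id": "a"}, {"id": "b"}]
qs.get_results.assert_not_called()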



