From 780e525e8c0a464f0fcfcb22ba753cedc52cf3d3 Mon Sep 17 00:00:00 2001
From: Ryan Heuser
Date: Sun, 10 Dec 2023 08:12:55 -0500
Subject: [PATCH] generate_textcorpus: refactor batching, saving, and CLI
 options

- replace the closure-based iteration in handle() with iter_works/iter_pages
  and save_metadata/save_pages methods
- page through Solr with a single ordered query; raise the default batch
  size to 10,000 (DEFAULT_BATCH_SIZE)
- always write pages via orjsonl; drop the manual gzip batching path
- update tests to mock queryset slicing (__getitem__) instead of get_results
---
 .../commands/generate_textcorpus.py           | 196 ++++++++++--------
 ppa/archive/tests/test_generate_textcorpus.py |   6 +-
 2 files changed, 118 insertions(+), 84 deletions(-)

diff --git a/ppa/archive/management/commands/generate_textcorpus.py b/ppa/archive/management/commands/generate_textcorpus.py
index 4fe046bd..50826049 100644
--- a/ppa/archive/management/commands/generate_textcorpus.py
+++ b/ppa/archive/management/commands/generate_textcorpus.py
@@ -4,6 +4,8 @@
 into Solr via the **index** manage command.
 """
 
+DEFAULT_BATCH_SIZE = 10000
+
 import os
 import json
 from django.core.management.base import BaseCommand
@@ -14,11 +16,25 @@
 import gzip
 import orjsonl
 from functools import cached_property
+from collections import deque
 
 
 class Command(BaseCommand):
-    """Custom manage command to generate a text corpus from text indexed in Solr"""
+    """
+    Custom manage command to generate a text corpus from text indexed in Solr.
+
+    Examples:
+
+        python manage.py generate_textcorpus --path ~/ppa_solr_corpus
+
+        python manage.py generate_textcorpus --path ~/ppa_solr_corpus --dry-run
+
+        python manage.py generate_textcorpus --path ~/ppa_solr_corpus --doc-limit 100000
+
+        python manage.py generate_textcorpus --path ~/ppa_solr_corpus --batch-size 1000
 
-    # fields we want from pages
+    """
+
+    # fields we want from pages, in solr syntax: newfieldname:oldfieldname
    PAGE_FIELDLIST = [
         'page_id:id',
         'work_id:group_id_s',
@@ -29,12 +45,21 @@ class Command(BaseCommand):
         'page_text:content'
     ]
 
+
+    # Argument parsing
     def add_arguments(self, parser):
-        """Build CLI arguments: --path and --doc-limit"""
+        """
+        Build CLI arguments for the command.
+        """
+
+        # add --path argument for output storage
         parser.add_argument(
-            "--path", required=True, help="Directory path to save corpus file(s)."
+            "--path",
+            required=True,
+            help="Directory path to save corpus file(s)."
         )
 
+        # add --doc-limit argument (determines how many results to retrieve from solr)
         parser.add_argument(
             "--doc-limit",
             type=int,
@@ -42,112 +67,121 @@ def add_arguments(self, parser):
             help="Limit on the number of documents for corpus generation."
             "The default of -1 considers ALL documents.",
         )
+
+        # add --batch-size argument (for solr querying)
         parser.add_argument(
             "--batch-size",
             type=int,
-            default=1000,
-            help="Number of docs to save at one time",
+            default=DEFAULT_BATCH_SIZE,
+            help="Number of docs to query from solr at one time",
         )
+
+        # add --dry-run argument (don't save anything, just iterate)
         parser.add_argument(
             "--dry-run",
             action='store_true',
             help="Do not save anything, just iterate over solr",
         )
+
+
+    #### SOLR ####
+
     @cached_property
     def query_set(self):
+        """
+        A cached query set; avoids opening multiple connections to the database.
+        """
         return SolrQuerySet()
 
-    def iter_solr(self, batch_size=1000, item_type='page', lim=None, progress=True):
+
+    def iter_solr(self, item_type='page'):
         """
         Iterate over solr documents of a certain `item_type`.
""" - def query(order=True): - q=self.query_set.search(item_type=item_type) - if order: q=q.order_by('id') - if item_type=='page': q=q.only(*self.PAGE_FIELDLIST) - return q - - q=query(order=True) - total = q.count() + # get query + batch_size,lim = self.batch_size,self.doclimit + qset=self.query_set.search(item_type=item_type).order_by('id') + if item_type=='page': qset=qset.only(*self.PAGE_FIELDLIST) + total = qset.count() if lim and int(total)>lim: total=lim if batch_size>total: batch_size=total - def iter_func(): - steps = range(0, total, batch_size) - for step in steps: - yield from q[step:step+batch_size] - # if '_result_cache' in q.__dict__: del q.__dict__['_result_cache'] - - iterr = progressbar(iter_func(), max_value=total) if progress else iter_func() + # iterate with progress bar + iterr = ( + result + for step in range(0, total, batch_size) + for result in qset[step:step+batch_size] + ) + if self.progress: + iterr = progressbar(iterr, max_value=total) yield from iterr + def iter_works(self): + """ + Simply calls iter_solr() with item_type=="work" + """ + yield from self.iter_solr(item_type='work') + + # save pages + def iter_pages(self): + """ + Simply calls iter_solr() with item_type=="page" + """ + yield from self.iter_solr(item_type='page') + + + ### saving to file + def save_metadata(self): + """ + Save the work-level metadata as a json file + """ + # get the data from solr + data = list(self.iter_works()) + + # save if not a dry run + if not self.is_dry_run: + with open(self.path_meta,'w') as of: + json.dump(data, of, indent=2) + + def save_pages(self): + """ + Save the page-level metadata as a jsonl file + """ + ### save pages + if self.is_dry_run: + # iterate in place + deque(self.iter_pages(), maxlen=0) + else: + # save to jsonl or jsonl.gz + orjsonl.save(self.path_texts, self.iter_pages()) + + + + + ### running script + def handle(self, *args, **options): """ Run the command, generating metadata.jsonl and pages.jsonl """ # options - path = options['path'] - is_dry_run = options['dry_run'] - doclimit = options['doc_limit'] if options['doc_limit']>0 else None - progress = options['verbosity']>0 - batch_size = options['batch_size'] - by_batch = batch_size > 1 - - # paths - if not is_dry_run: - os.makedirs(path, exist_ok=True) - path_meta = os.path.join(path,'metadata.json') - path_texts = os.path.join(path,'pages.jsonl.gz') + self.path = options['path'] + self.path_meta = os.path.join(self.path,'metadata.json') + self.path_texts = os.path.join(self.path,'pages.jsonl.gz') + self.is_dry_run = options['dry_run'] + self.doclimit = options['doc_limit'] if options['doc_limit']>0 else None + self.progress = options['verbosity']>0 + self.batch_size = options['batch_size'] + + # ensure path + if not self.is_dry_run: + os.makedirs(self.path, exist_ok=True) # save metadata - def iter_works(): - yield from self.iter_solr( - item_type='work', - lim=doclimit, - progress=progress, - batch_size=batch_size if by_batch else 1000 - ) - - output_ld = list(iter_works()) - if not is_dry_run: - with open(path_meta,'w') as of: - json.dump(output_ld, of, indent=2) - + self.save_metadata() # save pages - def iter_pages(): - yield from self.iter_solr( - item_type='page', - lim=doclimit, - progress=progress, - batch_size=batch_size if by_batch else 1000 - ) - ### save pages - if is_dry_run: - print('dry run') - for x in iter_pages(): - pass - elif not by_batch: - orjsonl.save(path_texts, iter_pages()) - # with gzip.open(path_texts,'wt',encoding='utf-8') as of: - # for d in iter_pages(): - # 
-            #         of.write(json.dumps(d)+'\n')
-        else:
-            with gzip.open(path_texts,'wt',encoding='utf-8') as of:
-                batch=[]
-
-                def save_batch():
-                    outstr='\n'.join(json.dumps(d) for d in batch) + '\n'
-                    of.write(outstr)
-
-                for i,d in enumerate(iter_pages()):
-                    if i and not i%batch_size:
-                        save_batch()
-                        batch=[]
-                    batch.append(d)
-
-                if batch:
-                    save_batch()
\ No newline at end of file
+        self.save_pages()
\ No newline at end of file
diff --git a/ppa/archive/tests/test_generate_textcorpus.py b/ppa/archive/tests/test_generate_textcorpus.py
index 223ab5b0..8d043ac7 100644
--- a/ppa/archive/tests/test_generate_textcorpus.py
+++ b/ppa/archive/tests/test_generate_textcorpus.py
@@ -56,7 +56,7 @@ def patched_solr_queryset(mock_solr_queryset):
     ) as mock_queryset_cls:
         mock_qs = mock_queryset_cls.return_value
         mock_qs.only.return_value.count.return_value = len(mock_solr_docs)
-        mock_qs.only.return_value.get_results.return_value = mock_solr_docs
+        mock_qs.only.return_value.__getitem__.return_value = mock_solr_docs
         yield mock_qs
 
 
@@ -68,13 +68,13 @@ def test_save(tmpdir, patched_solr_queryset):
     pages_file = tmpdir.dirpath("pages.jsonl")
     assert metadata_file.check()
     with open(metadata_file) as f: meta=json.load(f)
-    assert len(meta) == 10
+    assert len(meta) == 2
 
     def numlines(fngz):
         with gzip.open(fngz,'rt',encoding='utf-8') as f:
             return sum(1 for ln in f)
 
-    assert numlines(pages_file) > 10
+    assert numlines(pages_file) == 3
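-- 

Since the refactor moves all option handling onto instance attributes, the
command can also be driven programmatically, e.g. from a script or a test.
A minimal sketch, assuming a configured Django project with this app
installed; the output directory and option values below are placeholders,
not part of the patch:

    from django.core.management import call_command

    # keyword names match the argparse dests defined in add_arguments()
    call_command(
        "generate_textcorpus",
        path="/tmp/ppa_corpus",  # assumption: any writable directory
        doc_limit=100,           # small cap for a quick smoke test
        batch_size=50,
        dry_run=False,
    )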
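Downstream of this command, reading the corpus back needs only the standard
library, since pages are now always written gzip-compressed. A sketch,
assuming the corpus was generated with --path ~/ppa_solr_corpus as in the
docstring examples:

    import gzip
    import json
    import os

    corpus_dir = os.path.expanduser("~/ppa_solr_corpus")  # hypothetical path

    # work-level metadata: a single JSON array of work records
    with open(os.path.join(corpus_dir, "metadata.json")) as f:
        works = json.load(f)
    print(f"{len(works)} works")

    # page-level records: one JSON object per line, gzip-compressed
    n_pages = 0
    with gzip.open(os.path.join(corpus_dir, "pages.jsonl.gz"), "rt", encoding="utf-8") as f:
        for line in f:
            page = json.loads(line)  # keys follow PAGE_FIELDLIST: page_id, work_id, ..., page_text
            n_pages += 1
    print(f"{n_pages} pages")

orjsonl, already used here for writing, also provides readers (load/stream)
that infer compression from the .jsonl.gz extension.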