Feature/parallelize ngrams #1267

Merged: 15 commits, Sep 22, 2023
171 changes: 76 additions & 95 deletions backend/visualization/ngram.py
@@ -1,41 +1,20 @@
from collections import Counter

import numpy as np

from addcorpus.models import CorpusConfiguration
from datetime import datetime
from es.search import get_index, search
from ianalyzer.elasticsearch import elasticsearch
from visualization import query, termvectors
from es import download


def get_ngrams(es_query, corpus, field,
ngram_size=2, positions='any', freq_compensation=True, subfield='none', max_size_per_interval=50,
number_of_ngrams=10, date_field = 'date'):
def get_ngrams(results, number_of_ngrams=10):
"""Given a query and a corpus, get the words that occurred most frequently around the query term"""
ngrams = []
ngrams = get_top_n_ngrams(results, number_of_ngrams)

bins = get_time_bins(es_query, corpus)
time_labels = [format_time_label(start_year, end_year) for start_year, end_year in bins]

positions_dict = {
'any': list(range(ngram_size)),
'first': [0],
'second': [1],
'third': [2],
'fourth': [3],
}
term_positions = positions_dict[positions]

# find ngrams

docs, total_frequencies = tokens_by_time_interval(
corpus, es_query, field, bins, ngram_size, term_positions, freq_compensation, subfield, max_size_per_interval,
date_field
)
if freq_compensation:
ngrams = get_top_n_ngrams(docs, total_frequencies, number_of_ngrams)
else:
ngrams = get_top_n_ngrams(docs, dict(), number_of_ngrams)

return { 'words': ngrams, 'time_points' : time_labels }
return { 'words': ngrams, 'time_points': sorted([r['time_interval'] for r in results]) }


def format_time_label(start_year, end_year):
@@ -95,105 +74,107 @@ def get_time_bins(es_query, corpus):
return bins


def tokens_by_time_interval(corpus, es_query, field, bins, ngram_size, term_positions, freq_compensation, subfield, max_size_per_interval, date_field):
def tokens_by_time_interval(corpus, es_query, field, bin, ngram_size, term_position, freq_compensation, subfield, max_size_per_interval, date_field):
index = get_index(corpus)
client = elasticsearch(corpus)
ngrams_per_bin = []
positions_dict = {
'any': list(range(ngram_size)),
'first': [0],
'second': [1],
'third': [2],
'fourth': [3],
}
term_positions = positions_dict[term_position]
ngram_ttfs = dict()

query_text = query.get_query_text(es_query)
field = field if subfield == 'none' else '.'.join([field, subfield])

for (start_year, end_year) in bins:
start_date = datetime(start_year, 1, 1)
end_date = datetime(end_year, 12, 31)

# filter query on this time bin
date_filter = query.make_date_filter(start_date, end_date, date_field)
narrow_query = query.add_filter(es_query, date_filter)

#search for the query text
search_results = search(
corpus=corpus,
query_model = narrow_query,
client = client,
size = max_size_per_interval,
start_date = datetime(bin[0], 1, 1)
end_date = datetime(bin[1], 12, 31)

# filter query on this time bin
date_filter = query.make_date_filter(start_date, end_date, date_field)
narrow_query = query.add_filter(es_query, date_filter)
#search for the query text
search_results = search(
corpus=corpus,
query_model = narrow_query,
client = client,
size = max_size_per_interval,
)
bin_ngrams = Counter()
for hit in search_results['hits']['hits']:
identifier = hit['_id']
# get the term vectors for the hit
result = client.termvectors(
index=index,
id=identifier,
term_statistics=freq_compensation,
fields = [field]
)

bin_ngrams = Counter()

for hit in search_results['hits']['hits']:
identifier = hit['_id']

# get the term vectors for the hit
result = client.termvectors(
index=index,
id=identifier,
term_statistics=freq_compensation,
fields = [field]
)

terms = termvectors.get_terms(result, field)

if terms:
sorted_tokens = termvectors.get_tokens(terms, sort=True)

for match_start, match_stop, match_content in termvectors.token_matches(sorted_tokens, query_text, index, field, client):
for j in term_positions:
start = match_start - j
stop = match_stop - 1 - j + ngram_size
if start >= 0 and stop <= len(sorted_tokens):
ngram = sorted_tokens[start:stop]
words = ' '.join([token['term'] for token in ngram])
terms = termvectors.get_terms(result, field)
if terms:
sorted_tokens = termvectors.get_tokens(terms, sort=True)
for match_start, match_stop, match_content in termvectors.token_matches(sorted_tokens, query_text, index, field, client):
for j in term_positions:
start = match_start - j
stop = match_stop - 1 - j + ngram_size
if start >= 0 and stop <= len(sorted_tokens):
ngram = sorted_tokens[start:stop]
words = ' '.join([token['term'] for token in ngram])
if freq_compensation:
ttf = sum(token['ttf'] for token in ngram) / len(ngram)
ngram_ttfs[words] = ttf
bin_ngrams.update({ words: 1})

# output per bin: all tokens from this time interval
ngrams_per_bin.append(bin_ngrams)

return ngrams_per_bin, ngram_ttfs
bin_ngrams.update({ words: 1})

results = {
'time_interval': format_time_label(bin[0], bin[1]),
'ngrams': bin_ngrams
}
if freq_compensation:
results['ngram_ttfs'] = ngram_ttfs
return results


def get_top_n_ngrams(counters, total_frequencies = None, number_of_ngrams=10):
def get_top_n_ngrams(results, number_of_ngrams=10):
"""
Converts a list of documents with tokens into n data series, listing the
top n tokens and their frequency in each document.

Input:
- `docs`: a list of Counter objects with ngram frequencies. The division into counters reflects how the data is grouped,
i.e. by time interval. Each counter object reflects how often ngram tokens have been observed per interval. Presumably,
each token is a string containing an ngram.
but can be any immutable object. The division into documents reflects how the data is grouped (e.g. by time interval).
- `total_frequencies`: dict or `None`. If a dict, it should give the total frequency for every ngram that features in `docs`. In
practice, this is the average frequency of each word in the ngram. If the dict is provided, the frequency of the ngram will be divided
by it.
- `results`: a list of dictionaries with the following fields:
'ngrams': a Counter object with ngram frequencies
'time_interval': the time intervals for which the ngrams were counted
(optional) 'ngram_ttfs': averaged total term frequencies; only included if freq_compensation was requested
- `number_of_ngrams`: the number of top ngrams to return

Output:
A list of 10 data series. Each series is a dict with two keys: `'label'` contains the content of a token (presumably an
A list of number_of_ngrams data series. Each series is a dict with two keys: `'label'` contains the content of a token (presumably an
ngram string), `'data'` contains a list of the frequency of that token in each time interval. Depending on whether
`'ngram_ttfs'` is present in the results, this is absolute or relative to the total term frequencies provided.
"""

total_counter = Counter()
for c in counters:
total_counter.update(c)
for r in results:
total_counter.update(r['ngrams'])
sorted_results = sorted(results, key=lambda r: r['time_interval'])

number_of_results = min(number_of_ngrams, len(total_counter))

if total_frequencies:
def frequency(ngram, counter): return counter[ngram] / total_frequencies[ngram]
if 'ngram_ttfs' in results[0]:
total_frequencies = {}
for r in results:
total_frequencies.update(r['ngram_ttfs'])
def frequency(ngram, counter): return counter.get(ngram, 0.0) / max(1.0, total_frequencies[ngram])
def overall_frequency(ngram): return frequency(ngram, total_counter)
top_ngrams = sorted(total_counter.keys(), key=overall_frequency, reverse=True)[:number_of_results]
else:
def frequency(ngram, counter): return counter[ngram]
def frequency(ngram, counter): return counter.get(ngram, 0)
top_ngrams = [word for word, freq in total_counter.most_common(number_of_results)]


output = [{
'label': ngram,
'data': [frequency(ngram, c)
for c in counters]
'data': [frequency(ngram, c['ngrams'])
for c in sorted_results]
}
for ngram in top_ngrams]

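For orientation, here is a minimal sketch (not part of the diff) of the per-bin data structure that the reworked get_top_n_ngrams and get_ngrams consume, following the docstring above and the tests below; the corpus values are hypothetical.

from collections import Counter

from visualization import ngram

# Hypothetical per-bin results, shaped like the output of tokens_by_time_interval:
# one dict per time interval, holding the Counter of ngrams found in that bin.
results = [
    {'time_interval': '1820-1830', 'ngrams': Counter({'to hear': 3, 'to see': 1})},
    {'time_interval': '1830-1840', 'ngrams': Counter({'to hear': 2})},
]

# Without an 'ngram_ttfs' key, the data series stay absolute counts per interval.
series = ngram.get_top_n_ngrams(results, number_of_ngrams=10)
# e.g. [{'label': 'to hear', 'data': [3, 2]}, {'label': 'to see', 'data': [1, 0]}]

# get_ngrams wraps this and adds the sorted time labels.
chart = ngram.get_ngrams(results, number_of_ngrams=10)
# {'words': series, 'time_points': ['1820-1830', '1830-1840']}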
48 changes: 33 additions & 15 deletions backend/visualization/tasks.py
@@ -1,4 +1,4 @@
from celery import shared_task, group
from celery import chord, group, shared_task
from django.conf import settings
from visualization import wordcloud, ngram, term_frequency
from es import download as es_download
@@ -9,21 +9,39 @@ def get_wordcloud_data(request_json):
word_counts = wordcloud.make_wordcloud_data(list_of_texts, request_json['field'], request_json['corpus'])
return word_counts

@shared_task()
def get_ngram_data(request_json):
return ngram.get_ngrams(
request_json['es_query'],
request_json['corpus_name'],
request_json['field'],
ngram_size=request_json['ngram_size'],
positions=request_json['term_position'],
freq_compensation=request_json['freq_compensation'],
subfield=request_json['subfield'],
max_size_per_interval=request_json['max_size_per_interval'],
number_of_ngrams=request_json['number_of_ngrams'],
date_field = request_json['date_field']
)
@shared_task
def get_ngram_data_bin(**kwargs):
return ngram.tokens_by_time_interval(**kwargs)

@shared_task
def integrate_ngram_results(results, **kwargs):
return ngram.get_ngrams(results, **kwargs)

def ngram_data_tasks(request_json):
corpus = request_json['corpus_name']
es_query = request_json['es_query']
freq_compensation = request_json['freq_compensation']
bins = ngram.get_time_bins(es_query, corpus)

return chord(group([
get_ngram_data_bin.s(
corpus=corpus,
es_query=es_query,
field=request_json['field'],
bin=b,
ngram_size=request_json['ngram_size'],
term_position=request_json['term_position'],
freq_compensation=freq_compensation,
subfield=request_json['subfield'],
max_size_per_interval=request_json['max_size_per_interval'],
date_field=request_json['date_field']
)
for b in bins
]), integrate_ngram_results.s(
number_of_ngrams=request_json['number_of_ngrams']
)
)()

@shared_task()
def get_histogram_term_frequency_bin(es_query, corpus_name, field_name, field_value, size, include_query_in_result = False):
'''
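A sketch of how the new chord might be dispatched, for example from a view; the payload values below are hypothetical, and retrieving the result with .get() assumes a Celery result backend is configured.

from visualization.tasks import ngram_data_tasks

# Hypothetical request payload; the keys match those read in ngram_data_tasks
# and passed on to get_ngram_data_bin / integrate_ngram_results.
request_json = {
    'corpus_name': 'small-mock-corpus',                    # hypothetical corpus name
    'es_query': {'query': {'match': {'content': 'to'}}},   # hypothetical query
    'field': 'content',
    'ngram_size': 2,
    'term_position': 'any',
    'freq_compensation': True,
    'subfield': 'none',
    'max_size_per_interval': 50,
    'number_of_ngrams': 10,
    'date_field': 'date',
}

# chord(group([...]), callback)() fans out one get_ngram_data_bin task per time
# bin and runs integrate_ngram_results once all per-bin results are collected.
async_result = ngram_data_tasks(request_json)
chart_data = async_result.get()   # {'words': [...], 'time_points': [...]}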
45 changes: 28 additions & 17 deletions backend/visualization/tests/test_ngrams.py
@@ -1,7 +1,5 @@
from random import sample
from typing import Counter
from visualization import query, ngram
from visualization.tests.mock_corpora.small_mock_corpus import SmallMockCorpus
from datetime import datetime, date
import pytest

@@ -83,7 +81,7 @@ def test_top_10_ngrams():
['a', 'c']
]

counts = [Counter(doc) for doc in docs]
time_intervals = ['1820-1830','1830-1840','1840-1850']

target_data = {
'a': [1, 1, 1],
@@ -96,14 +94,15 @@ def test_top_10_ngrams():
'b': 200,
'c': 150,
}
test_results = [{'ngrams': Counter(doc), 'time_interval': time_intervals[i]} for i, doc in enumerate(docs)]

output_absolute = ngram.get_top_n_ngrams(counts)
output_absolute = ngram.get_top_n_ngrams(test_results)
for word in target_data:
dataset_absolute = next(series for series in output_absolute if series['label'] == word)
assert dataset_absolute['data'] == target_data[word]


output_relative = ngram.get_top_n_ngrams(counts, ttf)
[r.update({'ngram_ttfs': ttf}) for r in test_results]
output_relative = ngram.get_top_n_ngrams(test_results)

for word in target_data:
dataset_relative = next(series for series in output_relative if series['label'] == word)
@@ -112,14 +111,17 @@ def test_top_10_ngrams():
for w in target_data }
assert dataset_relative['data'] == relative_frequencies[word]


def get_binned_results(corpus, query, time_bins=CENTURY_BINS, ngram_size=2, term_position='any', freq_compensation=None, subfield='none', max_size_per_interval=20, date_field='date'):
return [
ngram.tokens_by_time_interval(
corpus, query, 'content', bin, ngram_size, term_position, freq_compensation, subfield, max_size_per_interval, date_field)
for bin in time_bins
]

def test_absolute_bigrams(small_mock_corpus, index_small_mock_corpus, basic_query):
# search for a word that occurs a few times
frequent_query = query.set_query_text(basic_query, 'to')



# expected bigram frequencies
bigrams = [
{
@@ -160,12 +162,13 @@ def test_absolute_bigrams(small_mock_corpus, index_small_mock_corpus, basic_query):
}
]

result = ngram.get_ngrams(frequent_query, small_mock_corpus, 'content', freq_compensation=False)
results = get_binned_results(small_mock_corpus, frequent_query)

assert result['time_points'] == ['{}-{}'.format(start, end) for start, end in CENTURY_BINS]
assert sorted([r['time_interval'] for r in results]) == sorted(['{}-{}'.format(start, end) for start, end in CENTURY_BINS])

integrated_results = ngram.get_ngrams(results)
for bigram in bigrams:
data = next((item for item in result['words'] if item['label'] == bigram['label']), None)
data = next((item for item in integrated_results['words'] if item['label'] == bigram['label']), None)
assert data

for bin, freq in enumerate(data['data']):
@@ -174,7 +177,7 @@ def test_absolute_bigrams(small_mock_corpus, index_small_mock_corpus, basic_query):
else:
assert freq == 0

def test_bigrams_with_quote(small_mock_corpus, index_small_mock_corpus, basic_query):
def test_bigrams_with_quote(small_mock_corpus, basic_query):
cases = [
{
'query': '"to hear"',
@@ -204,7 +207,7 @@ def test_bigrams_with_quote(small_mock_corpus, index_small_mock_corpus, basic_query):
# search for a word that occurs a few times
case_query = query.set_query_text(basic_query, case['query'])

result = ngram.get_ngrams(case_query, small_mock_corpus, 'content', freq_compensation=False)
result = ngram.get_ngrams(get_binned_results(small_mock_corpus, case_query))

ngrams = case['ngrams']

@@ -259,8 +262,16 @@ def test_number_of_ngrams(small_mock_corpus, index_small_mock_corpus, basic_query):

max_frequency = 6

for size in range(1, max_frequency + 2):
result = ngram.get_ngrams(frequent_query, small_mock_corpus, 'content', number_of_ngrams= size)
for number_of_ngrams in range(1, max_frequency + 2):
result = ngram.get_ngrams(get_binned_results(small_mock_corpus, frequent_query), number_of_ngrams=number_of_ngrams)
series = result['words']

assert len(series) == min(max_frequency, size)
assert len(series) == min(max_frequency, number_of_ngrams)

def test_freq_compensation(small_mock_corpus, index_small_mock_corpus, basic_query):
frequent_query = query.set_query_text(basic_query, 'to')
results = get_binned_results(small_mock_corpus, frequent_query, freq_compensation=True)
top_grams = ngram.get_top_n_ngrams(results)
assert top_grams


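For reference, a small worked example (hypothetical values) of the frequency compensation arithmetic that get_top_n_ngrams applies when 'ngram_ttfs' is present, mirroring the relative_frequencies check in test_top_10_ngrams.

from collections import Counter

# With freq_compensation, each per-interval count is divided by the ngram's
# averaged total term frequency (ttf), guarded against values below 1.0.
ttf = {'to hear': 200.0, 'to see': 150.0}           # hypothetical ttf values
bin_counts = Counter({'to hear': 2, 'to see': 1})   # counts in one time interval

relative = {
    gram: bin_counts.get(gram, 0.0) / max(1.0, ttf[gram])
    for gram in ttf
}
# {'to hear': 0.01, 'to see': 0.006666666666666667}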