Skip to content

Commit

Permalink
[elk] Remove support for arthur
Browse files Browse the repository at this point in the history
This code removes the references, methods, dependencies,
and config params used to fetch data from arthur. This
change is needed since the integration with arthur will
be implemented in a different way (as discussed in
the roadmap).

Signed-off-by: Valerio Cosentino <[email protected]>
  • Loading branch information
valeriocos authored and zhquan committed Mar 5, 2020
1 parent 28693fc commit 1a746e2
Show file tree
Hide file tree
Showing 60 changed files with 38 additions and 634 deletions.
95 changes: 16 additions & 79 deletions grimoire_elk/elk.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@

import inspect
import logging
import pickle

import redis
from elasticsearch import Elasticsearch

from datetime import datetime
from dateutil import parser

from arthur.common import Q_STORAGE_ITEMS
from perceval.backend import find_signature_parameters, Archive
from perceval.errors import RateLimitError
from grimoirelab_toolkit.datetime import datetime_utcnow
Expand All @@ -48,62 +45,9 @@

requests_ses = grimoire_con()

arthur_items = {} # Hash with tag list with all items collected from arthur queue


def feed_arthur():
""" Feed Ocean with backend data collected from arthur redis queue"""

logger.info("Collecting items from redis queue")

db_url = 'redis://localhost/8'

conn = redis.StrictRedis.from_url(db_url)
logger.debug("Redis connection stablished with {}.".format(db_url))

# Get and remove queued items in an atomic transaction
pipe = conn.pipeline()
pipe.lrange(Q_STORAGE_ITEMS, 0, -1)
pipe.ltrim(Q_STORAGE_ITEMS, 1, 0)
items = pipe.execute()[0]

for item in items:
arthur_item = pickle.loads(item)
if arthur_item['tag'] not in arthur_items:
arthur_items[arthur_item['tag']] = []
arthur_items[arthur_item['tag']].append(arthur_item)

for tag in arthur_items:
logger.debug("Items for {}: {}".format(tag, len(arthur_items[tag])))


def feed_backend_arthur(backend_name, backend_params):
""" Feed Ocean with backend data collected from arthur redis queue"""

# Always get pending items from arthur for all data sources
feed_arthur()

logger.debug("Items available for {}".format(arthur_items.keys()))

# Get only the items for the backend
if not get_connector_from_name(backend_name):
raise RuntimeError("Unknown backend {}".format(backend_name))
connector = get_connector_from_name(backend_name)
klass = connector[3] # BackendCmd for the connector

backend_cmd = init_backend(klass(*backend_params))

tag = backend_cmd.backend.tag
logger.debug("Getting items for {}.".format(tag))

if tag in arthur_items:
logger.debug("Found items for {}.".format(tag))
for item in arthur_items[tag]:
yield item


def feed_backend(url, clean, fetch_archive, backend_name, backend_params,
es_index=None, es_index_enrich=None, project=None, arthur=False,
es_index=None, es_index_enrich=None, project=None,
es_aliases=None, projects_json_repo=None, repo_labels=None):
""" Feed Ocean with backend data """

Expand Down Expand Up @@ -204,28 +148,21 @@ def feed_backend(url, clean, fetch_archive, backend_name, backend_params,
except AttributeError:
latest_items = backend_cmd.parsed_args.latest_items

# fetch params support
if arthur:
# If using arthur just provide the items generator to be used
# to collect the items and upload to Elasticsearch
aitems = feed_backend_arthur(backend_name, backend_params)
ocean_backend.feed(arthur_items=aitems)
else:
params = {}
if latest_items:
params['latest_items'] = latest_items
if category:
params['category'] = category
if branches:
params['branches'] = branches
if filter_classified:
params['filter_classified'] = filter_classified
if from_date and (from_date.replace(tzinfo=None) != parser.parse("1970-01-01")):
params['from_date'] = from_date
if offset:
params['from_offset'] = offset

ocean_backend.feed(**params)
params = {}
if latest_items:
params['latest_items'] = latest_items
if category:
params['category'] = category
if branches:
params['branches'] = branches
if filter_classified:
params['filter_classified'] = filter_classified
if from_date and (from_date.replace(tzinfo=None) != parser.parse("1970-01-01")):
params['from_date'] = from_date
if offset:
params['from_offset'] = offset

ocean_backend.feed(**params)

except RateLimitError as ex:
logger.error("Error feeding raw from {} ({}): rate limit exceeded".format(backend_name, backend.origin))
Expand Down
1 change: 0 additions & 1 deletion grimoire_elk/enriched/enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ def __init__(self, db_sortinghat=None, db_projects_map=None, json_projects_map=N

# params used to configure the backend
# in perceval backends managed directly inside the backend
# in twitter and others managed in arthur logic
self.backend_params = None
# Label used during enrichment for identities without a known affiliation
self.unaffiliated_group = 'Unknown'
Expand Down
10 changes: 0 additions & 10 deletions grimoire_elk/raw/dockerhub.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,3 @@ def get_perceval_params_from_url(cls, url):
params = url.split()

return params

@classmethod
def get_arthur_params_from_url(cls, url):
# In the url the org and the repository are included

params = url.split()
""" Get the arthur params given a URL for the data source """
params = {"owner": params[0], "repository": params[1]}

return params
13 changes: 2 additions & 11 deletions grimoire_elk/raw/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,6 @@ def get_perceval_params_from_url(cls, url):
""" Get the perceval params given a URL for the data source """
return [url]

@classmethod
def get_arthur_params_from_url(cls, url):
""" Get the arthur params given a URL for the data source """
return {"uri": url, "url": url}

def drop_item(self, item):
""" Drop items not to be inserted in Elastic """
return False
Expand All @@ -155,17 +150,13 @@ def add_update_date(self, item):
item['metadata__timestamp'] = timestamp.isoformat()

def feed(self, from_date=None, from_offset=None, category=None, branches=None,
latest_items=None, arthur_items=None, filter_classified=None):
""" Feed data in Elastic from Perceval or Arthur """
latest_items=None, filter_classified=None):
"""Feed data in Elastic from Perceval"""

if self.fetch_archive:
items = self.perceval_backend.fetch_from_archive()
self.feed_items(items)
return
elif arthur_items:
items = arthur_items
self.feed_items(items)
return

if from_date and from_offset:
raise RuntimeError("Can't not feed using from_date and from_offset.")
Expand Down
20 changes: 4 additions & 16 deletions grimoire_elk/raw/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,27 +88,15 @@ class GitHubOcean(ElasticOcean):

mapping = Mapping

@classmethod
def get_arthur_params_from_url(cls, url):
""" Get the arthur params given a URL for the data source """
params = {}

owner = url.split('/')[-2]
repository = url.split('/')[-1]
# params.append('--owner')
params['owner'] = owner
# params.append('--repository')
params['repository'] = repository
return params

@classmethod
def get_perceval_params_from_url(cls, url):
""" Get the perceval params given a URL for the data source """
params = []

dparam = cls.get_arthur_params_from_url(url)
params.append(dparam['owner'])
params.append(dparam['repository'])
owner = url.split('/')[-2]
repository = url.split('/')[-1]
params.append(owner)
params.append(repository)
return params

def _fix_item(self, item):
Expand Down
19 changes: 0 additions & 19 deletions grimoire_elk/raw/gitlab.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
# Valerio Cosentino <[email protected]>
#

from perceval.backends.core.gitlab import GitLabCommand

from .elastic import ElasticOcean
from ..elastic_mapping import Mapping as BaseMapping

Expand Down Expand Up @@ -72,23 +70,6 @@ class GitLabOcean(ElasticOcean):

mapping = Mapping

@classmethod
def get_arthur_params_from_url(cls, url):
""" Get the arthur params given a URL for the data source """
params = {}

args = cls.get_perceval_params_from_url(url)
parser = GitLabCommand.setup_cmd_parser()

parsed_args = parser.parse(*args)

params['owner'] = parsed_args.owner
params['repository'] = parsed_args.repository
# include only blacklist ids information
params['blacklist_ids'] = parsed_args.blacklist_ids

return params

@classmethod
def get_perceval_params_from_url(cls, url):
""" Get the perceval params given a URL for the data source """
Expand Down
5 changes: 1 addition & 4 deletions grimoire_elk/raw/google_hits.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,4 @@
class GoogleHitsOcean(ElasticOcean):
"""GoogleHits Ocean feeder"""

@classmethod
def get_arthur_params_from_url(cls, url):
""" Get the arthur params given a URL for the data source """
return url
pass
7 changes: 0 additions & 7 deletions grimoire_elk/raw/groupsio.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,3 @@ class GroupsioOcean(MBoxOcean):
"""Groups.io Ocean feeder"""

mapping = Mapping

@classmethod
def get_arthur_params_from_url(cls, url):
# In the url the uri and the data dir are included
params = url.split()

return {"group_name": params[0], "dirpath": "/tmp"}
8 changes: 0 additions & 8 deletions grimoire_elk/raw/jira.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,6 @@ class JiraOcean(ElasticOcean):

mapping = Mapping

@classmethod
def get_arthur_params_from_url(cls, url):
""" Get the arthur params given a URL for the data source """

tokens = url.split(' ', 1)

return {"url": tokens[0]}

def _fix_item(self, item):
# Remove all custom fields to avoid the 1000 fields limit in ES

Expand Down
15 changes: 4 additions & 11 deletions grimoire_elk/raw/mattermost.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,9 @@ def get_perceval_params_from_url(cls, url):
""" Get the perceval params given a URL for the data source """
params = []

dparam = cls.get_arthur_params_from_url(url)
params.append(dparam['url'])
params.append(dparam['channel'])
return params

@classmethod
def get_arthur_params_from_url(cls, url):
# The URL is the mattermost URL and the [id of the channel
data = url.split()

params = {"url": data[0], "channel": data[1]}

url = data[0]
channel = data[1]
params.append(url)
params.append(channel)
return params
10 changes: 0 additions & 10 deletions grimoire_elk/raw/mbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,6 @@ def get_perceval_params_from_url(cls, url):

return params

@classmethod
def get_arthur_params_from_url(cls, url):
# In the url the dirpath and the repository are included

params = url.split()
""" Get the arthur params given a URL for the data source """
params = {"dirpath": params[1], "uri": params[0]}

return params

def _fix_item(self, item):
# Remove all custom fields to avoid the 1000 fields limit in ES

Expand Down
13 changes: 3 additions & 10 deletions grimoire_elk/raw/mediawiki.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,8 @@ def get_perceval_params_from_url(cls, urls):
""" Get the perceval params given the URLs for the data source """
params = []

dparam = cls.get_arthur_params_from_url(urls)
params.append(dparam["url"])

return params

@classmethod
def get_arthur_params_from_url(cls, urls):
# The URLs are the API URL and the wiki URL
# We just need the API URL
data = urls.split()
url = data[0]
params.append(url)

return {"url": data[0]}
return params
8 changes: 0 additions & 8 deletions grimoire_elk/raw/meetup.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,3 @@ def get_perceval_params_from_url(cls, url):
params.append(url)

return params

@classmethod
def get_arthur_params_from_url(cls, url):
# The URL is directly the meetup group

params = {"group": url}

return params
10 changes: 0 additions & 10 deletions grimoire_elk/raw/nntp.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,6 @@ def get_perceval_params_from_url(cls, url):

return params

@classmethod
def get_arthur_params_from_url(cls, url):
# In the url the NNTP host and the group are included

params = url.split()
""" Get the arthur params given a URL for the data source """
params = {"host": params[0], "group": params[1]}

return params

def _fix_item(self, item):
# Remove all custom fields to avoid the 1000 fields limit in ES

Expand Down
7 changes: 0 additions & 7 deletions grimoire_elk/raw/pipermail.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,3 @@ class PipermailOcean(MBoxOcean):
"""Pipermail Ocean feeder"""

mapping = Mapping

@classmethod
def get_arthur_params_from_url(cls, url):
# In the url the uri and the data dir are included
params = url.split()

return {"url": params[0], "dirpath": "/tmp"}
8 changes: 0 additions & 8 deletions grimoire_elk/raw/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,6 @@ class SlackOcean(ElasticOcean):

mapping = Mapping

@classmethod
def get_arthur_params_from_url(cls, url):
# The URL is directly the slack group

params = {"channel": url}

return params

def _fix_item(self, item):
if 'channel_info' in item['data']:
item['data']['channel_info'].pop('previous_names', None)
Loading

0 comments on commit 1a746e2

Please sign in to comment.