Skip to content

Commit

Permalink
Merge pull request #95 from TogetherCrew/feat/77-graph-analytics-plat…
Browse files Browse the repository at this point in the history
…form-agnostic

Feat/77 graph analytics platform agnostic
  • Loading branch information
cyri113 authored Jul 1, 2024
2 parents b242c6a + effc3f1 commit 5842533
Show file tree
Hide file tree
Showing 64 changed files with 1,983 additions and 2,808 deletions.
16 changes: 7 additions & 9 deletions automation/automation_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,14 @@
from pybars import Compiler
from tc_messageBroker.rabbit_mq.event import Event
from tc_messageBroker.rabbit_mq.queue import Queue
from utils.get_guild_utils import get_guild_platform_id


class AutomationWorkflow(AutomationBase):
def __init__(self) -> None:
super().__init__()
self.automation_db = AutomationDB()

def start(self, guild_id: str):
def start(self, platform_id: str, guild_id: str):
"""
start the automation workflow for a guild
Expand Down Expand Up @@ -48,7 +47,7 @@ def start(self, guild_id: str):
members_by_category[category] = []

users1, users2 = self._get_users_from_memberactivities(
guild_id, category
platform_id, category
)
users = self._subtract_users(users1, users2)

Expand Down Expand Up @@ -81,7 +80,7 @@ def start(self, guild_id: str):
compiled_message = action.template

data = self._prepare_saga_data(
guild_id, user_id, compiled_message
platform_id, user_id, compiled_message
)
saga_id = self._create_manual_saga(data)
logging.info(
Expand Down Expand Up @@ -111,7 +110,7 @@ def start(self, guild_id: str):

for recipent in at.report.recipientIds:
data = self._prepare_saga_data(
guild_id, recipent, compiled_message
platform_id, recipent, compiled_message
)
saga_id = self._create_manual_saga(data)

Expand Down Expand Up @@ -181,21 +180,20 @@ def _compile_message(self, data: dict[str, str], message: str) -> str:
return compiled_message

def _prepare_saga_data(
self, guild_id: str, user_id: str, message: str
self, platform_id: str, user_id: str, message: str
) -> dict[str, Any]:
"""
prepare the data needed for the saga
Parameters:
------------
guild_id : str
the guild_id having the user
platform_id : str
the platform_id having the user
user_id : str
the user_id to send message
message : str
the message to send the user
"""
platform_id = get_guild_platform_id(guild_id)
data = {
"platformId": platform_id,
"created": False,
Expand Down
16 changes: 8 additions & 8 deletions automation/utils/automation_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ def _get_users_from_guildmembers(
return users_data

def _get_users_from_memberactivities(
self, guild_id: str, category: str
self, db_name: str, category: str
) -> tuple[list[str], list[str]]:
"""
get the users of memberactivities within a specific memberactivities category
the users from previous day and previous two days
Parameters:
-------------
guild_id : str
the guild id to get people's id
db_name : str
the database to get people's id
category : str
the category of memberactivities
Expand All @@ -91,13 +91,13 @@ def _get_users_from_memberactivities(
)

users = (
self.mongo_client[guild_id]["memberactivities"]
self.mongo_client[db_name]["memberactivities"]
.find(
{
"$or": [
{"date": date_yesterday},
{"date": date_two_past_days},
]
"date": {
"$gte": date_two_past_days,
"$lte": date_yesterday,
}
},
projection,
)
Expand Down
40 changes: 20 additions & 20 deletions discord_analyzer/DB_operations/mongo_neo4j_ops.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import logging

from discord_analyzer.DB_operations.mongodb_interaction import MongoDBOps
from discord_analyzer.DB_operations.network_graph import make_neo4j_networkx_query_dict
from discord_analyzer.DB_operations.network_graph import NetworkGraph
from tc_neo4j_lib.neo4j_ops import Neo4jOps, Query
from discord_analyzer.schemas import GraphSchema


class MongoNeo4jDB:
Expand Down Expand Up @@ -30,8 +31,8 @@ def set_mongo_db_ops(self):
def store_analytics_data(
self,
analytics_data: dict,
guild_id: str,
community_id: str,
platform_id: str,
graph_schema: GraphSchema,
remove_memberactivities: bool = False,
remove_heatmaps: bool = False,
):
Expand All @@ -46,10 +47,10 @@ def store_analytics_data(
values of the heatmaps is a list of dictionaries
and memberactivities is a tuple of memberactivities dictionary list
and memberactivities networkx object dictionary list
guild_id: str
platform_id: str
what the data is related to
community_id : str
the community id to save the data for
graph_schema : GraphSchema
the schema for graph to be saved
remove_memberactivities : bool
remove the whole memberactivity data and insert
default is `False` which means don't delete the existing data
Expand All @@ -69,7 +70,7 @@ def store_analytics_data(
if not self.testing:
# mongodb transactions
self.mongoOps._do_analytics_write_transaction(
guildId=guild_id,
platform_id=platform_id,
delete_heatmaps=remove_heatmaps,
delete_member_acitivities=remove_memberactivities,
acitivties_list=memberactivities_data,
Expand All @@ -81,28 +82,27 @@ def store_analytics_data(
memberactivities_networkx_data is not None
and memberactivities_networkx_data != []
):
queries_list = make_neo4j_networkx_query_dict(
network_graph = NetworkGraph(graph_schema, platform_id)
queries_list = network_graph.make_neo4j_networkx_query_dict(
networkx_graphs=memberactivities_networkx_data,
guildId=guild_id,
community_id=community_id,
)
self.run_operations_transaction(
guildId=guild_id,
platform_id=platform_id,
queries_list=queries_list,
remove_memberactivities=remove_memberactivities,
)
else:
logging.warning("Testing mode enabled! Not saving any data")

def run_operations_transaction(
self, guildId: str, queries_list: list[Query], remove_memberactivities: bool
self, platform_id: str, queries_list: list[Query], remove_memberactivities: bool
) -> None:
"""
do the deletion and insertion operations inside a transaction
Parameters:
------------
guildId : str
platform_id : str
the platform id that the users are connected to,
whose relations we are going to delete
queries_list : list
Expand All @@ -111,15 +111,15 @@ def run_operations_transaction(
remove_memberactivities : bool
if True, remove the old data specified in that guild
"""
self.guild_msg = f"GUILDID: {guildId}:"
self.guild_msg = f"platform_id: {platform_id}:"

transaction_queries: list[Query] = []
if remove_memberactivities:
logging.info(
f"{self.guild_msg} Neo4J GuildId accounts relation will be removed!"
f"{self.guild_msg} Neo4J platform_id accounts relation will be removed!"
)
delete_relationship_query = self._create_guild_rel_deletion_query(
guildId=guildId
platform_id=platform_id
)
transaction_queries.append(delete_relationship_query)

Expand All @@ -128,15 +128,15 @@ def run_operations_transaction(
self.neo4j_ops.run_queries_in_batch(transaction_queries, message=self.guild_msg)

def _create_guild_rel_deletion_query(
self, guildId: str, relation_name: str = "INTERACTED_WITH"
self, platform_id: str, relation_name: str = "INTERACTED_WITH"
) -> Query:
"""
create a query to delete the relationships
between DiscordAccount users in a specific guild
Parameters:
-------------
guildId : str
platform_id : str
the platform id that the users are connected to
relation_name : str
the relation we want to delete
Expand All @@ -149,12 +149,12 @@ def _create_guild_rel_deletion_query(
query_str = f"""
MATCH
(:DiscordAccount)
-[r:{relation_name} {{guildId: '{guildId}'}}]-(:DiscordAccount)
-[r:{relation_name} {{platformId: '{platform_id}'}}]-(:DiscordAccount)
DETACH DELETE r"""

parameters = {
"relation_name": relation_name,
"guild_id": guildId,
"platform_id": platform_id,
}

query = Query(
Expand Down
52 changes: 27 additions & 25 deletions discord_analyzer/DB_operations/mongodb_interaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ def __init__(self):
self.DB_access = DB_access
self.guild_msg = ""

def set_mongo_db_access(self, guildId=None):
def set_mongo_db_access(self, platform_id=None):
"""
set a database access to a specific guild
if guildId was `None` then the mongo_db_access just
if platform_id was `None` then the mongo_db_access just
have the `db_mongo_client` to use
but if wasn't then mongo_db_access
would also have db_client which is connected to a guild
"""
self.mongo_db_access = self.DB_access(db_name=guildId)
self.guild_msg = f"GUILDID: {guildId}:"
self.mongo_db_access = self.DB_access(db_name=platform_id)
self.guild_msg = f"PLATFORMID: {platform_id}:"

def _do_analytics_write_transaction(
self,
guildId,
platform_id,
delete_heatmaps,
delete_member_acitivities,
acitivties_list,
Expand All @@ -53,7 +53,7 @@ def _do_analytics_write_transaction(
def callback_wrapper(session):
self._session_custom_transaction(
session,
guildId,
platform_id,
delete_heatmaps,
delete_member_acitivities,
acitivties_list,
Expand All @@ -71,7 +71,7 @@ def callback_wrapper(session):
def _session_custom_transaction(
self,
session,
guildId,
platform_id,
delete_heatmaps,
delete_member_acitivities,
memberactiivties_list,
Expand All @@ -84,35 +84,37 @@ def _session_custom_transaction(
also insertion of activities_list and heatmaps_list after
"""
self.guild_msg = f"GUILDID: {guildId}:"
self.guild_msg = f"PLATFORMID: {platform_id}:"

if delete_heatmaps:
logging.info(f"{self.guild_msg} Removing Heatmaps data!")
self.empty_collection(session=session, guildId=guildId, activity="heatmaps")
self.empty_collection(
session=session, platform_id=platform_id, activity="heatmaps"
)
if delete_member_acitivities:
logging.info(f"{self.guild_msg} Removing MemberActivities MongoDB data!")
self.empty_collection(
session=session, guildId=guildId, activity="memberactivities"
session=session, platform_id=platform_id, activity="memberactivities"
)

if memberactiivties_list is not None and memberactiivties_list != []:
self.insert_into_memberactivities_batches(
session=session,
acitivities_list=memberactiivties_list,
guildId=guildId,
platform_id=platform_id,
batch_size=batch_size,
)

if heatmaps_list is not None and heatmaps_list != []:
self.insert_into_heatmaps_batches(
session=session,
heatmaps_list=heatmaps_list,
guildId=guildId,
platform_id=platform_id,
batch_size=batch_size,
)

def insert_into_memberactivities_batches(
self, session, acitivities_list, guildId, batch_size=1000
self, session, acitivities_list, platform_id, batch_size=1000
):
"""
insert data into memberactivities collection of mongoDB in batches
Expand All @@ -124,10 +126,10 @@ def insert_into_memberactivities_batches(
batch_size : int
the count of data in batches
default is 1000
guildId : str
the guildId to insert data to it
platform_id : str
the platform_id to insert data to it
"""
memberactivities_collection = session.client[guildId].memberactivities
memberactivities_collection = session.client[platform_id].memberactivities
self._batch_insertion(
collection=memberactivities_collection,
data=acitivities_list,
Expand All @@ -136,7 +138,7 @@ def insert_into_memberactivities_batches(
)

def insert_into_heatmaps_batches(
self, session, heatmaps_list, guildId, batch_size=1000
self, session, heatmaps_list, platform_id, batch_size=1000
):
"""
insert data into heatmaps collection of mongoDB in batches
Expand All @@ -148,10 +150,10 @@ def insert_into_heatmaps_batches(
batch_size : int
the count of data in batches
default is 1000
guildId : str
the guildId to insert data to it
platform_id : str
the platform_id to insert data to it
"""
heatmaps_collection = session.client[guildId].heatmaps
heatmaps_collection = session.client[platform_id].heatmaps

self._batch_insertion(
heatmaps_collection,
Expand Down Expand Up @@ -182,16 +184,16 @@ def _batch_insertion(self, collection, data, message, batch_size):
logging.info(f"{message}: Batch {loop_idx + 1}/{batch_count}")
collection.insert_many(data[batch_idx : batch_idx + batch_size])

def empty_collection(self, session, guildId, activity):
def empty_collection(self, session, platform_id, activity):
"""
empty a specified collection
Parameters:
-------------
session : mongoDB session
the session needed to delete the data
guildId : str
the guildId to remove its collection data
platform_id : str
the platform_id to remove its collection data
activity : str
`memberactivities` or `heatmaps` or other collections
the collection to access and delete its data
Expand All @@ -201,9 +203,9 @@ def empty_collection(self, session, guildId, activity):
`None`
"""
if activity == "heatmaps":
collection = session.client[guildId].heatmaps
collection = session.client[platform_id].heatmaps
elif activity == "memberactivities":
collection = session.client[guildId].memberactivities
collection = session.client[platform_id].memberactivities
else:
raise NotImplementedError(
"removing heatmaps or memberactivities are just implemented!"
Expand Down
Loading

0 comments on commit 5842533

Please sign in to comment.