Merge pull request #7 from TogetherCrew/feat/5-improve-heatmaps-computation-time

fix: Heatmaps, more efficient querying the database!
amindadgar authored Jul 16, 2024
2 parents cd6be24 + 983954f commit 6511459
Showing 11 changed files with 411 additions and 84 deletions.
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@

 setup(
     name="tc-analyzer-lib",
-    version="1.2.0",
+    version="1.2.1",
     author="Mohammad Amin Dadgar, TogetherCrew",
     maintainer="Mohammad Amin Dadgar",
     maintainer_email="[email protected]",
23 changes: 14 additions & 9 deletions tc_analyzer_lib/metrics/heatmaps/analytics_hourly.py
@@ -39,11 +39,10 @@ def analyze(
         activity_direction : str
             should be always either `emitter` or `receiver`
         **kwargs :
-            additional_filters : dict[str, str]
-                the additional filtering for `rawmemberactivities` data of each platform
-                the keys could be `metadata.channel_id` with a specific value
+            resource_filtering : dict[str, str]
+                a filtering applied for resources on data
         """
-        additional_filters: dict[str, str] = kwargs.get("additional_filters", {})
+        resource_filtering: dict[str, str] = kwargs.get("resource_filtering", {})

         if activity_direction not in ["emitter", "receiver"]:
             raise AttributeError(
@@ -64,8 +63,8 @@
             filters={
                 f"{activity}.name": activity_name,
                 f"{activity}.type": activity_direction,
-                **additional_filters,
             },
+            resource_filters=resource_filtering,
         )

         return activity_vector
@@ -76,6 +75,7 @@ def get_hourly_analytics(
         activity: str,
         author_id: str | int,
         filters: dict[str, dict[str, Any] | str] | None = None,
+        resource_filters: dict[str, str] | None = None,
     ) -> list[int]:
         """
         Gets the list of documents for the stated day
@@ -87,12 +87,12 @@
         activity : str
             to be `interactions` or `actions`
         filter : dict[str, dict[str] | str] | None
-            the filtering that we need to apply
+            the filtering that we need to apply on actions or interactions
             for default it is an None meaning
             no filtering would be applied
-        msg : str
-            additional information to be logged
-            for default is empty string meaning no additional string to log
+        resource_filters : dict[str, str] | None
+            the filtering on resources of data
+            could make the query more efficient if provided

         Returns
         ---------
@@ -103,12 +103,17 @@
         start_day = datetime.combine(day, time(0, 0, 0))
         end_day = start_day + timedelta(days=1)

+        # if no filter for resources then
+        if resource_filters is None:
+            resource_filters = {}
+
         pipeline = [
            # the day for analytics
            {
                "$match": {
                    "date": {"$gte": start_day, "$lt": end_day},
                    "author_id": author_id,
+                   **resource_filters,
                }
            },
            # Unwind the activity array
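
The speedup in this file comes from merging the resource filter into the pipeline's opening $match stage, so MongoDB can prune documents (ideally via an index) before any later $unwind or $group stage runs. A minimal sketch of the stage construction, with made-up author and channel ids; the field names follow the diff above:

    from datetime import date, datetime, time, timedelta

    # Hypothetical inputs for illustration only
    day = date(2024, 7, 1)
    author_id = "915944557605163008"
    resource_filters = {"metadata.channel_id": "1088165451651092635"}

    start_day = datetime.combine(day, time(0, 0, 0))
    end_day = start_day + timedelta(days=1)

    # Merging the resource filter into the first $match lets MongoDB discard
    # non-matching documents before the $unwind stages ever see them.
    match_stage = {
        "$match": {
            "date": {"$gte": start_day, "$lt": end_day},
            "author_id": author_id,
            **resource_filters,  # expands to nothing when the dict is empty
        }
    }
    print(match_stage)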
8 changes: 2 additions & 6 deletions tc_analyzer_lib/metrics/heatmaps/analytics_raw.py
@@ -114,19 +114,15 @@ def get_analytics_count(
             raw analytics item which holds the user and
             the count of interaction in that day
         """
-        filters: dict[str, dict[str, Any] | str] | None = kwargs.get("filters")
+        filters: dict[str, dict[str, Any] | str] = kwargs.get("filters", {})
         start_day = datetime.combine(day, time(0, 0, 0))
         end_day = start_day + timedelta(days=1)

         match_filters = {
             "date": {"$gte": start_day, "$lt": end_day},
             "author_id": author_id,
+            **filters,
         }
-        if filters is not None:
-            match_filters = {
-                **match_filters,
-                **filters,
-            }

         pipeline = [
             {
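
The simplification above leans on dict unpacking: once `filters` defaults to an empty dict, `**filters` expands to nothing and the separate `None` branch becomes dead weight. A small self-contained sketch of the pattern, with a hypothetical helper name and field values:

    from typing import Any


    def build_match_filters(author_id: str, **kwargs) -> dict[str, Any]:
        # Defaulting to {} makes **filters a no-op when no filters are passed,
        # which removes the old `if filters is not None` branch entirely.
        filters: dict[str, Any] = kwargs.get("filters", {})
        return {"author_id": author_id, **filters}


    assert build_match_filters("user-1") == {"author_id": "user-1"}
    assert build_match_filters("user-1", filters={"interactions.name": "reply"}) == {
        "author_id": "user-1",
        "interactions.name": "reply",
    }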
59 changes: 36 additions & 23 deletions tc_analyzer_lib/metrics/heatmaps/heatmaps.py
@@ -70,24 +70,34 @@ def start(self, from_start: bool = False) -> list[dict]:
         # initialize the data array
         heatmaps_results = []

-        iteration_count = self._compute_iteration_counts(
-            analytics_date=analytics_date,
-            resources_count=len(self.resources),
-        )
-
         cursor = self.utils.get_users(is_bot=True)
         bot_ids = list(map(lambda user: user["id"], cursor))

-        index = 0
+        # index = 0
         while analytics_date.date() < datetime.now().date():
-            for resource_id in self.resources:
-                # for more efficient retrieval
-                # we're always using the cursor and re-querying the db
-
-                start_day = analytics_date.replace(
-                    hour=0, minute=0, second=0, microsecond=0
-                )
-                end_day = start_day + timedelta(days=1)
+            start_day = analytics_date.replace(
+                hour=0, minute=0, second=0, microsecond=0
+            )
+            end_day = start_day + timedelta(days=1)

+            # getting the active resource_ids (activities being done there by users)
+            period_resources = self.utils.get_active_resources_period(
+                start_day=start_day,
+                end_day=end_day,
+                resource_identifier=self.analyzer_config.resource_identifier,
+                metadata_filter={
+                    f"metadata.{self.analyzer_config.resource_identifier}": {
+                        "$in": self.resources,
+                    }
+                },
+            )
+            if len(period_resources) == 0:
+                logging.warning(
+                    "No users interacting on platform for date: "
+                    f"{start_day.date()} - {end_day.date()}"
+                )
+
+            for resource_idx, resource_id in enumerate(period_resources):
                 user_ids = self.utils.get_active_users(
                     start_day,
                     end_day,
@@ -99,15 +109,16 @@
                 if len(user_ids) == 0:
                     logging.warning(
                         f"{log_prefix} No users interacting for the time window: "
-                        f"{start_day.date()} - {end_day.date()}"
+                        f"{start_day.date()} - {end_day.date()} for resource: {resource_id}"
                         " Skipping the day."
                     )

-                for idx, author_id in enumerate(user_ids):
+                for user_idx, author_id in enumerate(user_ids):
                     logging.info(
-                        f"{log_prefix} ANALYZING HEATMAPS {index}/{iteration_count} "
-                        f"author index: {idx}/{len(user_ids)} | "
-                        f"DAY: {start_day.date()} - {end_day.date()}"
+                        f"{log_prefix} ANALYZING HEATMAPS {start_day.date()} - {end_day.date()} | "
+                        # f"DAY {index}/{iteration_count} "
+                        f"Author: {user_idx + 1}/{len(user_ids)} "
+                        f"of resource: {resource_idx + 1}/{len(period_resources)}"
                     )

                     if author_id in bot_ids:
@@ -137,7 +148,7 @@ def start(self, from_start: bool = False) -> list[dict]:

                     heatmaps_results.append(document)

-                    index += 1
+                    # index += 1

             # analyze next day
             analytics_date += timedelta(days=1)
@@ -188,8 +199,9 @@ def _process_hourly_analytics(
                 activity_name=activity_name,
                 activity_direction=config.direction.value,
                 author_id=author_id,
-                additional_filters={
+                resource_filtering={
                     f"metadata.{self.analyzer_config.resource_identifier}": resource,
+                    "metadata.bot_activity": False,
                 },
             )
             analytics[config.name] = analytics_vector
@@ -213,8 +225,9 @@
                 activity_name=activity_name,
                 activity_direction=config.direction.value,
                 author_id=author_id,
-                additional_filters={
+                resource_filtering={
                     f"metadata.{self.analyzer_config.resource_identifier}": resource,
+                    "metadata.bot_activity": False,
                     **conditions,
                 },
             )
@@ -250,6 +263,7 @@ def _process_raw_analytics(

         additional_filters: dict[str, str] = {
             f"metadata.{self.analyzer_config.resource_identifier}": resource,
+            "metadata.bot_activity": False,
         }
         # preparing for custom analytics (if available in config)
         if config.rawmemberactivities_condition is not None:
@@ -276,8 +290,7 @@
     def _compute_iteration_counts(
         self,
         analytics_date: datetime,
-        resources_count: int,
     ) -> int:
-        iteration_count = (datetime.now() - analytics_date).days * resources_count
+        iteration_count = (datetime.now() - analytics_date).days

         return iteration_count
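
The restructured loop is where most of the computation time is saved: rather than visiting every configured resource on every day, the analyzer first asks the database which resources actually saw activity in the day's window and iterates only those. A rough sketch of the control flow, assuming a stand-in `utils` object and `channel_id` as the resource identifier:

    from datetime import datetime, timedelta


    def analyze_days(analytics_date: datetime, resources: list[str], utils) -> None:
        # `utils` stands in for the real HeatmapsUtils; only control flow is shown
        while analytics_date.date() < datetime.now().date():
            start_day = analytics_date.replace(hour=0, minute=0, second=0, microsecond=0)
            end_day = start_day + timedelta(days=1)

            # one aggregation per day instead of one query per configured resource
            period_resources = utils.get_active_resources_period(
                start_day=start_day,
                end_day=end_day,
                resource_identifier="channel_id",  # assumed identifier, e.g. Discord
                metadata_filter={"metadata.channel_id": {"$in": resources}},
            )

            for resource_id in period_resources:
                ...  # per-user hourly and raw analytics, as in the diff above

            analytics_date += timedelta(days=1)

This is also why `_compute_iteration_counts` drops `resources_count`: with only the active resources visited, the per-day workload is no longer a fixed multiple of the resource count.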
91 changes: 68 additions & 23 deletions tc_analyzer_lib/metrics/heatmaps/heatmaps_utils.py
@@ -33,7 +33,10 @@ def get_users(self, is_bot: bool = False) -> Cursor:
         return cursor

     def get_active_users(
-        self, start_day: datetime, end_day: datetime, metadata_filter: dict = {}
+        self,
+        start_day: datetime,
+        end_day: datetime,
+        metadata_filter: dict | None = None,
     ) -> list[str]:
         """
         get the users doing activities for a specific period
@@ -44,35 +47,17 @@
             the time to filter the data from
         end_day : datetime
             the end day for filtering data from
-        metadata_filter : dict
+        metadata_filter : dict | None
             the additional filtering to be applied on data
-            default is no filtering which an empty dictionary will be passed
+            default is `None` which means no filtering

         Returns
         ---------
         users : list[str]
             a list of user ids doing activity in that day
         """
-        # cursor = self.database["rawmemberactivities"].aggregate(
-        #     [
-        #         {"$match": {"date": {"$gte": start_day, "$lt": end_day}}},
-        #         {"$unwind": "$interactions"},
-        #         {"$unwind": "$interactions.users_engaged_id"},
-        #         {
-        #             "$group": {
-        #                 "_id": None,
-        #                 "all_ids": {"$addToSet": "$interactions.users_engaged_id"},
-        #                 "author_ids": {"$addToSet": "$author_id"},
-        #             }
-        #         },
-        #         {
-        #             "$project": {
-        #                 "_id": 0,
-        #                 "combined_ids": {"$setUnion": ["$all_ids", "$author_ids"]},
-        #             }
-        #         },
-        #     ]
-        # )
+        if metadata_filter is None:
+            metadata_filter = {}

         cursor = self.database["rawmemberactivities"].aggregate(
             [
@@ -109,6 +94,66 @@
         # making the values to be unique
         return list(set(combined_ids))

+    def get_active_resources_period(
+        self,
+        start_day: datetime,
+        end_day: datetime,
+        resource_identifier: str,
+        metadata_filter: dict | None = None,
+    ) -> list[str]:
+        """
+        get the active resource ids for a specific period
+
+        Parameters
+        ------------
+        start_day : datetime
+            the time to filter the data from
+        end_day : datetime
+            the end day for filtering data from
+        resource_identifier : str
+            the resource identifier on database for a platform
+            i.e.: could be `channel_id` for discord
+        metadata_filter : dict | None
+            the additional filtering to be applied on data
+            default is `None` which means no filtering
+
+        Returns
+        ---------
+        resource_ids : list[str]
+            a list of user ids doing activity in that day
+        """
+        if metadata_filter is None:
+            metadata_filter = {}
+
+        pipeline = [
+            {
+                "$match": {
+                    "date": {
+                        "$gte": start_day,
+                        "$lt": end_day,
+                    },
+                    **metadata_filter,
+                }
+            },
+            {
+                "$group": {
+                    "_id": None,
+                    "unique_resource_ids": {
+                        "$addToSet": f"$metadata.{resource_identifier}"
+                    },
+                }
+            },
+            {"$project": {"_id": 0, "unique_resource_ids": 1}},
+        ]
+
+        results = self.database["rawmemberactivities"].aggregate(pipeline)
+
+        unique_resource_ids = []
+        for doc in results:
+            unique_resource_ids = doc.get("unique_resource_ids", [])
+
+        return unique_resource_ids
+
     def get_users_count(self, is_bot: bool = False) -> int:
         """
         get the count of users
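
The new helper's aggregation can be exercised on its own with pymongo: `$addToSet` under an `_id: None` group collapses the day's documents into a single result holding the distinct resource ids. A sketch assuming a local MongoDB, a made-up database name, and the `rawmemberactivities` layout from the diff:

    from datetime import datetime

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # assumed connection string
    db = client["platform_db"]  # hypothetical database name

    pipeline = [
        # match the one-day window first, so later stages see fewer documents
        {"$match": {"date": {"$gte": datetime(2024, 7, 1), "$lt": datetime(2024, 7, 2)}}},
        # collapse everything into one document holding the distinct resource ids
        {
            "$group": {
                "_id": None,
                "unique_resource_ids": {"$addToSet": "$metadata.channel_id"},
            }
        },
        {"$project": {"_id": 0, "unique_resource_ids": 1}},
    ]

    # yields at most one document; no documents at all means no activity that day
    for doc in db["rawmemberactivities"].aggregate(pipeline):
        print(doc["unique_resource_ids"])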
@@ -32,7 +32,7 @@ def test_analyzer_week_period_run_once_empty_analytics():
     rawinfo_samples = []

     # generating random rawinfo data
-    for i in range(155):
+    for i in range(160):
         author = np.random.choice(acc_id)
         replied_user = np.random.choice(acc_id)
         # not producing any self-interactions
