Skip to content

Commit

Permalink
Merge pull request #24 from DostEducation/feature/15-create-flow-run-…
Browse files Browse the repository at this point in the history
…log-service

Feature/15 create flow run log service
  • Loading branch information
Sachinbisht27 authored May 3, 2024
2 parents 931a0be + c597493 commit ba17671
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 8 deletions.
21 changes: 20 additions & 1 deletion api/models/user_flows.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,33 @@
from datetime import date
from flask_sqlalchemy.query import Query as BaseQuery
from sqlalchemy import func

from api import db
from api.mixins import TimestampMixin


class UserFlowsQuery(BaseQuery):
def get_by_flow_uuid_and_phone(self, flow_uuid, user_phone):
return self.filter(
UserFlows.flow_uuid == flow_uuid,
UserFlows.user_phone == user_phone,
func.DATE(UserFlows.flow_start_time) == date.today(),
).first()


class UserFlows(TimestampMixin, db.Model):
query_class = UserFlowsQuery

class FlowRunStatus:
IN_PROGRESS = "in-progress"
COMPLETED = "completed"
ENDED = "ended"

__tablename__ = "user_flows"
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey("users.id"))
user_phone = db.Column(db.BigInteger, nullable=False, index=True)
flow_uuid = db.Column(db.String(255))
flow_uuid = db.Column(db.String(255), index=True)
flow_name = db.Column(db.String(255))
flow_type = db.Column(db.String(255))
flow_run_status = db.Column(db.String(255))
Expand Down
1 change: 1 addition & 0 deletions api/services/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .flow_run_log_service import *
from .user_creation_service import *
from .user_indicator_response_service import *
from .webhook_transaction_log_service import *
62 changes: 62 additions & 0 deletions api/services/flow_run_log_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from datetime import datetime, timedelta

from api import models
from api.utils import db_utils
from api.utils.loggingutils import logger


class FlowRunLogService:
def __init__(self, user):
self.user = user
self.class_model = models.UserFlows

def create_user_flow_log(self, json_data):
try:
flow_uuid = json_data.get("flow_uuid")
flow_name = json_data.get("flow_name")
flow_type = json_data.get("flow_type")
flow_completed = json_data.get("flow_completed")

today_flow_log = self.class_model.query.get_by_flow_uuid_and_phone(
flow_uuid, self.user.phone
)

user_flow_log = None

if not today_flow_log:
user_flow_log = self.create_log(flow_uuid, flow_name, flow_type)
elif today_flow_log and flow_completed:
user_flow_log = self.update_log(today_flow_log)

return user_flow_log
except Exception as e:
logger.error(
f"Error while creating new user flow log. json data: {json_data}."
f"Error message: {e}"
)
raise

def create_log(self, flow_uuid, flow_name, flow_type):
user_flow_log = self.class_model(
user_id=self.user.id,
user_phone=self.user.phone,
flow_uuid=flow_uuid,
flow_name=flow_name,
flow_type=flow_type,
flow_run_status=self.class_model.FlowRunStatus.IN_PROGRESS,
flow_start_time=datetime.utcnow() + timedelta(minutes=330),
is_active=True,
)

db_utils.save(user_flow_log)
logger.info(f"Created a user flow log for phone number {self.user.phone}.")
return user_flow_log

def update_log(self, today_flow_log):
today_flow_log.flow_run_status = self.class_model.FlowRunStatus.COMPLETED
today_flow_log.flow_end_time = datetime.utcnow() + timedelta(minutes=330)
today_flow_log.is_active = False

db_utils.save(today_flow_log)
logger.info(f"Updated user flow log for phone number {self.user.phone}.")
return today_flow_log
4 changes: 2 additions & 2 deletions api/services/user_indicator_response_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@


class UserIndicatorResponseService:
def __init__(self, user, user_flow_id):
def __init__(self, user, user_flow):
self.key = "indicator_question" # Prefix for the indicators key in payload
self.user_id = user.id
self.user_phone = user.phone
self.user_flow_id = user_flow_id
self.user_flow_id = user_flow.id
self.class_model = models.UserIndicatorResponses

def process_user_indicator_responses(self, data):
Expand Down
15 changes: 10 additions & 5 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from api import app
from api.services import (
FlowRunLogService,
UserIndicatorResponseService,
UserCreationService,
WebhookTransactionLogService,
Expand Down Expand Up @@ -42,22 +43,26 @@ def handle_webhook(json_data):
contact_data = json_data["contact"]
if contact_data:
user = handle_contact_field_data(contact_data)
process_user_indicators(user, json_data)
user_flow = handle_user_flow_logs(user, json_data)
process_user_indicators(user, user_flow, json_data)


def handle_contact_field_data(contact_data):
user_creation_service = UserCreationService()
return user_creation_service.create_new_user(contact_data)


def handle_user_flow_logs(user, json_data):
flow_run_log_service = FlowRunLogService(user)
return flow_run_log_service.create_user_flow_log(json_data)


def process_user_indicators(
user,
user_flow,
json_data,
):

user_flow_id = (
1 # Placeholder for user_flow_id, should be obtained from user_flow details.
)
user_indicator_res_service = UserIndicatorResponseService(user, user_flow_id)
user_indicator_res_service = UserIndicatorResponseService(user, user_flow)
user_indicator_res_service.process_user_indicator_responses(json_data)
return True
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Adding index to the flou uuid cloumn in user flow table.
Revision ID: cf56b46971fa
Revises: 06b59f5dd95e
Create Date: 2024-05-03 15:19:29.399203
"""

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "cf56b46971fa"
down_revision = "06b59f5dd95e"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("user_flows", schema=None) as batch_op:
batch_op.create_index(
batch_op.f("ix_user_flows_flow_uuid"), ["flow_uuid"], unique=False
)

with op.batch_alter_table("user_indicator_responses", schema=None) as batch_op:
batch_op.alter_column(
"id",
existing_type=sa.INTEGER(),
type_=sa.BigInteger(),
existing_nullable=False,
autoincrement=True,
)

# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("user_indicator_responses", schema=None) as batch_op:
batch_op.alter_column(
"id",
existing_type=sa.BigInteger(),
type_=sa.INTEGER(),
existing_nullable=False,
autoincrement=True,
)

with op.batch_alter_table("user_flows", schema=None) as batch_op:
batch_op.drop_index(batch_op.f("ix_user_flows_flow_uuid"))

# ### end Alembic commands ###

0 comments on commit ba17671

Please sign in to comment.