Skip to content

Commit

Permalink
Executor Logs (#1499)
Browse files Browse the repository at this point in the history
* Added code for handling the webhook event after completing payment and added test cases.

* Added code for handling the webhook event after completing payment and added test cases.

* Removed meta version related changes.

* Removed meta version related changes.

* Added code for executor logs and fixed respective test cases

* Added code for executor logs and fixed respective test cases

* Added code for executor logs and fixed respective test cases

* Added code for executor logs and fixed respective test cases

* Added code for executor logs and fixed respective test cases and added test cases for the same.

* Made changes related to executor logs and added respective test cases.

* removed unused import statements
  • Loading branch information
maheshsattala authored Sep 11, 2024
1 parent 1884022 commit 272fc4e
Show file tree
Hide file tree
Showing 16 changed files with 457 additions and 71 deletions.
4 changes: 2 additions & 2 deletions kairon/async_callback/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from kairon.shared.callback.data_objects import CallbackData, CallbackConfig, CallbackLog, CallbackExecutionMode
from kairon.shared.cloud.utils import CloudUtility
from kairon.shared.constants import EventClass

from kairon.shared.data.constant import TASK_TYPE

async_task_executor = ThreadPoolExecutor(max_workers=64)

Expand All @@ -29,7 +29,7 @@ def run_pyscript(script: str, predefined_objects: dict):
lambda_response = CloudUtility.trigger_lambda(EventClass.pyscript_evaluator, {
'source_code': script,
'predefined_objects': predefined_objects
})
}, task_type=TASK_TYPE.CALLBACK.value)
if CloudUtility.lambda_execution_failed(lambda_response):
err = lambda_response['Payload'].get('body') or lambda_response
raise AppException(f"{err}")
Expand Down
15 changes: 14 additions & 1 deletion kairon/events/executors/base.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
from abc import abstractmethod


from kairon.shared.constants import EventClass
from kairon.shared.data.constant import EVENT_STATUS, TASK_TYPE


class ExecutorBase:

"""Base class to create executors"""

@abstractmethod
def execute_task(self, event_class: EventClass, data: dict):
def execute_task(self, event_class: EventClass, data: dict, **kwargs):
raise NotImplementedError("Provider not implemented")

def log_task(self, event_class: EventClass, task_type: TASK_TYPE, data: dict, status: EVENT_STATUS, **kwargs):
from bson import ObjectId
from kairon.shared.cloud.utils import CloudUtility

executor_log_id = kwargs.pop("executor_log_id") if kwargs.get("executor_log_id") else ObjectId().__str__()
CloudUtility.log_task(
event_class=event_class, task_type=task_type, data=data, status=status,
executor_log_id=executor_log_id, **kwargs
)
return executor_log_id
29 changes: 26 additions & 3 deletions kairon/events/executors/dramatiq.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import time

import ujson as json

from kairon.events.executors.base import ExecutorBase
from kairon.exceptions import AppException
from kairon.shared.constants import EventClass
from kairon.shared.data.constant import EVENT_STATUS
from kairon.shared.events.broker.factory import BrokerFactory


Expand All @@ -11,9 +15,28 @@ class DramatiqExecutor(ExecutorBase):
Executor to enqueue tasks on broker which are later executed by dramatiq workers.
"""

def execute_task(self, event_class: EventClass, data: dict):
def execute_task(self, event_class: EventClass, data: dict, **kwargs):
"""
Retrieves broker and enqueues message.
"""
msg = BrokerFactory.get_instance().enqueue(event_class, **data)
return json.dumps(msg.asdict())
task_type = kwargs.get("task_type")
start_time = time.time()
executor_log_id = self.log_task(event_class=event_class, task_type=task_type, data=data,
status=EVENT_STATUS.INITIATED, from_executor=True)
response = {}
try:
msg = BrokerFactory.get_instance().enqueue(event_class, **data)
response = json.dumps(msg.asdict())
except Exception as e:
exception = str(e)
self.log_task(event_class=event_class, task_type=task_type, data=data,
status=EVENT_STATUS.FAIL, response=response,
executor_log_id=executor_log_id, elapsed_time=time.time() - start_time,
exception=exception, from_executor=True)
raise AppException(exception)
self.log_task(event_class=event_class, task_type=task_type, data=data,
status=EVENT_STATUS.COMPLETED, response=json.loads(response),
executor_log_id=executor_log_id, elapsed_time=time.time() - start_time,
from_executor=True)

return response
5 changes: 3 additions & 2 deletions kairon/events/executors/lamda.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ class LambdaExecutor(ExecutorBase):
"""
Executor to execute the code on lambda using boto3.
"""
def execute_task(self, event_class: EventClass, data: dict):
def execute_task(self, event_class: EventClass, data: dict, **kwargs):
"""
Builds event payload and triggers lambda for that particular event.
"""
task_type = kwargs.get("task_type")
env_data = Utility.build_lambda_payload(data)
response = CloudUtility.trigger_lambda(event_class, env_data)
response = CloudUtility.trigger_lambda(event_class, env_data, task_type=task_type, from_executor=True)
if CloudUtility.lambda_execution_failed(response):
raise AppException(response)
return response
34 changes: 27 additions & 7 deletions kairon/events/executors/standalone.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import time

from loguru import logger

from kairon import Utility
from kairon.events.definitions.factory import EventFactory
from kairon.events.executors.base import ExecutorBase
from kairon.exceptions import AppException
from kairon.shared.concurrency.actors.factory import ActorFactory
from kairon.shared.constants import EventClass, EventExecutor, ActorType
from kairon.shared.data.constant import EVENT_STATUS


class StandaloneExecutor(ExecutorBase):
Expand All @@ -15,18 +19,34 @@ class StandaloneExecutor(ExecutorBase):
It is recommended that this type of executor should only be used with workers.
"""

def execute_task(self, event_class: EventClass, data: dict):
def execute_task(self, event_class: EventClass, data: dict, **kwargs):
"""
Executes events based on the event class received.
"""
msg = None
task_type = kwargs.get("task_type")
start_time = time.time()
logger.debug("started executing task in standalone mode")
logger.debug(f"event_class: {event_class}, data: {data}")
executor_log_id = self.log_task(event_class=event_class, task_type=task_type, data=data,
status=EVENT_STATUS.INITIATED, from_executor=True)
definition = EventFactory.get_instance(event_class)(**data)
if Utility.environment['events']['executor']['type'] == EventExecutor.standalone:
actor = ActorFactory.get_instance(ActorType.callable_runner.value)
actor.execute(definition.execute, **data)
msg = "Task Spawned!"
else:
definition.execute(**data)
try:
if Utility.environment['events']['executor']['type'] == EventExecutor.standalone:
actor = ActorFactory.get_instance(ActorType.callable_runner.value)
actor.execute(definition.execute, **data)
msg = "Task Spawned!"
else:
definition.execute(**data)
except Exception as e:
exception = str(e)
self.log_task(event_class=event_class, task_type=task_type, data=data,
status=EVENT_STATUS.FAIL, response=msg,
executor_log_id=executor_log_id, elapsed_time=time.time() - start_time,
exception=exception, from_executor=True)
raise AppException(exception)
self.log_task(event_class=event_class, task_type=task_type, data=data,
status=EVENT_STATUS.COMPLETED, response={"message": msg},
executor_log_id=executor_log_id, elapsed_time=time.time() - start_time,
from_executor=True)
return msg
6 changes: 4 additions & 2 deletions kairon/events/scheduler/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from abc import ABC, abstractmethod
from typing import Text

from kairon.shared.data.constant import TASK_TYPE


class EventSchedulerBase(ABC):

Expand All @@ -9,11 +11,11 @@ def name(self):
return self.__class__.__name__

@abstractmethod
def add_job(self, event_id: Text, cron_exp: Text, event_class: Text, body: dict):
def add_job(self, event_id: Text, task_type: TASK_TYPE, cron_exp: Text, event_class: Text, body: dict):
raise NotImplementedError("Provider not implemented")

@abstractmethod
def update_job(self, event_id: Text, cron_exp: Text, event_class: Text, body: dict):
def update_job(self, event_id: Text, task_type: TASK_TYPE, cron_exp: Text, event_class: Text, body: dict):
raise NotImplementedError("Provider not implemented")

@abstractmethod
Expand Down
15 changes: 9 additions & 6 deletions kairon/events/scheduler/kscheduler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from typing import Text

from apscheduler.events import JobEvent, EVENT_JOB_ADDED
from apscheduler.jobstores.base import JobLookupError
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.schedulers.background import BackgroundScheduler
Expand All @@ -13,6 +12,8 @@
from kairon.events.executors.factory import ExecutorFactory
from kairon.events.scheduler.base import EventSchedulerBase
from kairon.exceptions import AppException
from apscheduler.events import JobEvent, EVENT_JOB_ADDED
from kairon.shared.data.constant import TASK_TYPE

logging.getLogger('apscheduler').setLevel(logging.DEBUG)
logging.basicConfig()
Expand All @@ -27,25 +28,27 @@ class KScheduler(EventSchedulerBase):
job_defaults={'coalesce': True, 'misfire_grace_time': 7200})
__scheduler.start()

def update_job(self, event_id: Text, cron_exp: Text, event_class: Text, data: dict, timezone=None):
def update_job(self, event_id: Text, task_type: TASK_TYPE, cron_exp: Text, event_class: Text, data: dict, timezone=None):
try:
func = ExecutorFactory.get_executor().execute_task
args = (event_class, data,)
kwargs = {'task_type': task_type}
trigger = CronTrigger.from_crontab(cron_exp, timezone=timezone)
changes = {
"func": func, "trigger": trigger, "args": args, "name": func.__name__
"func": func, "trigger": trigger, "args": args, "kwargs": kwargs, "name": func.__name__
}
KScheduler.__scheduler.modify_job(event_id, KScheduler.__job_store_name, **changes)
KScheduler.__scheduler.reschedule_job(event_id, KScheduler.__job_store_name, trigger)
except JobLookupError as e:
logger.exception(e)
raise AppException(e)

def add_job(self, event_id: Text, cron_exp: Text, event_class: Text, data: dict, timezone=None):
def add_job(self, event_id: Text, task_type: TASK_TYPE, cron_exp: Text, event_class: Text, data: dict, timezone=None):
func = ExecutorFactory.get_executor().execute_task
args = (event_class, data,)
args = (event_class, task_type, data)
kwargs = {'task_type': task_type}
trigger = CronTrigger.from_crontab(cron_exp, timezone=timezone)
KScheduler.__scheduler.add_job(func, trigger, args, id=event_id, name=func.__name__,
KScheduler.__scheduler.add_job(func, trigger, args, kwargs, id=event_id, name=func.__name__,
jobstore=KScheduler.__job_store_name)

def list_jobs(self):
Expand Down
11 changes: 8 additions & 3 deletions kairon/events/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from kairon.events.executors.factory import ExecutorFactory
from kairon.events.scheduler.kscheduler import KScheduler
from kairon.exceptions import AppException
from kairon.shared.data.constant import TASK_TYPE


class EventUtility:
Expand All @@ -13,10 +14,13 @@ def add_job(event_type: Text, request_data: Dict, is_scheduled: bool):
if is_scheduled:
response = None
event_id = request_data["data"]["event_id"]
KScheduler().add_job(event_class=event_type, event_id=event_id, **request_data)
KScheduler().add_job(event_class=event_type, event_id=event_id,
task_type=TASK_TYPE.EVENT.value, **request_data)
message = 'Event Scheduled!'
else:
response = ExecutorFactory.get_executor().execute_task(event_type, request_data["data"])
response = ExecutorFactory.get_executor().execute_task(event_class=event_type,
task_type=TASK_TYPE.EVENT.value,
data=request_data["data"])
return response, message

@staticmethod
Expand All @@ -25,5 +29,6 @@ def update_job(event_type: Text, request_data: Dict, is_scheduled: bool):
raise AppException("Updating non-scheduled event not supported!")

event_id = request_data["data"]["event_id"]
KScheduler().update_job(event_class=event_type, event_id=event_id, **request_data)
KScheduler().update_job(event_class=event_type, event_id=event_id, task_type=TASK_TYPE.EVENT.value,
**request_data)
return None, 'Scheduled event updated!'
8 changes: 5 additions & 3 deletions kairon/shared/actions/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from ..admin.processor import Sysadmin
from ..cloud.utils import CloudUtility
from ..constants import KAIRON_USER_MSG_ENTITY, PluginTypes, EventClass
from ..data.constant import REQUEST_TIMESTAMP_HEADER
from ..data.constant import REQUEST_TIMESTAMP_HEADER, TASK_TYPE
from ..data.data_objects import Slots, KeyVault
from ..plugins.factory import PluginFactory
from ..rest_client import AioRestClient
Expand Down Expand Up @@ -494,7 +494,8 @@ def perform_web_search(search_term: str, **kwargs):
results = []
try:
if trigger_task:
lambda_response = CloudUtility.trigger_lambda(EventClass.web_search, request_body)
lambda_response = CloudUtility.trigger_lambda(EventClass.web_search, request_body,
task_type=TASK_TYPE.ACTION.value)
if CloudUtility.lambda_execution_failed(lambda_response):
err = lambda_response['Payload'].get('body') or lambda_response
raise ActionFailure(f"{err}")
Expand Down Expand Up @@ -679,7 +680,8 @@ def run_pyscript(source_code: Text, context: dict):
pyscript_evaluator_url = Utility.environment['evaluator']['pyscript']['url']
request_body = {"source_code": source_code, "predefined_objects": context}
if trigger_task:
lambda_response = CloudUtility.trigger_lambda(EventClass.pyscript_evaluator, request_body)
lambda_response = CloudUtility.trigger_lambda(EventClass.pyscript_evaluator, request_body,
task_type=TASK_TYPE.ACTION.value)
if CloudUtility.lambda_execution_failed(lambda_response):
err = lambda_response['Payload'].get('body') or lambda_response
raise ActionFailure(f"{err}")
Expand Down
64 changes: 55 additions & 9 deletions kairon/shared/cloud/utils.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import ujson as json
import os
import time

from boto3 import Session
from botocore.exceptions import ClientError
from mongoengine import DoesNotExist

from kairon.shared.utils import Utility
from kairon.exceptions import AppException
from kairon.shared.constants import EventClass
from loguru import logger
from kairon.shared.data.constant import EVENT_STATUS, TASK_TYPE


class CloudUtility:

Expand Down Expand Up @@ -52,26 +57,67 @@ def delete_file(bucket, file):
s3.delete_object(Bucket=bucket, Key=file)

@staticmethod
def trigger_lambda(event_class: EventClass, env_data: dict):
def trigger_lambda(event_class: EventClass, env_data: dict, task_type: TASK_TYPE = TASK_TYPE.ACTION.value,
from_executor: bool = False):
"""
Triggers lambda based on the event class.
"""
start_time = time.time()
region = Utility.environment['events']['executor'].get('region')
if Utility.check_empty_string(region):
region = "us-east-1"
function = Utility.environment['events']['task_definition'][event_class]
session = Session()
lambda_client = session.client("lambda", region_name=region)
response = lambda_client.invoke(
FunctionName=function,
InvocationType='RequestResponse',
LogType='Tail',
Payload=json.dumps(env_data).encode(),
)
response['Payload'] = json.loads(response['Payload'].read())
logger.info(response)
response = {}
executor_log_id = CloudUtility.log_task(event_class=event_class, task_type=task_type, data=env_data,
status=EVENT_STATUS.INITIATED, from_executor=from_executor)
try:
response = lambda_client.invoke(
FunctionName=function,
InvocationType='RequestResponse',
LogType='Tail',
Payload=json.dumps(env_data).encode(),
)
response['Payload'] = json.loads(response['Payload'].read())
logger.info(response)

if CloudUtility.lambda_execution_failed(response):
raise AppException(response)
except Exception as e:
exception = str(e)
CloudUtility.log_task(event_class=event_class, task_type=task_type, data=env_data,
status=EVENT_STATUS.FAIL, response=response, executor_log_id=executor_log_id,
elapsed_time=time.time() - start_time, exception=exception,
from_executor=from_executor)
raise AppException(exception)
CloudUtility.log_task(event_class=event_class, task_type=task_type, data=env_data,
status=EVENT_STATUS.COMPLETED, response=response, executor_log_id=executor_log_id,
elapsed_time=time.time() - start_time, from_executor=from_executor)
return response

@staticmethod
def log_task(event_class: EventClass, task_type: TASK_TYPE, data: dict, status: EVENT_STATUS, **kwargs):
from bson import ObjectId
from kairon.shared.events.data_objects import ExecutorLogs

executor_log_id = kwargs.get("executor_log_id") if kwargs.get("executor_log_id") else ObjectId().__str__()

try:
log = ExecutorLogs.objects(executor_log_id=executor_log_id, task_type=task_type, event_class=event_class,
status=EVENT_STATUS.INITIATED.value).get()
except DoesNotExist:
log = ExecutorLogs(executor_log_id=executor_log_id, task_type=task_type, event_class=event_class)

log.data = data if data else log.data
log.status = status if status else log.status

for key, value in kwargs.items():
if not getattr(log, key, None) and Utility.is_picklable_for_mongo({key: value}):
setattr(log, key, value)
log.save()
return executor_log_id

@staticmethod
def lambda_execution_failed(response):
return (response['StatusCode'] != 200 or
Expand Down
Loading

0 comments on commit 272fc4e

Please sign in to comment.