diff --git a/backend/server/core/panoptica_views.py b/backend/server/core/panoptica_views.py new file mode 100644 index 0000000..e94a681 --- /dev/null +++ b/backend/server/core/panoptica_views.py @@ -0,0 +1,9 @@ +from glob import glob +from flask_restful import Resource, request +from flask import current_app + + +class Panoptica_prompt_with_functions(Resource): + + def post(self): + pass \ No newline at end of file diff --git a/external_apps/__init__.py b/external_apps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/external_apps/slackbot/help.py b/external_apps/panoptica_utils/panoptica_utils.py similarity index 97% rename from external_apps/slackbot/help.py rename to external_apps/panoptica_utils/panoptica_utils.py index 12c195c..cc46e02 100644 --- a/external_apps/slackbot/help.py +++ b/external_apps/panoptica_utils/panoptica_utils.py @@ -1,207 +1,204 @@ -""" - -This file implements the commands of the webex bot, via the underlying class `PrevMeetings`. - -""" - -from constants import CONSTANTS -import requests -import json -import datetime -import os -import inspect -import re -from dotenv import load_dotenv -from typing import Callable, Dict -from collections import defaultdict - - -base_url = "https://appsecurity.cisco.com/api" - - -def LoadTranscripts(): - transcriptFileName = "webex_transcripts.json" - return transcriptFileName - -""" - -HELPER FUNCTIONS LISTED BELOW - -""" - -def InitilizeSwaggerFunctions(): - url = CONSTANTS.get("webex_api_endpoint")+"/functions" - - payload = json.dumps({ - "model": "OpenAI", - "dataset":"Swagger", - "description_text":"Used when given a question about panoptica to identify the tag of the api that needs to be referenced." - }) - headers = { - 'Content-Type': 'application/json' - } - - response = requests.request("POST", url, headers=headers, data=payload) - print(response.text) - return json.loads(response.text) - -def GetAPIKeys(): - url = CONSTANTS.get("webex_api_endpoint")+"/swagger_config" - - headers = { - 'Content-Type': 'application/json' - } - - response = requests.request("GET", url, headers=headers) - print(response.text) - return json.loads(response.text) - -def get_role_message_dict(role, content=None, fn_name=None, arguments=None, result=None): - message_dict = {"role":role} - if role == "user": - message_dict["content"] = content - elif role == "assistant": - message_dict["content"] = content - message_dict["function_call"] = {} - message_dict["function_call"]["name"] = fn_name - message_dict["function_call"]["arguments"] = arguments - elif role == "function": - message_dict["name"] = fn_name - message_dict["content"] = f'{{"result": {str(result)} }}' - return message_dict - - -def panoptica_call_functions(full_url): - url = CONSTANTS.get("webex_api_endpoint")+"/call_function" - payload = json.dumps({ - "url": full_url, - "request_type":"GET", - "params": None, - "data":None - }) - headers = { - 'Content-Type': 'application/json' - } - - response = requests.request("POST", url, headers=headers, data=payload) - print(response.text) - return response - - -def parse_docstring(function: Callable) -> Dict: - doc = inspect.getdoc(function) - - function_description = re.search(r'(.*?)Parameters', doc, re.DOTALL).group(1).strip() - parameters_description = re.findall(r'(\w+)\s*:\s*([\w\[\], ]+)\n(.*?)(?=\n\w+\s*:\s*|\nReturns|\nExample$)', doc, re.DOTALL) - - returns_description_match = re.search(r'Returns\n(.*?)(?=\n\w+\s*:\s*|$)', doc, re.DOTALL) - returns_description = returns_description_match.group(1).strip() if returns_description_match else None - - example = re.search(r'Example\n(.*?)(?=\n\w+\s*:\s*|$)', doc, re.DOTALL) - example_description = example.group(1).strip() if example else None - - signature_params = list(inspect.signature(function).parameters.keys()) - properties = {} - required = [] - for name, type, description in parameters_description: - name = name.strip() - type = type.strip() - description = description.strip() - - required.append(name) - properties[name] = { - "type": type, - "description": description, - } - if len(signature_params) != len(required): - print(f'Signature params : {signature_params}, Required params : {required}') - raise ValueError(f"Number of parameters in function signature ({signature_params}) does not match the number of parameters in docstring ({required})") - for param in signature_params: - if param not in required: - raise ValueError(f"Parameter '{param}' in function signature is missing in the docstring") - - parameters = { - "type": "object", - "properties": properties, - "required": required, - } - function_dict = { - "name": function.__name__, - "description": function_description, - "parameters": parameters, - "returns": returns_description, - # "example": example_description, - } - - return function_dict - -def run_with_functions(messages): - url = CONSTANTS.get("webex_api_endpoint")+"/run_function" - - payload = json.dumps({ - "model": "OpenAI", - "messages": messages - }) - headers = { - 'Content-Type': 'application/json' - } - - response = requests.request("POST", url, headers=headers, data=payload) - - print(response.text) - - return json.loads(response.text) - - -def prompt_with_functions(prompt, functions, function_dict): - # setup_database() - #prompt += prompt_append() - prompt = "You are an expert in Panoptica, which is a tool to give security insights in Kubernetes Clusters, API security. You are not aware of anything else. All queries should either use one of the APIs provided or should ask the user to rephrase the qurey: " + prompt - output = [] - fn_names_dict = {} - - if function_dict: - function_dicts = functions - for fn in functions: - fn_names_dict[fn['name']] = fn - else: - for fn in functions: - fn_names_dict[fn.__name__] = fn - function_dicts = [parse_docstring(fun) for fun in functions] - # print(function_dicts) - messages = [get_role_message_dict("user", content=(prompt))] - - response = run_with_functions(messages) - - if response["choices"][0]["finish_reason"] == "stop": - print("Received STOP signal from GPT.") - print() - print() - - elif response["choices"][0]["finish_reason"] == "function_call": - print("Received FUNCTION_CALL signal from GPT.") - fn_name = response["choices"][0]["message"]["function_call"]["name"] - arguments = response["choices"][0]["message"]["function_call"]["arguments"] - #json_arguments = json.loads(arguments) - #function = fn_names_dict[fn_name] - paths = fn_names_dict[fn_name]['path'] - full_url = base_url + paths - print(f"Running the {fn_name} function locally with args {arguments}") - response = panoptica_call_functions(full_url) - #result = function(**json_arguments) - print(f"Finished running {fn_name}. Output is {response._content}") - print() - print() - output.append(response._content.decode('utf-8')) - # output.append(f'You should call the {response.choices[0].message["function_call"].name} function using the following arguments : \n {response.choices[0].message["function_call"].arguments} \n Function raw output : {str(result)}') - messages.append(get_role_message_dict("assistant", fn_name=fn_name, arguments=arguments)) - messages.append(get_role_message_dict("function", fn_name=fn_name)) - response = run_with_functions(messages) - - - return output - -def RunFunction(message,functions): - # print(message,functions) - output = prompt_with_functions(message, functions["tag_dict"]['dashboard-controller'],function_dict=True) +""" + +This file implements the commands of the webex bot, via the underlying class `PrevMeetings`. + +""" + +from constants import CONSTANTS +import requests +import json +import datetime +import os +import inspect +import re +from dotenv import load_dotenv +from typing import Callable, Dict +from collections import defaultdict + + +base_url = "https://appsecurity.cisco.com/api" + + +def LoadTranscripts(): + transcriptFileName = "webex_transcripts.json" + return transcriptFileName + +""" + +HELPER FUNCTIONS LISTED BELOW + +""" + +def InitilizeSwaggerFunctions(): + url = CONSTANTS.get("webex_api_endpoint")+"/functions" + + payload = json.dumps({ + "model": "OpenAI", + "dataset":"Swagger", + "description_text":"Used when given a question about panoptica to identify the tag of the api that needs to be referenced." + }) + headers = { + 'Content-Type': 'application/json' + } + + response = requests.request("POST", url, headers=headers, data=payload) + print(response.text) + return json.loads(response.text) + +def GetAPIKeys(): + url = CONSTANTS.get("webex_api_endpoint")+"/swagger_config" + + headers = { + 'Content-Type': 'application/json' + } + + response = requests.request("GET", url, headers=headers) + print(response.text) + return json.loads(response.text) + +def get_role_message_dict(role, content=None, fn_name=None, arguments=None, result=None): + message_dict = {"role":role} + if role == "user": + message_dict["content"] = content + elif role == "assistant": + message_dict["content"] = content + message_dict["function_call"] = {} + message_dict["function_call"]["name"] = fn_name + message_dict["function_call"]["arguments"] = arguments + elif role == "function": + message_dict["name"] = fn_name + message_dict["content"] = f'{{"result": {str(result)} }}' + return message_dict + +def panoptica_call_functions(full_url): + url = CONSTANTS.get("webex_api_endpoint")+"/call_function" + payload = json.dumps({ + "url": full_url, + "request_type":"GET", + "params": None, + "data":None + }) + headers = { + 'Content-Type': 'application/json' + } + + response = requests.request("POST", url, headers=headers, data=payload) + print(response.text) + return response + +def parse_docstring(function: Callable) -> Dict: + doc = inspect.getdoc(function) + + function_description = re.search(r'(.*?)Parameters', doc, re.DOTALL).group(1).strip() + parameters_description = re.findall(r'(\w+)\s*:\s*([\w\[\], ]+)\n(.*?)(?=\n\w+\s*:\s*|\nReturns|\nExample$)', doc, re.DOTALL) + + returns_description_match = re.search(r'Returns\n(.*?)(?=\n\w+\s*:\s*|$)', doc, re.DOTALL) + returns_description = returns_description_match.group(1).strip() if returns_description_match else None + + example = re.search(r'Example\n(.*?)(?=\n\w+\s*:\s*|$)', doc, re.DOTALL) + example_description = example.group(1).strip() if example else None + + signature_params = list(inspect.signature(function).parameters.keys()) + properties = {} + required = [] + for name, type, description in parameters_description: + name = name.strip() + type = type.strip() + description = description.strip() + + required.append(name) + properties[name] = { + "type": type, + "description": description, + } + if len(signature_params) != len(required): + print(f'Signature params : {signature_params}, Required params : {required}') + raise ValueError(f"Number of parameters in function signature ({signature_params}) does not match the number of parameters in docstring ({required})") + for param in signature_params: + if param not in required: + raise ValueError(f"Parameter '{param}' in function signature is missing in the docstring") + + parameters = { + "type": "object", + "properties": properties, + "required": required, + } + function_dict = { + "name": function.__name__, + "description": function_description, + "parameters": parameters, + "returns": returns_description, + # "example": example_description, + } + + return function_dict + +def run_with_functions(messages): + url = CONSTANTS.get("webex_api_endpoint")+"/run_function" + + payload = json.dumps({ + "model": "OpenAI", + "messages": messages + }) + headers = { + 'Content-Type': 'application/json' + } + + response = requests.request("POST", url, headers=headers, data=payload) + + print(response.text) + + return json.loads(response.text) + +def prompt_with_functions(prompt, functions, function_dict): + # setup_database() + #prompt += prompt_append() + prompt = "You are an expert in Panoptica, which is a tool to give security insights in Kubernetes Clusters, API security. You are not aware of anything else. All queries should either use one of the APIs provided or should ask the user to rephrase the qurey: " + prompt + output = [] + fn_names_dict = {} + + if function_dict: + function_dicts = functions + for fn in functions: + fn_names_dict[fn['name']] = fn + else: + for fn in functions: + fn_names_dict[fn.__name__] = fn + function_dicts = [parse_docstring(fun) for fun in functions] + # print(function_dicts) + messages = [get_role_message_dict("user", content=(prompt))] + + response = run_with_functions(messages) + + if response["choices"][0]["finish_reason"] == "stop": + print("Received STOP signal from GPT.") + print() + print() + + elif response["choices"][0]["finish_reason"] == "function_call": + print("Received FUNCTION_CALL signal from GPT.") + fn_name = response["choices"][0]["message"]["function_call"]["name"] + arguments = response["choices"][0]["message"]["function_call"]["arguments"] + #json_arguments = json.loads(arguments) + #function = fn_names_dict[fn_name] + paths = fn_names_dict[fn_name]['path'] + full_url = base_url + paths + print(f"Running the {fn_name} function locally with args {arguments}") + response = panoptica_call_functions(full_url) + #result = function(**json_arguments) + print(f"Finished running {fn_name}. Output is {response._content}") + print() + print() + output.append(response._content.decode('utf-8')) + # output.append(f'You should call the {response.choices[0].message["function_call"].name} function using the following arguments : \n {response.choices[0].message["function_call"].arguments} \n Function raw output : {str(result)}') + messages.append(get_role_message_dict("assistant", fn_name=fn_name, arguments=arguments)) + messages.append(get_role_message_dict("function", fn_name=fn_name)) + response = run_with_functions(messages) + + + return output + +def RunFunction(message,functions): + # print(message,functions) + output = prompt_with_functions(message, functions["tag_dict"]['dashboard-controller'],function_dict=True) return output \ No newline at end of file diff --git a/external_apps/slackbot/run.py b/external_apps/slackbot/run.py index b2277de..d0a8ed9 100644 --- a/external_apps/slackbot/run.py +++ b/external_apps/slackbot/run.py @@ -4,8 +4,8 @@ import os import json from dotenv import load_dotenv -from slackbot_settings import SLACK_APP_TOKEN, SLACK_BOT_TOKEN -from help import RunFunction, InitilizeSwaggerFunctions, GetAPIKeys +from external_apps.panoptica_utils.panoptica_utils import RunFunction, InitilizeSwaggerFunctions, GetAPIKeys + load_dotenv() functions = InitilizeSwaggerFunctions() diff --git a/external_apps/webex_bot/cmds.py b/external_apps/webex_bot/cmds.py index ad74da3..b59055d 100644 --- a/external_apps/webex_bot/cmds.py +++ b/external_apps/webex_bot/cmds.py @@ -11,7 +11,8 @@ Text, Image, HorizontalAlignment from webexteamssdk.models.cards.actions import OpenUrl -from help import SummarizeTranscripts,SearchTranscripts,ListMeetingTranscripts,ActionablesTranscripts, RunFunction +from help import SummarizeTranscripts,SearchTranscripts,ListMeetingTranscripts,ActionablesTranscripts +from external_apps.panoptica_utils.panoptica_utils import RunFunction, InitilizeSwaggerFunctions, GetAPIKeys class EmptySpace(Command): def __init__(self): diff --git a/external_apps/webex_bot/escherauth.py b/external_apps/webex_bot/escherauth.py deleted file mode 100644 index a22c871..0000000 --- a/external_apps/webex_bot/escherauth.py +++ /dev/null @@ -1,313 +0,0 @@ -#code taken from https://github.com/emartech/escher-python and updated to python3 - -import datetime -import hmac -import requests -import urllib.request, urllib.parse, urllib.error -import re - -from hashlib import sha256, sha512 - -try: - from urllib.parse import urlparse, parse_qsl, urljoin - from urllib.parse import quote -except: - from urllib.parse import urlparse, parse_qsl, urljoin, quote - - -class EscherException(Exception): - pass - - -class EscherRequestsAuth(requests.auth.AuthBase): - def __init__(self, credential_scope, options, client): - self.escher = Escher(credential_scope, options) - self.client = client - - def __call__(self, request): - return self.escher.sign(request, self.client) - - -class EscherRequest(): - _uri_regex = re.compile('([^?#]*)(\?(.*))?') - - def __init__(self, request): - self.type = type(request) - self.request = request - self.prepare_request_uri() - - def request(self): - return self.request - - def prepare_request_uri(self): - if self.type is requests.models.PreparedRequest: - self.request_uri = self.request.path_url - if self.type is dict: - self.request_uri = self.request['uri'] - match = re.match(self._uri_regex, self.request_uri) - self.uri_path = match.group(1) - self.uri_query = match.group(3) - - def method(self): - if self.type is requests.models.PreparedRequest: - return self.request.method - if self.type is dict: - return self.request['method'] - - def host(self): - if self.type is requests.models.PreparedRequest: - return self.request.host - if self.type is dict: - return self.request['host'] - - def path(self): - return self.uri_path - - def query_parts(self): - return parse_qsl((self.uri_query or '').replace(';', '%3b'), True) - - def headers(self): - if self.type is requests.models.PreparedRequest: - headers = [] - for key, value in self.request.headers.items(): - headers.append([key, value]) - return headers - if self.type is dict: - return self.request['headers'] - - def body(self): - if self.type is requests.models.PreparedRequest: - return self.request.body or '' - if self.type is dict: - return self.request.get('body', '') - - def add_header(self, header, value): - if self.type is requests.models.PreparedRequest: - self.request.headers[header] = value - if self.type is dict: - self.request['headers'].append((header, value)) - - -class AuthParams: - def __init__(self, data, vendor_key): - self._init_data(data, 'X-' + vendor_key + '-') - - def _init_data(self, data, prefix): - self._data = {} - for (k, v) in data: - if k.startswith(prefix): - self._data[k.replace(prefix, '').lower()] = v - - def get(self, name): - if name not in self._data: - raise EscherException('Missing authorization parameter: ' + name) - return self._data[name] - - def get_signed_headers(self): - return self.get('signedheaders').lower().split(';') - - def get_algo_data(self): - data = self.get('algorithm').split('-') - if len(data) != 3: - raise EscherException('Malformed Algorithm parameter') - return data - - def get_algo_prefix(self): - return self.get_algo_data()[0] - - def get_hash_algo(self): - return self.get_algo_data()[2].upper() - - def get_credential_data(self): - data = self.get('credentials').split('/', 2) - if len(data) != 3: - raise EscherException('Malformed Credentials parameter') - return data - - def get_credential_key(self): - return self.get_credential_data()[0] - - def get_credential_date(self): - return datetime.datetime.strptime(self.get_credential_data()[1], '%Y%m%d') - - def get_credential_scope(self): - return self.get_credential_data()[2] - - def get_expires(self): - return int(self.get('expires')) - - def get_request_date(self): - return datetime.datetime.strptime(self.get('date'), '%Y%m%dT%H%M%SZ') - - -class AuthenticationValidator: - def validate_mandatory_signed_headers(self, headers_to_sign): - if 'host' not in headers_to_sign: - raise EscherException('Host header is not signed') - - def validate_hash_algo(self, hash_algo): - if hash_algo not in ('SHA256', 'SHA512'): - raise EscherException('Only SHA256 and SHA512 hash algorithms are allowed') - - def validate_dates(self, current_date, request_date, credential_date, expires, clock_skew): - if request_date.strftime('%Y%m%d') != credential_date.strftime('%Y%m%d'): - raise EscherException('The request date and credential date do not match') - - min_date = current_date - datetime.timedelta(seconds=(clock_skew + expires)) - max_date = current_date + datetime.timedelta(seconds=clock_skew) - if request_date < min_date or request_date > max_date: - raise EscherException('Request date is not within the accepted time interval') - - def validate_credential_scope(self, expected, actual): - if actual != expected: - raise EscherException('Invalid credential scope (provided: ' + actual + ', required: ' + expected + ')') - - def validate_signature(self, expected, actual): - if expected != actual: - raise EscherException('The signatures do not match (provided: ' + actual + ', calculated: ' + expected + ')') - - -class Escher: - _normalize_path = re.compile('([^/]+/\.\./?|/\./|//|/\.$|/\.\.$)') - - def __init__(self, credential_scope, options={}): - self.credential_scope = credential_scope - self.algo_prefix = options.get('algo_prefix', 'ESR') - self.vendor_key = options.get('vendor_key', 'Escher') - self.hash_algo = options.get('hash_algo', 'SHA256') - self.current_time = options.get('current_time', datetime.datetime.utcnow()) - self.auth_header_name = options.get('auth_header_name', 'X-Escher-Auth') - self.date_header_name = options.get('date_header_name', 'X-Escher-Date') - self.clock_skew = options.get('clock_skew', 300) - self.algo = self.create_algo() - self.algo_id = self.algo_prefix + '-HMAC-' + self.hash_algo - - def sign(self, r, client, headers_to_sign=[]): - request = EscherRequest(r) - - for header in [self.date_header_name.lower(), 'host']: - if header not in headers_to_sign: - headers_to_sign.append(header) - - signature = self.generate_signature(client['api_secret'], request, headers_to_sign, self.current_time) - request.add_header(self.auth_header_name, ", ".join([ - self.algo_id + ' Credential=' + client['api_key'] + '/' + self.short_date( - self.current_time) + '/' + self.credential_scope, - 'SignedHeaders=' + self.prepare_headers_to_sign(headers_to_sign), - 'Signature=' + signature - ])) - return request.request - - def authenticate(self, r, key_db): - request = EscherRequest(r) - - auth_params = AuthParams(request.query_parts(), self.vendor_key) - validator = AuthenticationValidator() - - validator.validate_mandatory_signed_headers(auth_params.get_signed_headers()) - validator.validate_hash_algo(auth_params.get_hash_algo()) - validator.validate_dates( - self.current_time, - auth_params.get_request_date(), - auth_params.get_credential_date(), - auth_params.get_expires(), - self.clock_skew - ) - validator.validate_credential_scope(self.credential_scope, auth_params.get_credential_scope()) - - if auth_params.get_credential_key() not in key_db: - raise EscherException('Invalid Escher key') - - calculated_signature = self.generate_signature( - key_db[auth_params.get_credential_key()], request, - auth_params.get_signed_headers(), - auth_params.get_request_date() - ) - validator.validate_signature(calculated_signature, auth_params.get('signature')) - - return auth_params.get_credential_key() - - def hmac_digest(self, key, message, is_hex=False): - if not isinstance(key, bytes): - key = key.encode('utf-8') - digest = hmac.new(key, message.encode('utf-8'), self.algo) - if is_hex: - return digest.hexdigest() - return digest.digest() - - def generate_signature(self, api_secret, req, headers_to_sign, current_time): - canonicalized_request = self.canonicalize(req, headers_to_sign) - string_to_sign = self.get_string_to_sign(canonicalized_request, current_time) - - signing_key = self.hmac_digest(self.algo_prefix + api_secret, self.short_date(current_time)) - for data in self.credential_scope.split('/'): - signing_key = self.hmac_digest(signing_key, data) - - return self.hmac_digest(signing_key, string_to_sign, True) - - def canonicalize(self, req, headers_to_sign): - return "\n".join([ - req.method(), - self.canonicalize_path(req.path()), - self.canonicalize_query(req.query_parts()), - self.canonicalize_headers(req.headers(), headers_to_sign), - '', - self.prepare_headers_to_sign(headers_to_sign), - self.algo(req.body().encode('utf-8')).hexdigest() - ]) - - def canonicalize_path(self, path): - changes = 1 - while changes > 0: - path, changes = self._normalize_path.subn('/', path, 1) - return path - - def canonicalize_headers(self, headers, headers_to_sign): - headers_list = [] - for key, value in iter(sorted(headers)): - if key.lower() in headers_to_sign: - headers_list.append(key.lower() + ':' + self.normalize_white_spaces(value)) - return "\n".join(sorted(headers_list)) - - def normalize_white_spaces(self, value): - index = 0 - value_normalized = [] - pattern = re.compile(r'\s+') - for part in value.split('"'): - if index % 2 == 0: - part = pattern.sub(' ', part) - value_normalized.append(part) - index += 1 - return '"'.join(value_normalized).strip() - - def canonicalize_query(self, query_parts): - safe = "~+!'()*" - query_list = [] - for key, value in query_parts: - if key == 'X-' + self.vendor_key + '-Signature': - continue - query_list.append(quote(key, safe=safe) + '=' + quote(value, safe=safe)) - return "&".join(sorted(query_list)) - - def get_string_to_sign(self, canonicalized_request, current_time): - return "\n".join([ - self.algo_id, - self.long_date(current_time), - self.short_date(current_time) + '/' + self.credential_scope, - self.algo(canonicalized_request.encode('utf-8')).hexdigest() - ]) - - def create_algo(self): - if self.hash_algo == 'SHA256': - return sha256 - if self.hash_algo == 'SHA512': - return sha512 - - def long_date(self, time): - return time.strftime('%Y%m%dT%H%M%SZ') - - def short_date(self, time): - return time.strftime('%Y%m%d') - - def prepare_headers_to_sign(self, headers_to_sign): - return ";".join(sorted(headers_to_sign)) \ No newline at end of file diff --git a/external_apps/webex_bot/help.py b/external_apps/webex_bot/help.py index 5de9157..699de34 100644 --- a/external_apps/webex_bot/help.py +++ b/external_apps/webex_bot/help.py @@ -14,7 +14,6 @@ from dotenv import load_dotenv from typing import Callable, Dict from collections import defaultdict -from escherauth import EscherRequestsAuth base_url = "https://appsecurity.cisco.com/api" @@ -43,22 +42,6 @@ def InitilizeTranscripts(transcriptFileName): response = requests.request("POST", url, headers=headers, data=payload) print(response.text) -def InitilizeSwaggerFunctions(): - url = CONSTANTS.get("webex_api_endpoint")+"/functions" - - payload = json.dumps({ - "model": "OpenAI", - "dataset":"Swagger", - "description_text":"Used when given a question about panoptica to identify the tag of the api that needs to be referenced." - }) - headers = { - 'Content-Type': 'application/json' - } - - response = requests.request("POST", url, headers=headers, data=payload) - print(response.text) - return json.loads(response.text) - def ListMeetingTranscripts(): response_string = "" @@ -169,153 +152,3 @@ def ActionablesTranscripts(transcriptFileName,message): return res -def get_role_message_dict(role, content=None, fn_name=None, arguments=None, result=None): - message_dict = {"role":role} - if role == "user": - message_dict["content"] = content - elif role == "assistant": - message_dict["content"] = content - message_dict["function_call"] = {} - message_dict["function_call"]["name"] = fn_name - message_dict["function_call"]["arguments"] = arguments - elif role == "function": - message_dict["name"] = fn_name - message_dict["content"] = f'{{"result": {str(result)} }}' - return message_dict - - -def panoptica_call_functions(full_url): - access_key = "2a797523-3934-4698-9975-af13de9e15ca" - secret_key = "kSN59Kje1AiOfFaTe+itdHiPUnFUIxC1bOs4gJ1kCnk=" - date_format = '%Y%m%dT%H%M%SZ' - date_string = datetime.datetime.utcnow().strftime(date_format) - date = datetime.datetime.strptime(date_string, date_format) - print(f'the full url is {full_url}') - response = requests.get(full_url, - headers={'X-Escher-Date': date_string, - 'host': 'appsecurity.cisco.com', - 'content-type': 'application/json'}, - auth=EscherRequestsAuth("global/services/portshift_request", - {'current_time': date}, - {'api_key': access_key, 'api_secret': secret_key})) - - print("response.status_code = " + str(response.status_code)) - return response - -def parse_docstring(function: Callable) -> Dict: - doc = inspect.getdoc(function) - - function_description = re.search(r'(.*?)Parameters', doc, re.DOTALL).group(1).strip() - parameters_description = re.findall(r'(\w+)\s*:\s*([\w\[\], ]+)\n(.*?)(?=\n\w+\s*:\s*|\nReturns|\nExample$)', doc, re.DOTALL) - - returns_description_match = re.search(r'Returns\n(.*?)(?=\n\w+\s*:\s*|$)', doc, re.DOTALL) - returns_description = returns_description_match.group(1).strip() if returns_description_match else None - - example = re.search(r'Example\n(.*?)(?=\n\w+\s*:\s*|$)', doc, re.DOTALL) - example_description = example.group(1).strip() if example else None - - signature_params = list(inspect.signature(function).parameters.keys()) - properties = {} - required = [] - for name, type, description in parameters_description: - name = name.strip() - type = type.strip() - description = description.strip() - - required.append(name) - properties[name] = { - "type": type, - "description": description, - } - if len(signature_params) != len(required): - print(f'Signature params : {signature_params}, Required params : {required}') - raise ValueError(f"Number of parameters in function signature ({signature_params}) does not match the number of parameters in docstring ({required})") - for param in signature_params: - if param not in required: - raise ValueError(f"Parameter '{param}' in function signature is missing in the docstring") - - parameters = { - "type": "object", - "properties": properties, - "required": required, - } - function_dict = { - "name": function.__name__, - "description": function_description, - "parameters": parameters, - "returns": returns_description, - # "example": example_description, - } - - return function_dict - -def run_with_functions(messages): - url = "http://127.0.0.1:3000//run_function" - - payload = json.dumps({ - "model": "OpenAI", - "messages": messages - }) - headers = { - 'Content-Type': 'application/json' - } - - response = requests.request("POST", url, headers=headers, data=payload) - - print(response.text) - - return json.loads(response.text) - - -def prompt_with_functions(prompt, functions, function_dict): - # setup_database() - #prompt += prompt_append() - prompt = "You are an expert in Panoptica, which is a tool to give security insights in Kubernetes Clusters, API security. You are not aware of anything else. All queries should either use one of the APIs provided or should ask the user to rephrase the qurey: " + prompt - output = [] - fn_names_dict = {} - - if function_dict: - function_dicts = functions - for fn in functions: - fn_names_dict[fn['name']] = fn - else: - for fn in functions: - fn_names_dict[fn.__name__] = fn - function_dicts = [parse_docstring(fun) for fun in functions] - # print(function_dicts) - messages = [get_role_message_dict("user", content=(prompt))] - - response = run_with_functions(messages) - - if response["choices"][0]["finish_reason"] == "stop": - print("Received STOP signal from GPT.") - print() - print() - - elif response["choices"][0]["finish_reason"] == "function_call": - print("Received FUNCTION_CALL signal from GPT.") - fn_name = response["choices"][0]["message"]["function_call"]["name"] - arguments = response["choices"][0]["message"]["function_call"]["arguments"] - #json_arguments = json.loads(arguments) - #function = fn_names_dict[fn_name] - paths = fn_names_dict[fn_name]['path'] - full_url = base_url + paths - print(f"Running the {fn_name} function locally with args {arguments}") - response = panoptica_call_functions(full_url) - #result = function(**json_arguments) - print(f"Finished running {fn_name}. Output is {response._content}") - print() - print() - output.append(response._content.decode('utf-8')) - # output.append(f'You should call the {response.choices[0].message["function_call"].name} function using the following arguments : \n {response.choices[0].message["function_call"].arguments} \n Function raw output : {str(result)}') - messages.append(get_role_message_dict("assistant", fn_name=fn_name, arguments=arguments)) - messages.append(get_role_message_dict("function", fn_name=fn_name)) - response = run_with_functions(messages) - - - return output - -def RunFunction(message,functions): - # print(message,functions) - output = prompt_with_functions(message, functions["tag_dict"]['dashboard-controller'],function_dict=True) - return output \ No newline at end of file diff --git a/external_apps/webex_bot/main.py b/external_apps/webex_bot/main.py index 8387b9b..84e7307 100644 --- a/external_apps/webex_bot/main.py +++ b/external_apps/webex_bot/main.py @@ -39,7 +39,9 @@ from webex_bot.webex_bot import WebexBot from cmds import SummarAcross, EmptySpace, SearchAcross, ListTranscripts, Actionables, Panoptica -from help import LoadTranscripts, InitilizeTranscripts, InitilizeSwaggerFunctions +from help import LoadTranscripts, InitilizeTranscripts +from external_apps.panoptica_utils.panoptica_utils import RunFunction, InitilizeSwaggerFunctions, GetAPIKeys + from constants import CONSTANTS import requests