Skip to content

Commit

Permalink
rabbit setup
Browse files Browse the repository at this point in the history
  • Loading branch information
nleach999 committed May 29, 2024
1 parent 99a4bd4 commit 10bea9d
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 32 deletions.
37 changes: 31 additions & 6 deletions config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
from cxone_service import CxOneService
from password_strength import PasswordPolicy
from cxoneflow_logging import SecretRegistry
from persist_service import WorkflowStateService
from typing import Tuple


def get_config_path():
if "CONFIG_YAML_PATH" in os.environ.keys():
Expand Down Expand Up @@ -58,7 +61,7 @@ class CxOneFlowConfig:

__cxone_service_tuple_index = 1
__scm_service_tuple_index = 2
__rabbit_service_tuple_index = 3
__workflow_service_tuple_index = 3

@staticmethod
def log():
Expand All @@ -69,12 +72,14 @@ def get_service_monikers():
return list(CxOneFlowConfig.__scm_config_tuples_by_service_moniker.keys())

@staticmethod
def retrieve_services_by_moniker(moniker):
def retrieve_services_by_moniker(moniker : str) -> Tuple[CxOneService,SCMService,WorkflowStateService]:
service_tuple = CxOneFlowConfig.__scm_config_tuples_by_service_moniker[moniker]
return service_tuple[CxOneFlowConfig.__cxone_service_tuple_index], service_tuple[CxOneFlowConfig.__scm_service_tuple_index]
return service_tuple[CxOneFlowConfig.__cxone_service_tuple_index], service_tuple[CxOneFlowConfig.__scm_service_tuple_index], \
service_tuple[CxOneFlowConfig.__workflow_service_tuple_index]


@staticmethod
def retrieve_services_by_route(clone_urls):
def retrieve_services_by_route(clone_urls : str) -> Tuple[CxOneService,SCMService,WorkflowStateService]:

if type(clone_urls) is list:
it_list = clone_urls
Expand All @@ -84,7 +89,8 @@ def retrieve_services_by_route(clone_urls):
for url in it_list:
for entry in CxOneFlowConfig.__ordered_scm_config_tuples:
if entry[0].match(url):
return entry[CxOneFlowConfig.__cxone_service_tuple_index], entry[CxOneFlowConfig.__scm_service_tuple_index]
return entry[CxOneFlowConfig.__cxone_service_tuple_index], entry[CxOneFlowConfig.__scm_service_tuple_index], \
entry[CxOneFlowConfig.__workflow_service_tuple_index]

CxOneFlowConfig.log().error(f"No route matched for {clone_urls}")
raise RouteNotFoundException(clone_urls)
Expand Down Expand Up @@ -153,6 +159,21 @@ def __get_value_for_key_or_default(key, config_dict, default):
else:
return config_dict[key]

@staticmethod
def __workflow_service_client_factory(config_path, **kwargs):
if kwargs is None or len(kwargs.keys()) == 0:
return WorkflowStateService("amqp://localhost:5672", None, None, True)
else:
amqp_url = CxOneFlowConfig.__get_value_for_key_or_fail(config_path, "amqp-url", kwargs)
amqp_user = CxOneFlowConfig.__get_secret_from_value_of_key_or_default(kwargs, "amqp-user", None)
amqp_password = CxOneFlowConfig.__get_secret_from_value_of_key_or_default(kwargs, "amqp-password", None)
ssl_verify = CxOneFlowConfig.__get_value_for_key_or_default("ssl-verify", kwargs, True)
if type(ssl_verify) is str:
ssl_verify = ssl_verify.lower() == "true"

return WorkflowStateService(amqp_url, amqp_user, amqp_password, ssl_verify)


@staticmethod
def __cxone_client_factory(config_path, **kwargs):

Expand Down Expand Up @@ -281,6 +302,9 @@ def __setup_scm(cloner_factory, api_auth_factory, config_dict, config_path):

cxone_client = CxOneFlowConfig.__cxone_client_factory(f"{config_path}/cxone",
**(CxOneFlowConfig.__get_value_for_key_or_fail(config_path, 'cxone', config_dict)))

workflow_service_client = CxOneFlowConfig.__workflow_service_client_factory(f"{config_path}/rabbit",
**(CxOneFlowConfig.__get_value_for_key_or_default('rabbit', config_dict, {})))

scan_config_dict = CxOneFlowConfig.__get_value_for_key_or_default('scan-config', config_dict, {} )

Expand Down Expand Up @@ -315,8 +339,9 @@ def __setup_scm(cloner_factory, api_auth_factory, config_dict, config_path):
clone_config_path = f"{config_path}/connection/api-auth"

scm_service = SCMService(service_moniker, api_session, scm_shared_secret, CxOneFlowConfig.__cloner_factory(cloner_factory, clone_auth_dict, clone_config_path))


scm_tuple = (repo_matcher, cxone_service, scm_service)
scm_tuple = (repo_matcher, cxone_service, scm_service, workflow_service_client)

CxOneFlowConfig.__ordered_scm_config_tuples.append(scm_tuple)
CxOneFlowConfig.__scm_config_tuples_by_service_moniker[service_moniker] = scm_tuple
Expand Down
4 changes: 1 addition & 3 deletions docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ do
cat /opt/cxone/nginx/$f | envsubst '$SSL_CERT_PATH $SSL_CERT_KEY_PATH $CXONEFLOW_HOSTNAME' > /etc/nginx/sites-enabled/$f
done


nginx

rabbitmq-server -detached
rabbitmqctl await_startup

nginx

gunicorn --bind=127.0.0.1:5000 --name=CxOneFlow wsgi:app
2 changes: 1 addition & 1 deletion orchestration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ async def execute(orchestrator):
return 204

OrchestrationDispatch.log().debug(f"Service lookup: {orchestrator.route_urls}")
cxone_service, scm_service = CxOneFlowConfig.retrieve_services_by_route(orchestrator.route_urls)
cxone_service, scm_service, rabbit_service = CxOneFlowConfig.retrieve_services_by_route(orchestrator.route_urls)
OrchestrationDispatch.log().debug(f"Service lookup success: {orchestrator.route_urls}")

if await orchestrator.is_signature_valid(scm_service.shared_secret):
Expand Down
63 changes: 63 additions & 0 deletions persist_service/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio, aio_pika, logging
from ssl import create_default_context, CERT_NONE
from enum import Enum
import urllib.parse
from cxoneflow_logging import SecretRegistry

class WorkflowStateService:

class ScanWorkflow(Enum):
PR = "pr"
PUSH = "push"

class FeedbackWorkflow(Enum):
PR = "pr"

@staticmethod
def log():
return logging.getLogger("WorkflowStateService")

def __init__(self, amqp_url , amqp_user, amqp_password, ssl_verify):
self.__lock = asyncio.Lock()
self.__amqp_url = amqp_url
self.__amqp_user = amqp_user
self.__amqp_password = amqp_password
self.__ssl_verify = ssl_verify
self.__client = None

netloc = urllib.parse.urlparse(self.__amqp_url).netloc

if '@' in netloc:
SecretRegistry.register(netloc.split("@")[0])



@property
def use_ssl(self):
return urllib.parse.urlparse(self.__amqp_url).scheme == "amqps"


async def mq_client(self):
async with self.__lock:

if self.__client is None:
WorkflowStateService.log().debug(f"Creating AMQP connection to: {self.__amqp_url}")
ctx = None

if self.use_ssl and not self.__ssl_verify:
ctx = create_default_context()
ctx.check_hostname = False
ctx.verify_mode = CERT_NONE

self.__client = await aio_pika.connect_robust(self.__amqp_url, \
login=self.__amqp_user if self.__amqp_user is not None else "guest", \
password=self.__amqp_password if self.__amqp_password is not None else "guest", \
ssl_context=ctx)
return self.__client


async def await_scan(self, workflow : ScanWorkflow, service_moniker : str, scanid : str) -> None:
pass

async def exec_scan_feedback(self, workflow : FeedbackWorkflow, service_moniker : str, scanid : str) -> None:
pass
48 changes: 26 additions & 22 deletions rabbit_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,32 @@

async def setup() -> None:
monikers = CxOneFlowConfig.get_service_monikers()

rmq = await aio_pika.connect_robust("amqp://localhost")

async with rmq.channel() as channel:
scan_in_exchange = await channel.declare_exchange("Scan In", aio_pika.ExchangeType.FANOUT, durable=True)
scan_await_exchange = await channel.declare_exchange("Scan Await", aio_pika.ExchangeType.TOPIC, durable=True, internal=True)
scan_feedback_exchange = await channel.declare_exchange("Scan Feedback", aio_pika.ExchangeType.TOPIC, durable=True, internal=True)
polling_delivery_exchange = await channel.declare_exchange("Scan Polling Delivery", aio_pika.ExchangeType.DIRECT, durable=True, internal=True)

polling_scans_queue = await channel.declare_queue("Polling Scans", durable=True)
awaited_scans_queue = await channel.declare_queue("Awaited Scans", durable=True, \
arguments = {
'x-dead-letter-exchange' : 'Scan Polling Delivery',
'x-dead-letter-routing-key' : 'poll'})

pr_feedback_queue = await channel.declare_queue("PR Feedback", durable=True)

await polling_scans_queue.bind(polling_delivery_exchange, "poll")
await awaited_scans_queue.bind(scan_await_exchange, "await.*.*")
await pr_feedback_queue.bind(scan_feedback_exchange, "feedback.pr.*")
await scan_await_exchange.bind(scan_in_exchange)
await scan_feedback_exchange.bind(scan_in_exchange)

for moniker in monikers:
__log.info(f"Configuring RabbitMQ for {moniker}")
_,_,workflow_service = CxOneFlowConfig.retrieve_services_by_moniker(moniker)

rmq = await workflow_service.mq_client()

async with rmq.channel() as channel:
scan_in_exchange = await channel.declare_exchange("Scan In", aio_pika.ExchangeType.FANOUT, durable=True)
scan_await_exchange = await channel.declare_exchange("Scan Await", aio_pika.ExchangeType.TOPIC, durable=True, internal=True)
scan_feedback_exchange = await channel.declare_exchange("Scan Feedback", aio_pika.ExchangeType.TOPIC, durable=True, internal=True)
polling_delivery_exchange = await channel.declare_exchange("Scan Polling Delivery", aio_pika.ExchangeType.DIRECT, durable=True, internal=True)

polling_scans_queue = await channel.declare_queue("Polling Scans", durable=True)
awaited_scans_queue = await channel.declare_queue("Awaited Scans", durable=True, \
arguments = {
'x-dead-letter-exchange' : 'Scan Polling Delivery',
'x-dead-letter-routing-key' : 'poll'})

pr_feedback_queue = await channel.declare_queue("PR Feedback", durable=True)

await polling_scans_queue.bind(polling_delivery_exchange, "poll")
await awaited_scans_queue.bind(scan_await_exchange, "await.*.*")
await pr_feedback_queue.bind(scan_feedback_exchange, "feedback.pr.*")
await scan_await_exchange.bind(scan_in_exchange)
await scan_feedback_exchange.bind(scan_in_exchange)


if __name__ == "__main__":
Expand Down

0 comments on commit 10bea9d

Please sign in to comment.