diff --git a/src/spaceone/notification/service/__init__.py b/src/spaceone/notification/service/__init__.py index 42c2a4f..f0ab18f 100644 --- a/src/spaceone/notification/service/__init__.py +++ b/src/spaceone/notification/service/__init__.py @@ -2,5 +2,3 @@ from spaceone.notification.service.protocol_service import ProtocolService from spaceone.notification.service.project_channel_service import ProjectChannelService from spaceone.notification.service.user_channel_service import UserChannelService -from spaceone.notification.service.quota_service import QuotaService -from spaceone.notification.service.notification_usage_service import NotificationUsageService diff --git a/src/spaceone/notification/service/notification_service.py b/src/spaceone/notification/service/notification_service.py index 1379ff5..182008c 100644 --- a/src/spaceone/notification/service/notification_service.py +++ b/src/spaceone/notification/service/notification_service.py @@ -24,15 +24,20 @@ @mutation_handler @event_handler class NotificationService(BaseService): - + resource = "Notification" + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.notification_mgr: NotificationManager = self.locator.get_manager('NotificationManager') - - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['resource_type', 'resource_id', 'topic', 'message', 'domain_id']) + self.notification_mgr: NotificationManager = self.locator.get_manager( + "NotificationManager" + ) + + @transaction( + permission="notification:Notification.write", role_types=["DOMAIN_ADMIN"] + ) + @check_required(["resource_type", "resource_id", "topic", "message", "domain_id"]) def create(self, params): - """ Create Notification + """Create Notification Args: params (dict): { @@ -42,242 +47,336 @@ def create(self, params): 'message': 'dict', 'notification_type': 'str' -> INFO(default) | ERROR | SUCCESS | WARNING, 'notification_level': 'str' -> ALL(default) | LV1 | LV2 | LV3 | LV4 | LV5, - 'domain_id': 'str' + 'domain_id': 'str' # injected from auth } Returns: notification_vo (object) """ - identity_mgr: IdentityManager = self.locator.get_manager('IdentityManager') + identity_mgr: IdentityManager = self.locator.get_manager("IdentityManager") - domain_id = params['domain_id'] - resource_type = params['resource_type'] - resource_id = params['resource_id'] - message = params['message'] + domain_id = params["domain_id"] + resource_type = params["resource_type"] + resource_id = params["resource_id"] + message = params["message"] domain_info = identity_mgr.get_domain_info(domain_id) - message['domain_name'] = self.get_domain_name(domain_info) + message["domain_name"] = self.get_domain_name(domain_info) identity_mgr.get_resource(resource_id, resource_type, domain_id) - if resource_type == 'identity.Domain': + if resource_type == "identity.Domain": self.dispatch_domain(params) - elif resource_type == 'identity.Project': + elif resource_type == "identity.Project": self.dispatch_project_channel(params) - elif resource_type == 'identity.User': + elif resource_type == "identity.User": self.dispatch_user_channel(params) def dispatch_domain(self, params): _LOGGER.debug(f'[Dispatch Domain] Domain ID: {params["resource_id"]}') - identity_mgr: IdentityManager = self.locator.get_manager('IdentityManager') - users = identity_mgr.get_all_users_in_domain(params['resource_id']) + identity_mgr: IdentityManager = self.locator.get_manager("IdentityManager") + users = identity_mgr.get_all_users_in_domain(params["resource_id"]) for user in users: user_channel_params = { - 'notification_type': params.get('notification_type', 'INFO'), - 'notification_level': params.get('notification_level', 'ALL'), - 'topic': params['topic'], - 'message': params['message'], - 'resource_type': 'identity.User', - 'resource_id': user['user_id'], - 'domain_id': params['resource_id'] + "notification_type": params.get("notification_type", "INFO"), + "notification_level": params.get("notification_level", "ALL"), + "topic": params["topic"], + "message": params["message"], + "resource_type": "identity.User", + "resource_id": user["user_id"], + "domain_id": params["resource_id"], } self.dispatch_user_channel(user_channel_params) def dispatch_project_channel(self, params): _LOGGER.debug(f'[Dispatch Project Channel] Project ID: {params["resource_id"]}') - protocol_mgr: ProtocolManager = self.locator.get_manager('ProtocolManager') - project_ch_mgr: ProjectChannelManager = self.locator.get_manager('ProjectChannelManager') + protocol_mgr: ProtocolManager = self.locator.get_manager("ProtocolManager") + project_ch_mgr: ProjectChannelManager = self.locator.get_manager( + "ProjectChannelManager" + ) - domain_id = params['domain_id'] - topic = params['topic'] - resource_id = params['resource_id'] + domain_id = params["domain_id"] + topic = params["topic"] + resource_id = params["resource_id"] - notification_type = params.get('notification_type', 'INFO') - notification_level = params.get('notification_level', 'ALL') - message = params['message'] + notification_type = params.get("notification_type", "INFO") + notification_level = params.get("notification_level", "ALL") + message = params["message"] prj_ch_vos, prj_ch_total_count = project_ch_mgr.list_project_channels( - {'filter': [{'k': 'project_id', 'v': resource_id, 'o': 'eq'}, - {'k': 'domain_id', 'v': domain_id, 'o': 'eq'}]}) + { + "filter": [ + {"k": "project_id", "v": resource_id, "o": "eq"}, + {"k": "domain_id", "v": domain_id, "o": "eq"}, + ] + } + ) for prj_ch_vo in prj_ch_vos: - if prj_ch_vo.state == 'ENABLED': - protocol_vo = protocol_mgr.get_protocol(prj_ch_vo.protocol_id, domain_id) - - dispatch_subscribe = self.check_subscribe_for_dispatch(prj_ch_vo.is_subscribe, prj_ch_vo.subscriptions, - topic) - dispatch_schedule = self.check_schedule_for_dispatch(prj_ch_vo.is_scheduled, prj_ch_vo.schedule) - dispatch_notification_level = self.check_notification_level_for_dispatch(notification_level, - prj_ch_vo.notification_level) - - _LOGGER.debug(f'[Notification] subscribe: {dispatch_subscribe} | schedule: {dispatch_schedule} ' - f'| notification_level: {dispatch_notification_level}') - - if dispatch_subscribe and dispatch_schedule and dispatch_notification_level: - _LOGGER.info(f'[Notification] Dispatch Notification to project: {resource_id}') - - if protocol_vo.protocol_type == 'INTERNAL': + if prj_ch_vo.state == "ENABLED": + protocol_vo = protocol_mgr.get_protocol( + prj_ch_vo.protocol_id, domain_id + ) + + dispatch_subscribe = self.check_subscribe_for_dispatch( + prj_ch_vo.is_subscribe, prj_ch_vo.subscriptions, topic + ) + dispatch_schedule = self.check_schedule_for_dispatch( + prj_ch_vo.is_scheduled, prj_ch_vo.schedule + ) + dispatch_notification_level = ( + self.check_notification_level_for_dispatch( + notification_level, prj_ch_vo.notification_level + ) + ) + + _LOGGER.debug( + f"[Notification] subscribe: {dispatch_subscribe} | schedule: {dispatch_schedule} " + f"| notification_level: {dispatch_notification_level}" + ) + + if ( + dispatch_subscribe + and dispatch_schedule + and dispatch_notification_level + ): + _LOGGER.info( + f"[Notification] Dispatch Notification to project: {resource_id}" + ) + + if protocol_vo.protocol_type == "INTERNAL": internal_project_channel_data = prj_ch_vo.data - for user_id in internal_project_channel_data.get('users', []): - params.update({ - 'resource_type': 'identity.User', - 'resource_id': user_id - }) - _LOGGER.debug(f'[Forward to User Channel] User ID: {user_id}') + for user_id in internal_project_channel_data.get("users", []): + params.update( + { + "resource_type": "identity.User", + "resource_id": user_id, + } + ) + _LOGGER.debug( + f"[Forward to User Channel] User ID: {user_id}" + ) self.dispatch_user_channel(params) - elif protocol_vo.protocol_type == 'EXTERNAL': - _LOGGER.info(f'[Notification] Dispatch Notification to project: {resource_id}') - channel_data = self.get_channel_data(prj_ch_vo, protocol_vo, domain_id) + elif protocol_vo.protocol_type == "EXTERNAL": + _LOGGER.info( + f"[Notification] Dispatch Notification to project: {resource_id}" + ) + channel_data = self.get_channel_data( + prj_ch_vo, protocol_vo, domain_id + ) secret_data = self.get_secret_data(protocol_vo, domain_id) - self.push_queue(protocol_vo.protocol_id, channel_data, secret_data, notification_type, message, domain_id) + self.push_queue( + protocol_vo.protocol_id, + channel_data, + secret_data, + notification_type, + message, + domain_id, + ) else: - _LOGGER.info(f'[Notification] Skip Notification to project: {resource_id}') + _LOGGER.info( + f"[Notification] Skip Notification to project: {resource_id}" + ) else: - _LOGGER.info(f'[Notification] Project Channel is disabled: {prj_ch_vo.project_channel_id}') + _LOGGER.info( + f"[Notification] Project Channel is disabled: {prj_ch_vo.project_channel_id}" + ) def dispatch_user_channel(self, params): _LOGGER.debug(f'[Dispatch User Channel] User ID: {params["resource_id"]}') - protocol_mgr: ProtocolManager = self.locator.get_manager('ProtocolManager') - user_ch_mgr: UserChannelManager = self.locator.get_manager('UserChannelManager') + protocol_mgr: ProtocolManager = self.locator.get_manager("ProtocolManager") + user_ch_mgr: UserChannelManager = self.locator.get_manager("UserChannelManager") - domain_id = params['domain_id'] - topic = params['topic'] - resource_id = params['resource_id'] + domain_id = params["domain_id"] + topic = params["topic"] + resource_id = params["resource_id"] - notification_type = params.get('notification_type', 'INFO') - message = params['message'] + notification_type = params.get("notification_type", "INFO") + message = params["message"] user_ch_vos, user_ch_total_count = user_ch_mgr.list_user_channels( - {'filter': [{'k': 'user_id', 'v': resource_id, 'o': 'eq'}, - {'k': 'domain_id', 'v': domain_id, 'o': 'eq'}]}) + { + "filter": [ + {"k": "user_id", "v": resource_id, "o": "eq"}, + {"k": "domain_id", "v": domain_id, "o": "eq"}, + ] + } + ) for user_ch_vo in user_ch_vos: - if user_ch_vo.state == 'ENABLED': - protocol_vo = protocol_mgr.get_protocol(user_ch_vo.protocol_id, domain_id) - - dispatch_subscribe = self.check_subscribe_for_dispatch(user_ch_vo.is_subscribe, - user_ch_vo.subscriptions, topic) - dispatch_schedule = self.check_schedule_for_dispatch(user_ch_vo.is_scheduled, user_ch_vo.schedule) - - _LOGGER.debug(f'[Notification] subscribe: {dispatch_subscribe} | schedule: {dispatch_schedule}') + if user_ch_vo.state == "ENABLED": + protocol_vo = protocol_mgr.get_protocol( + user_ch_vo.protocol_id, domain_id + ) + + dispatch_subscribe = self.check_subscribe_for_dispatch( + user_ch_vo.is_subscribe, user_ch_vo.subscriptions, topic + ) + dispatch_schedule = self.check_schedule_for_dispatch( + user_ch_vo.is_scheduled, user_ch_vo.schedule + ) + + _LOGGER.debug( + f"[Notification] subscribe: {dispatch_subscribe} | schedule: {dispatch_schedule}" + ) if dispatch_subscribe and dispatch_schedule: - _LOGGER.info(f'[Notification] Dispatch Notification to user: {resource_id}') - channel_data = self.get_channel_data(user_ch_vo, protocol_vo, domain_id) + _LOGGER.info( + f"[Notification] Dispatch Notification to user: {resource_id}" + ) + channel_data = self.get_channel_data( + user_ch_vo, protocol_vo, domain_id + ) secret_data = self.get_secret_data(protocol_vo, domain_id) - self.push_queue(protocol_vo.protocol_id, channel_data, secret_data, notification_type, message, domain_id) + self.push_queue( + protocol_vo.protocol_id, + channel_data, + secret_data, + notification_type, + message, + domain_id, + ) else: - _LOGGER.info(f'[Notification] Skip Notification to user: {resource_id}') + _LOGGER.info( + f"[Notification] Skip Notification to user: {resource_id}" + ) else: - _LOGGER.info(f'[Notification] User Channel is disabled: {user_ch_vo.user_channel_id}') + _LOGGER.info( + f"[Notification] User Channel is disabled: {user_ch_vo.user_channel_id}" + ) - params.update({'user_id': resource_id}) + params.update({"user_id": resource_id}) self.notification_mgr.create_notification(params) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['protocol_id', 'data', 'message', 'domain_id']) + @transaction( + permission="notification:Notification.write", role_types=["DOMAIN_ADMIN"] + ) + @check_required(["protocol_id", "data", "message", "domain_id"]) def push(self, params): - """ Push notification + """Push notification Args: params (dict): { - 'protocol_id': 'str', - 'data': 'str', - 'message': 'dict', + 'protocol_id': 'str', # required + 'data': 'str', # required + 'message': 'dict', # required 'notification_type', 'str', - 'domain_id': 'str' + 'notification_level', 'str', + 'domain_id': 'str' # injected from auth } Returns: None """ - domain_id = params['domain_id'] - protocol_id = params['protocol_id'] - notification_type = params.get('notification_type', 'INFO') - data = params['data'] - message = params.get('message', {}) + domain_id = params["domain_id"] + protocol_id = params["protocol_id"] + notification_type = params.get("notification_type", "INFO") + data = params["data"] + message = params.get("message", {}) - protocol_mgr: ProtocolManager = self.locator.get_manager('ProtocolManager') + protocol_mgr: ProtocolManager = self.locator.get_manager("ProtocolManager") protocol_vo = protocol_mgr.get_protocol(protocol_id, domain_id) secret_data = self.get_secret_data(protocol_vo, domain_id) - self.push_queue(protocol_vo.protocol_id, data, secret_data, notification_type, message, domain_id) - - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['notifications', 'domain_id']) + self.push_queue( + protocol_vo.protocol_id, + data, + secret_data, + notification_type, + message, + domain_id, + ) + + @transaction(permission="notification:Notification.write", role_types=["USER"]) + @check_required(["notifications", "domain_id"]) def delete(self, params): - """ Delete notifications + """Delete notifications Args: params (dict): { - 'notifications': 'list', - 'domain_id': 'str' + 'notifications': 'list', # required + 'domain_id': 'str' # injected from auth } Returns: None """ filter_params = { - 'notification_id': params['notifications'], - 'domain_id': params['domain_id'] + "notification_id": params["notifications"], + "domain_id": params["domain_id"], } notification_vos = self.notification_mgr.filter_notifications(**filter_params) self.notification_mgr.delete_notification_by_vos(notification_vos) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['notifications', 'domain_id']) + @transaction(permission="notification:Notification.write", role_types=["USER"]) + @check_required(["notifications", "domain_id"]) def set_read(self, params): - """ Change the notifications to read status. + """Change the notifications to read status. Args: params (dict): { - 'notifications': 'list', - 'domain_id': 'str' + 'notifications': 'list', # required + 'domain_id': 'str' # injected from auth } Returns: None """ - self.notification_mgr.set_read_notification(params['notifications'], params['domain_id']) + self.notification_mgr.set_read_notification( + params["notifications"], params["domain_id"] + ) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['notification_id', 'domain_id']) + @transaction(permission="notification:Notification.read", role_types=["USER"]) + @check_required(["notification_id", "domain_id"]) def get(self, params): - """ Get Notification + """Get Notification Args: params (dict): { - 'domain_id': 'str', - 'only': 'list', + 'notification_id': 'str', # required 'set_read': 'bool' + 'domain_id': 'str', # injected from auth } Returns: notification_vo (object) """ - return self.notification_mgr.get_notification(params['notification_id'], params['domain_id'], - params.get('only')) - - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['domain_id']) - @append_query_filter(['notification_id', 'topic', 'notification_type', 'notification_level', 'is_read', - 'project_id', 'user_id', 'domain_id', 'user_self']) - @append_keyword_filter(['notification_id', 'topic']) + return self.notification_mgr.get_notification( + params["notification_id"], params["domain_id"], params.get("only") + ) + + @transaction(permission="notification:Notification.read", role_types=["USER"]) + @check_required(["domain_id"]) + @append_query_filter( + [ + "notification_id", + "topic", + "notification_type", + "notification_level", + "is_read", + "project_id", + "parent_notification_id", + "user_id", + "domain_id", + ] + ) + @append_keyword_filter(["notification_id", "topic"]) def list(self, params): - """ List User Channels + """List User Channels Args: params (dict): { + 'query': 'dict (spaceone.api.core.v1.Query)' 'notification_id': 'str', 'topic': 'str', 'notification_type': 'str', @@ -285,8 +384,8 @@ def list(self, params): 'is_read': 'bool', 'project_id': 'str', 'user_id': 'str', + 'parent_notification_id': 'str', 'domain_id': 'str', - 'query': 'dict (spaceone.api.core.v1.Query)' } Returns: @@ -294,13 +393,13 @@ def list(self, params): total_count (int) """ - query = params.get('query', {}) + query = params.get("query", {}) return self.notification_mgr.list_notifications(query) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['query', 'domain_id']) - @append_query_filter(['domain_id', 'user_self']) - @append_keyword_filter(['notification_id', 'topic']) + @transaction(permission="notification:Notification.read", role_types=["USER"]) + @check_required(["query", "domain_id"]) + @append_query_filter(["domain_id"]) + @append_keyword_filter(["notification_id", "topic"]) def stat(self, params): """ Args: @@ -314,36 +413,46 @@ def stat(self, params): total_count (int) """ - query = params.get('query', {}) + query = params.get("query", {}) return self.notification_mgr.stat_notifications(query) - def push_queue(self, protocol_id, channel_data, secret_data, notification_type, message, domain_id): + def push_queue( + self, + protocol_id, + channel_data, + secret_data, + notification_type, + message, + domain_id, + ): task = { - 'name': 'dispatch_notification', - 'version': 'v1', - 'executionEngine': 'BaseWorker', - 'stages': [{ - 'locator': 'SERVICE', - 'name': 'NotificationService', - 'metadata': self.transaction.meta, - 'method': 'dispatch_notification', - 'params': { - 'protocol_id': protocol_id, - 'channel_data': channel_data, - 'secret_data': secret_data, - 'notification_type': notification_type, - 'message': message, - 'domain_id': domain_id, + "name": "dispatch_notification", + "version": "v1", + "executionEngine": "BaseWorker", + "stages": [ + { + "locator": "SERVICE", + "name": "NotificationService", + "metadata": self.transaction.meta, + "method": "dispatch_notification", + "params": { + "protocol_id": protocol_id, + "channel_data": channel_data, + "secret_data": secret_data, + "notification_type": notification_type, + "message": message, + "domain_id": domain_id, + }, } - }] + ], } - _LOGGER.debug(f'[push_queue] task: {task}') - queue.put('notification_q', utils.dump_json(task)) + _LOGGER.debug(f"[push_queue] task: {task}") + queue.put("notification_q", utils.dump_json(task)) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) + @transaction(append_meta={"authorization.scope": "DOMAIN"}) def delete_old_notifications(self, params): - """ Delete old notifications + """Delete old notifications Args: params (dict): {} @@ -356,108 +465,157 @@ def delete_old_notifications(self, params): condition_date = now - datetime.timedelta(days=OLD_NOTIFICATION_DAYS) condition_date_iso = utils.datetime_to_iso8601(condition_date) - query = {'filter': [{'k': 'created_at', 'v': condition_date_iso, 'o': 'datetime_lte'}]} + query = { + "filter": [ + {"k": "created_at", "v": condition_date_iso, "o": "datetime_lte"} + ] + } _LOGGER.debug(f"[delete_old_notifications] Query: {query}") notification_vos, total_count = self.notification_mgr.list_notifications(query) - _LOGGER.debug(f"[delete_old_notifications] Old Notification Count: {total_count}") + _LOGGER.debug( + f"[delete_old_notifications] Old Notification Count: {total_count}" + ) notification_vos.delete() def get_channel_data(self, channel_vo, protocol_vo, domain_id): - secret_mgr: SecretManager = self.locator.get_manager('SecretManager') + secret_mgr: SecretManager = self.locator.get_manager("SecretManager") channel_data = None plugin_info = protocol_vo.plugin_info.to_dict() - plugin_metadata = plugin_info.get('metadata', {}) + plugin_metadata = plugin_info.get("metadata", {}) - if plugin_metadata.get('data_type') == 'PLAIN_TEXT': + if plugin_metadata.get("data_type") == "PLAIN_TEXT": channel_data = channel_vo.data - elif plugin_metadata.get('data_type') == 'SECRET': + elif plugin_metadata.get("data_type") == "SECRET": channel_data = secret_mgr.get_secret_data(channel_vo.secret_id, domain_id) return channel_data def get_secret_data(self, protocol_vo, domain_id): - secret_mgr: SecretManager = self.locator.get_manager('SecretManager') + secret_mgr: SecretManager = self.locator.get_manager("SecretManager") secret_data = {} plugin_info = protocol_vo.plugin_info.to_dict() - if secret_id := plugin_info.get('secret_id'): + if secret_id := plugin_info.get("secret_id"): secret_data = secret_mgr.get_secret_data(secret_id, domain_id) return secret_data - def dispatch_notification(self, protocol_id, channel_data, secret_data, notification_type, message, domain_id): - protocol_mgr: ProtocolManager = self.locator.get_manager('ProtocolManager') - plugin_mgr: PluginManager = self.locator.get_manager('PluginManager') + def dispatch_notification( + self, + protocol_id, + channel_data, + secret_data, + notification_type, + message, + domain_id, + ): + protocol_mgr: ProtocolManager = self.locator.get_manager("ProtocolManager") + plugin_mgr: PluginManager = self.locator.get_manager("PluginManager") protocol_vo = protocol_mgr.get_protocol(protocol_id, domain_id) - if protocol_vo.state == 'ENABLED': + if protocol_vo.state == "ENABLED": plugin_info = protocol_vo.plugin_info.to_dict() - options = plugin_info.get('options', {}) + options = plugin_info.get("options", {}) - _LOGGER.debug(f'[Plugin Initialize] plugin_id: {plugin_info["plugin_id"]} | version: {plugin_info["version"]} ' - f'| domain_id: {domain_id}') + _LOGGER.debug( + f'[Plugin Initialize] plugin_id: {plugin_info["plugin_id"]} | version: {plugin_info["version"]} ' + f"| domain_id: {domain_id}" + ) try: endpoint_info = plugin_mgr.initialize(plugin_info, domain_id) plugin_metadata = plugin_mgr.init_plugin(options) - plugin_info['metadata'] = plugin_metadata + plugin_info["metadata"] = plugin_metadata - if version := endpoint_info.get('updated_version'): - plugin_info['version'] = version + if version := endpoint_info.get("updated_version"): + plugin_info["version"] = version - protocol_mgr = self.locator.get_manager('ProtocolManager') - protocol_mgr.update_protocol_by_vo({'plugin_info': plugin_info}, protocol_vo) + protocol_mgr = self.locator.get_manager("ProtocolManager") + protocol_mgr.update_protocol_by_vo( + {"plugin_info": plugin_info}, protocol_vo + ) except Exception as e: - _LOGGER.error(f'[Notification] Plugin Error: {e}') - - self._dispatch_notification(protocol_vo, secret_data, channel_data, notification_type, - message, options, plugin_mgr) + _LOGGER.error(f"[Notification] Plugin Error: {e}") + + self._dispatch_notification( + protocol_vo, + secret_data, + channel_data, + notification_type, + message, + options, + plugin_mgr, + ) else: - _LOGGER.info('[Notification] Protocol is disabled. skip notification') - - def _dispatch_notification(self, protocol_vo, secret_data, channel_data, notification_type, message, options, plugin_mgr): + _LOGGER.info("[Notification] Protocol is disabled. skip notification") + + def _dispatch_notification( + self, + protocol_vo, + secret_data, + channel_data, + notification_type, + message, + options, + plugin_mgr, + ): month, date = self.get_month_date() - noti_usage_vo, usage_month, usage_date = self.get_notification_usage(protocol_vo, month, date) + noti_usage_vo, usage_month, usage_date = self.get_notification_usage( + protocol_vo, month, date + ) try: self.check_quota(protocol_vo, usage_month, usage_date) - plugin_mgr.dispatch_notification(secret_data, channel_data, notification_type, message, options) + plugin_mgr.dispatch_notification( + secret_data, channel_data, notification_type, message, options + ) self.increment_usage(noti_usage_vo) except Exception as e: self.increment_fail_count(noti_usage_vo) def check_quota(self, protocol_vo, usage_month, usage_date, count=1): - quota_mgr: QuotaManager = self.locator.get_manager('QuotaManager') + quota_mgr: QuotaManager = self.locator.get_manager("QuotaManager") - query = {'filter': [{'k': 'protocol', 'v': protocol_vo, 'o': 'eq'}]} + query = {"filter": [{"k": "protocol", "v": protocol_vo, "o": "eq"}]} quota_results, quota_total_count = quota_mgr.list_quotas(query) if quota_total_count: limit = quota_results[0].limit - self._check_quota_limit(protocol_vo.protocol_id, limit, usage_month, usage_date, count) + self._check_quota_limit( + protocol_vo.protocol_id, limit, usage_month, usage_date, count + ) else: # Check Default Quota plugin_id = protocol_vo.plugin_info.plugin_id if plugin_id in DEFAULT_QUOTA: limit = DEFAULT_QUOTA[plugin_id] - self._check_quota_limit(protocol_vo.protocol_id, limit, usage_month, usage_date, count) + self._check_quota_limit( + protocol_vo.protocol_id, limit, usage_month, usage_date, count + ) def increment_usage(self, noti_usage_vo, count=1): - noti_usage_mgr: NotificationUsageManager = self.locator.get_manager('NotificationUsageManager') - _LOGGER.debug(f"[increment_usage] Incremental Usage Count - Protocol {noti_usage_vo.protocol_id} (count: {count})") + noti_usage_mgr: NotificationUsageManager = self.locator.get_manager( + "NotificationUsageManager" + ) + _LOGGER.debug( + f"[increment_usage] Incremental Usage Count - Protocol {noti_usage_vo.protocol_id} (count: {count})" + ) noti_usage_mgr.incremental_notification_usage(noti_usage_vo, count) def increment_fail_count(self, noti_usage_vo, count=1): - noti_usage_mgr: NotificationUsageManager = self.locator.get_manager('NotificationUsageManager') + noti_usage_mgr: NotificationUsageManager = self.locator.get_manager( + "NotificationUsageManager" + ) _LOGGER.debug( - f"[increment_fail_count] Incremental Fail Count - Protocol {noti_usage_vo.protocol_id} (count: {count})") + f"[increment_fail_count] Incremental Fail Count - Protocol {noti_usage_vo.protocol_id} (count: {count})" + ) noti_usage_mgr.incremental_notification_fail_count(noti_usage_vo, count) def get_notification_usage(self, protocol_vo, month, date): @@ -465,15 +623,20 @@ def get_notification_usage(self, protocol_vo, month, date): usage_date = 0 noti_usage_vo = None - noti_usage_mgr: NotificationUsageManager = self.locator.get_manager('NotificationUsageManager') + noti_usage_mgr: NotificationUsageManager = self.locator.get_manager( + "NotificationUsageManager" + ) month_query = { - 'filter': [ - {'k': 'protocol_id', 'v': protocol_vo.protocol_id, 'o': 'eq'}, - {'k': 'usage_month', 'v': month, 'o': 'eq'} + "filter": [ + {"k": "protocol_id", "v": protocol_vo.protocol_id, "o": "eq"}, + {"k": "usage_month", "v": month, "o": "eq"}, ] } - month_usage_results, month_usage_total_count = noti_usage_mgr.list_notification_usages(month_query) + ( + month_usage_results, + month_usage_total_count, + ) = noti_usage_mgr.list_notification_usages(month_query) for _noti_usage in month_usage_results: usage_month += _noti_usage.count @@ -483,10 +646,10 @@ def get_notification_usage(self, protocol_vo, month, date): if not noti_usage_vo: params = { - 'protocol_id': protocol_vo.protocol_id, - 'usage_month': month, - 'usage_date': date, - 'domain_id': protocol_vo.domain_id + "protocol_id": protocol_vo.protocol_id, + "usage_month": month, + "usage_date": date, + "domain_id": protocol_vo.domain_id, } noti_usage_vo = noti_usage_mgr.create_notification_usage(params) @@ -498,9 +661,11 @@ def check_schedule_for_dispatch(is_scheduled, schedule): now_time = datetime.datetime.utcnow() valid_weekday = check_weekday_schedule(now_time, schedule.day_of_week) - valid_time = check_time_schedule(now_time, schedule.start_hour, schedule.end_hour) + valid_time = check_time_schedule( + now_time, schedule.start_hour, schedule.end_hour + ) - _LOGGER.debug(f'Weekday: {valid_weekday} | Time {valid_time}') + _LOGGER.debug(f"Weekday: {valid_weekday} | Time {valid_time}") if valid_weekday and valid_time: return True @@ -519,8 +684,10 @@ def check_subscribe_for_dispatch(is_subscribe, subscriptions, topic): return False @staticmethod - def check_notification_level_for_dispatch(notification_level, prj_channel_notification_level): - if notification_level == 'ALL': + def check_notification_level_for_dispatch( + notification_level, prj_channel_notification_level + ): + if notification_level == "ALL": return True elif prj_channel_notification_level == notification_level: return True @@ -530,24 +697,28 @@ def check_notification_level_for_dispatch(notification_level, prj_channel_notifi @staticmethod def get_month_date(): now = datetime.datetime.now() - return now.strftime('%Y-%m'), now.strftime('%d') + return now.strftime("%Y-%m"), now.strftime("%d") @staticmethod def _check_quota_limit(protocol_id, limit, usage_month, usage_date, count): - if 'month' in limit: - if limit['month'] != -1 and limit['month'] < (usage_month + count): - raise ERROR_QUOTA_IS_EXCEEDED(protocol_id=protocol_id, - limit=f'limit.month=({usage_month}/{limit["month"]})') - - if 'day' in limit: - if limit['day'] != -1 and limit['day'] < (usage_date + count): - raise ERROR_QUOTA_IS_EXCEEDED(protocol_id=protocol_id, - limit=f'limit.day=({usage_date}/{limit["day"]})') + if "month" in limit: + if limit["month"] != -1 and limit["month"] < (usage_month + count): + raise ERROR_QUOTA_IS_EXCEEDED( + protocol_id=protocol_id, + limit=f'limit.month=({usage_month}/{limit["month"]})', + ) + + if "day" in limit: + if limit["day"] != -1 and limit["day"] < (usage_date + count): + raise ERROR_QUOTA_IS_EXCEEDED( + protocol_id=protocol_id, + limit=f'limit.day=({usage_date}/{limit["day"]})', + ) @staticmethod def get_domain_name(domain_info): - _tags = domain_info.get('tags', {}) - if description := _tags.get('description'): + _tags = domain_info.get("tags", {}) + if description := _tags.get("description"): return description else: - return domain_info['name'] + return domain_info["name"] diff --git a/src/spaceone/notification/service/project_channel_service.py b/src/spaceone/notification/service/project_channel_service.py index d65fb44..34c798b 100644 --- a/src/spaceone/notification/service/project_channel_service.py +++ b/src/spaceone/notification/service/project_channel_service.py @@ -17,19 +17,24 @@ @mutation_handler @event_handler class ProjectChannelService(BaseService): + resource = "ProjectChannel" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.project_channel_mgr: ProjectChannelManager = self.locator.get_manager('ProjectChannelManager') - self.identity_mgr: IdentityManager = self.locator.get_manager('IdentityManager') - self.protocol_mgr: ProtocolManager = self.locator.get_manager('ProtocolManager') - self.secret_mgr: SecretManager = self.locator.get_manager('SecretManager') - - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['protocol_id', 'name', 'data', 'project_id', 'domain_id']) + self.project_channel_mgr: ProjectChannelManager = self.locator.get_manager( + "ProjectChannelManager" + ) + self.identity_mgr: IdentityManager = self.locator.get_manager("IdentityManager") + self.protocol_mgr: ProtocolManager = self.locator.get_manager("ProtocolManager") + self.secret_mgr: SecretManager = self.locator.get_manager("SecretManager") + + @transaction( + permission="notification:ProjectChannel.write", + role_types=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["protocol_id", "name", "data", "project_id", "domain_id"]) def create(self, params): - - """ Create Project Channel + """Create Project Channel Args: params (dict): { 'protocol_id': 'str', @@ -49,56 +54,60 @@ def create(self, params): project_channel_vo (object) """ - protocol_id = params['protocol_id'] - domain_id = params['domain_id'] - data = params['data'] - project_id = params['project_id'] - is_subscribe = params.get('is_subscribe', False) - is_scheduled = params.get('is_scheduled', False) + protocol_id = params["protocol_id"] + domain_id = params["domain_id"] + data = params["data"] + project_id = params["project_id"] + is_subscribe = params.get("is_subscribe", False) + is_scheduled = params.get("is_scheduled", False) if not is_subscribe: - params['subscriptions'] = [] + params["subscriptions"] = [] if is_scheduled: - validate_schedule(params.get('schedule', {})) + validate_schedule(params.get("schedule", {})) else: - params['schedule'] = None + params["schedule"] = None - self.identity_mgr.get_resource(project_id, 'identity.Project', domain_id) + self.identity_mgr.get_resource(project_id, "identity.Project", domain_id) protocol_vo = self.protocol_mgr.get_protocol(protocol_id, domain_id) - if protocol_vo.state == 'DISABLED': + if protocol_vo.state == "DISABLED": raise ERROR_PROTOCOL_DISABLED() metadata = protocol_vo.plugin_info.metadata - schema = metadata.get('data', {}).get('schema') + schema = metadata.get("data", {}).get("schema") if schema: validate_json_schema(data, schema) - if metadata['data_type'] == 'SECRET': + if metadata["data_type"] == "SECRET": new_secret_parameters = { - 'name': utils.generate_id('project-ch', 4), - 'secret_type': 'CREDENTIALS', - 'data': data, - 'project_id': project_id, - 'domain_id': domain_id + "name": utils.generate_id("project-ch", 4), + "secret_type": "CREDENTIALS", + "data": data, + "project_id": project_id, + "domain_id": domain_id, } - project_channel_secret = self.secret_mgr.create_secret(new_secret_parameters) + project_channel_secret = self.secret_mgr.create_secret( + new_secret_parameters + ) - params.update({ - 'secret_id': project_channel_secret['secret_id'], - 'data': {} - }) + params.update( + {"secret_id": project_channel_secret["secret_id"], "data": {}} + ) # Create Project Channel return self.project_channel_mgr.create_project_channel(params) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['project_channel_id', 'domain_id']) + @transaction( + permission="notification:ProjectChannel.write", + role_types=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["project_channel_id", "domain_id"]) def update(self, params): - """ Update project channel + """Update project channel Args: params (dict): { @@ -113,35 +122,44 @@ def update(self, params): Returns: project_channel_vo (object) """ - project_channel_id = params['project_channel_id'] - domain_id = params['domain_id'] + project_channel_id = params["project_channel_id"] + domain_id = params["domain_id"] - project_channel_vo: ProjectChannel = self.project_channel_mgr.get_project_channel(project_channel_id, domain_id) + project_channel_vo: ProjectChannel = ( + self.project_channel_mgr.get_project_channel(project_channel_id, domain_id) + ) - if 'data' in params: - protocol_vo = self.protocol_mgr.get_protocol(project_channel_vo.protocol_id, domain_id) + if "data" in params: + protocol_vo = self.protocol_mgr.get_protocol( + project_channel_vo.protocol_id, domain_id + ) metadata = protocol_vo.plugin_info.metadata - schema = metadata.get('data', {}).get('schema') + schema = metadata.get("data", {}).get("schema") if schema: - validate_json_schema(params['data'], schema) + validate_json_schema(params["data"], schema) if project_channel_vo.secret_id: secret_params = { - 'secret_id': project_channel_vo.secret_id, - 'data': params['data'], - 'domain_id': domain_id + "secret_id": project_channel_vo.secret_id, + "data": params["data"], + "domain_id": domain_id, } self.secret_mgr.update_secret_data(secret_params) - params['data'] = {} + params["data"] = {} - return self.project_channel_mgr.update_project_channel_by_vo(params, project_channel_vo) + return self.project_channel_mgr.update_project_channel_by_vo( + params, project_channel_vo + ) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['project_channel_id', 'domain_id']) + @transaction( + permission="notification:ProjectChannel.write", + role_types=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["project_channel_id", "domain_id"]) def set_schedule(self, params): - """ Set schedule for Project Channel + """Set schedule for Project Channel Args: params (dict): { @@ -154,25 +172,28 @@ def set_schedule(self, params): Returns: project_channel_vo (object) """ - project_channel_vo = self.project_channel_mgr.get_project_channel(params['project_channel_id'], - params['domain_id']) + project_channel_vo = self.project_channel_mgr.get_project_channel( + params["project_channel_id"], params["domain_id"] + ) - is_scheduled = params.get('is_scheduled', False) + is_scheduled = params.get("is_scheduled", False) if is_scheduled: - validate_schedule(params.get('schedule', {})) + validate_schedule(params.get("schedule", {})) else: - params.update({ - 'is_scheduled': False, - 'schedule': None - }) + params.update({"is_scheduled": False, "schedule": None}) - return self.project_channel_mgr.update_project_channel_by_vo(params, project_channel_vo) + return self.project_channel_mgr.update_project_channel_by_vo( + params, project_channel_vo + ) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['project_channel_id', 'domain_id']) + @transaction( + permission="notification:ProjectChannel.write", + role_types=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["project_channel_id", "domain_id"]) def set_subscription(self, params): - """ Set subscriptions for Project Channel + """Set subscriptions for Project Channel Args: params (dict): { @@ -185,18 +206,18 @@ def set_subscription(self, params): Returns: project_channel_vo (object) """ - if not params.get('is_subscribe', False): - params.update({ - 'is_subscribe': False, - 'subscriptions': [] - }) + if not params.get("is_subscribe", False): + params.update({"is_subscribe": False, "subscriptions": []}) return self.project_channel_mgr.update_project_channel(params) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['project_channel_id', 'domain_id']) + @transaction( + permission="notification:ProjectChannel.write", + role_types=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["project_channel_id", "domain_id"]) def delete(self, params): - """ Delete project channel + """Delete project channel Args: params (dict): { @@ -207,20 +228,27 @@ def delete(self, params): Returns: None """ - project_channel_id = params['project_channel_id'] - domain_id = params['domain_id'] + project_channel_id = params["project_channel_id"] + domain_id = params["domain_id"] - project_channel_vo = self.project_channel_mgr.get_project_channel(project_channel_id, domain_id) + project_channel_vo = self.project_channel_mgr.get_project_channel( + project_channel_id, domain_id + ) if secret_id := project_channel_vo.secret_id: - self.secret_mgr.delete_secret({'secret_id': secret_id, 'domain_id': domain_id}) + self.secret_mgr.delete_secret( + {"secret_id": secret_id, "domain_id": domain_id} + ) self.project_channel_mgr.delete_project_channel_by_vo(project_channel_vo) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['project_channel_id', 'domain_id']) + @transaction( + permission="notification:ProjectChannel.write", + role_types=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["project_channel_id", "domain_id"]) def enable(self, params): - """ Enable project channel + """Enable project channel Args: params (dict): { @@ -232,12 +260,17 @@ def enable(self, params): project_channel_vo (object) """ - return self.project_channel_mgr.enable_project_channel(params['project_channel_id'], params['domain_id']) + return self.project_channel_mgr.enable_project_channel( + params["project_channel_id"], params["domain_id"] + ) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['project_channel_id', 'domain_id']) + @transaction( + permission="notification:ProjectChannel.write", + role_types=["WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["project_channel_id", "domain_id"]) def disable(self, params): - """ Disable project channel + """Disable project channel Args: params (dict): { @@ -249,12 +282,17 @@ def disable(self, params): project_channel_vo (object) """ - return self.project_channel_mgr.disable_project_channel(params['project_channel_id'], params['domain_id']) + return self.project_channel_mgr.disable_project_channel( + params["project_channel_id"], params["domain_id"] + ) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['project_channel_id', 'domain_id']) + @transaction( + permission="notification:ProjectChannel.read", + role_types=["DOMAIN_OWNER", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["project_channel_id", "domain_id"]) def get(self, params): - """ Get Project Channel + """Get Project Channel Args: params (dict): { @@ -266,17 +304,33 @@ def get(self, params): project_channel_vo (object) """ - return self.project_channel_mgr.get_project_channel(params['project_channel_id'], - params['domain_id'], - params.get('only')) - - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['domain_id']) - @append_query_filter(['project_channel_id', 'name', 'state', 'secret_id', 'is_subscribe', 'is_scheduled', - 'notification_level', 'protocol_id', 'project_id', 'domain_id', 'user_projects']) - @append_keyword_filter(['project_channel_id', 'name']) + return self.project_channel_mgr.get_project_channel( + params["project_channel_id"], params["domain_id"], params.get("only") + ) + + @transaction( + permission="notification:ProjectChannel.read", + role_types=["DOMAIN_OWNER", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["domain_id"]) + @append_query_filter( + [ + "project_channel_id", + "name", + "state", + "secret_id", + "is_subscribe", + "is_scheduled", + "notification_level", + "protocol_id", + "project_id", + "domain_id", + "user_projects", + ] + ) + @append_keyword_filter(["project_channel_id", "name"]) def list(self, params): - """ List Project Channels + """List Project Channels Args: params (dict): { @@ -298,13 +352,16 @@ def list(self, params): results (list): 'list of project_channel_vo' total_count (int) """ - query = params.get('query', {}) + query = params.get("query", {}) return self.project_channel_mgr.list_project_channels(query) - @transaction(append_meta={'authorization.scope': 'PROJECT'}) - @check_required(['query', 'domain_id']) - @append_query_filter(['domain_id', 'user_projects']) - @append_keyword_filter(['project_channel_id', 'name']) + @transaction( + permission="notification:ProjectChannel.read", + role_types=["DOMAIN_OWNER", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["query", "domain_id"]) + @append_query_filter(["domain_id", "user_projects"]) + @append_keyword_filter(["project_channel_id", "name"]) def stat(self, params): """ Args: @@ -318,5 +375,5 @@ def stat(self, params): values (list): 'list of statistics data' total_count (int) """ - query = params.get('query', {}) + query = params.get("query", {}) return self.project_channel_mgr.stat_project_channels(query) diff --git a/src/spaceone/notification/service/protocol_service.py b/src/spaceone/notification/service/protocol_service.py index 5b1eabd..4ec41d0 100644 --- a/src/spaceone/notification/service/protocol_service.py +++ b/src/spaceone/notification/service/protocol_service.py @@ -14,7 +14,6 @@ from spaceone.notification.model import Protocol from spaceone.notification.conf.protocol_conf import * - _LOGGER = logging.getLogger(__name__) @@ -23,15 +22,16 @@ @mutation_handler @event_handler class ProtocolService(BaseService): + resource = "Protocol" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.protocol_mgr: ProtocolManager = self.locator.get_manager('ProtocolManager') + self.protocol_mgr: ProtocolManager = self.locator.get_manager("ProtocolManager") - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['name', 'plugin_info', 'domain_id']) + @transaction(permission="notification:Protocol.write", role_types=["DOMAIN_ADMIN"]) + @check_required(["name", "plugin_info", "domain_id"]) def create(self, params): - """ Create Protocol + """Create Protocol Args: params (dict): { 'name': 'str', @@ -47,17 +47,19 @@ def create(self, params): return self._create(params) def _create(self, params): - domain_id = params['domain_id'] - plugin_info = params['plugin_info'] + domain_id = params["domain_id"] + plugin_info = params["plugin_info"] self._check_plugin_info(plugin_info) _plugin = self._get_plugin(plugin_info, domain_id) - plugin_capability = _plugin.get('capability', {}) + plugin_capability = _plugin.get("capability", {}) - if 'supported_schema' in plugin_capability: - params['capability'] = {'supported_schema': plugin_capability['supported_schema']} + if "supported_schema" in plugin_capability: + params["capability"] = { + "supported_schema": plugin_capability["supported_schema"] + } else: - raise ERROR_WRONG_PLUGIN_SETTINGS(key='capability.supported_schema') + raise ERROR_WRONG_PLUGIN_SETTINGS(key="capability.supported_schema") _LOGGER.debug(f'[create] capability: {params["capability"]}') _LOGGER.debug(f'[create] name: {params["name"]}') @@ -65,41 +67,41 @@ def _create(self, params): plugin_metadata, endpoint_info = self._init_plugin(plugin_info, domain_id) request_plugin = { - 'plugin_id': plugin_info['plugin_id'], - 'options': plugin_info.get('options', {}), - 'metadata': plugin_metadata + "plugin_id": plugin_info["plugin_id"], + "options": plugin_info.get("options", {}), + "metadata": plugin_metadata, } - if version := endpoint_info.get('updated_version', plugin_info.get('version')): - request_plugin.update({ - 'version': version - }) + if version := endpoint_info.get("updated_version", plugin_info.get("version")): + request_plugin.update({"version": version}) - if 'secret_data' in plugin_info: - secret_mgr: SecretManager = self.locator.get_manager('SecretManager') + if "secret_data" in plugin_info: + secret_mgr: SecretManager = self.locator.get_manager("SecretManager") secret_params = { - 'name': utils.generate_id('secret-noti-proto', 4), - 'secret_type': 'CREDENTIALS', - 'data': plugin_info['secret_data'], - 'schema': plugin_info['schema'], - 'domain_id': domain_id + "name": utils.generate_id("secret-noti-proto", 4), + "secret_type": "CREDENTIALS", + "data": plugin_info["secret_data"], + "schema": plugin_info["schema"], + "domain_id": domain_id, } protocol_secret = secret_mgr.create_secret(secret_params) - request_plugin.update({ - 'secret_id': protocol_secret['secret_id'], - 'schema': plugin_info['schema'] - }) - - params['plugin_info'] = request_plugin + request_plugin.update( + { + "secret_id": protocol_secret["secret_id"], + "schema": plugin_info["schema"], + } + ) + + params["plugin_info"] = request_plugin protocol_vo: Protocol = self.protocol_mgr.create_protocol(params) return protocol_vo - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['protocol_id', 'domain_id']) + @transaction(permission="notification:Protocol.write", role_types=["DOMAIN_ADMIN"]) + @check_required(["protocol_id", "domain_id"]) def update(self, params): - """ Update protocol + """Update protocol Args: params (dict): { @@ -113,20 +115,20 @@ def update(self, params): protocol_vo (object) """ - domain_id = params['domain_id'] - protocol_id = params['protocol_id'] + domain_id = params["domain_id"] + protocol_id = params["protocol_id"] protocol_vo = self.protocol_mgr.get_protocol(protocol_id, domain_id) - if protocol_vo.protocol_type == 'INTERNAL': + if protocol_vo.protocol_type == "INTERNAL": raise ERROR_NOT_ALLOWED_UPDATE_PROTOCOL_TYPE(protocol_id=protocol_id) return self.protocol_mgr.update_protocol_by_vo(params, protocol_vo) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['protocol_id', 'domain_id']) + @transaction(permission="notification:Protocol.write", role_types=["DOMAIN_ADMIN"]) + @check_required(["protocol_id", "domain_id"]) def update_plugin(self, params): - """ Update protocol plugin + """Update protocol plugin Args: params (dict): { @@ -139,48 +141,48 @@ def update_plugin(self, params): Returns: protocol_vo (object) """ - protocol_id = params['protocol_id'] - domain_id = params['domain_id'] + protocol_id = params["protocol_id"] + domain_id = params["domain_id"] protocol_vo = self.protocol_mgr.get_protocol(protocol_id, domain_id) plugin_info = protocol_vo.plugin_info.to_dict() - if plugin_info['upgrade_mode'] == 'AUTO': + if plugin_info["upgrade_mode"] == "AUTO": plugin_metadata, endpoint_info = self._init_plugin(plugin_info, domain_id) - plugin_info['metadata'] = plugin_metadata + plugin_info["metadata"] = plugin_metadata - if version := endpoint_info.get('updated_version'): - plugin_info['version'] = version + if version := endpoint_info.get("updated_version"): + plugin_info["version"] = version else: - if version := params.get('version'): + if version := params.get("version"): # Update plugin_version - plugin_id = plugin_info['plugin_id'] + plugin_id = plugin_info["plugin_id"] - repo_mgr = self.locator.get_manager('RepositoryManager') + repo_mgr = self.locator.get_manager("RepositoryManager") repo_mgr.check_plugin_version(plugin_id, version, domain_id) - plugin_info['version'] = version - plugin_info['metadata'] = self._init_plugin(plugin_info, domain_id) + plugin_info["version"] = version + plugin_info["metadata"] = self._init_plugin(plugin_info, domain_id) - if options := params.get('options', {}): + if options := params.get("options", {}): # Overwrite - plugin_info['options'] = options + plugin_info["options"] = options params = { - 'protocol_id': protocol_id, - 'domain_id': domain_id, - 'plugin_info': plugin_info + "protocol_id": protocol_id, + "domain_id": domain_id, + "plugin_info": plugin_info, } - _LOGGER.debug(f'[update_plugin] {plugin_info}') + _LOGGER.debug(f"[update_plugin] {plugin_info}") return self.protocol_mgr.update_protocol_by_vo(params, protocol_vo) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['protocol_id', 'domain_id']) + @transaction(permission="notification:Protocol.write", role_types=["DOMAIN_ADMIN"]) + @check_required(["protocol_id", "domain_id"]) def delete(self, params): - """ Delete protocol + """Delete protocol Args: params (dict): { @@ -191,22 +193,22 @@ def delete(self, params): Returns: None """ - protocol_id = params['protocol_id'] - domain_id = params['domain_id'] + protocol_id = params["protocol_id"] + domain_id = params["domain_id"] protocol_vo: Protocol = self.protocol_mgr.get_protocol(protocol_id, domain_id) self.check_existed_channel_using_protocol(protocol_vo) if secret_id := protocol_vo.plugin_info.secret_id: - secret_mgr: SecretManager = self.locator.get_manager('SecretManager') - secret_mgr.delete_secret({'secret_id': secret_id, 'domain_id': domain_id}) + secret_mgr: SecretManager = self.locator.get_manager("SecretManager") + secret_mgr.delete_secret({"secret_id": secret_id, "domain_id": domain_id}) return self.protocol_mgr.delete_protocol_by_vo(protocol_vo) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['protocol_id', 'domain_id']) + @transaction(permission="notification:Protocol.write", role_types=["DOMAIN_ADMIN"]) + @check_required(["protocol_id", "domain_id"]) def enable(self, params): - """ Enable protocol + """Enable protocol Args: params (dict): { @@ -218,12 +220,14 @@ def enable(self, params): protocol_vo (object) """ - return self.protocol_mgr.enable_protocol(params['protocol_id'], params['domain_id']) + return self.protocol_mgr.enable_protocol( + params["protocol_id"], params["domain_id"] + ) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['protocol_id', 'domain_id']) + @transaction(permission="notification:Protocol.write", role_types=["DOMAIN_ADMIN"]) + @check_required(["protocol_id", "domain_id"]) def disable(self, params): - """ Disable protocol + """Disable protocol Args: params (dict): { @@ -235,12 +239,17 @@ def disable(self, params): protocol_vo (object) """ - return self.protocol_mgr.disable_protocol(params['protocol_id'], params['domain_id']) + return self.protocol_mgr.disable_protocol( + params["protocol_id"], params["domain_id"] + ) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['protocol_id', 'domain_id']) + @transaction( + permission="notification:Protocol.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["protocol_id", "domain_id"]) def get(self, params): - """ Get Protocol + """Get Protocol Args: params (dict): { @@ -251,21 +260,26 @@ def get(self, params): Returns: domain_vo (object) """ - protocol_id = params['protocol_id'] - domain_id = params['domain_id'] + protocol_id = params["protocol_id"] + domain_id = params["domain_id"] # Create Default Protocol if protocol is not exited self._create_default_protocol(domain_id) self._initialize_protocols(domain_id) - return self.protocol_mgr.get_protocol(protocol_id, domain_id, params.get('only')) - - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['domain_id']) - @append_query_filter(['protocol_id', 'name', 'state', 'protocol_type', 'domain_id']) - @append_keyword_filter(['protocol_id', 'name']) + return self.protocol_mgr.get_protocol( + protocol_id, domain_id, params.get("only") + ) + + @transaction( + permission="notification:Protocol.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["domain_id"]) + @append_query_filter(["protocol_id", "name", "state", "protocol_type", "domain_id"]) + @append_keyword_filter(["protocol_id", "name"]) def list(self, params): - """ List protocol + """List protocol Args: params (dict): { @@ -281,8 +295,8 @@ def list(self, params): results (list): 'list of protocol_vo' total_count (int) """ - domain_id = params['domain_id'] - query = params.get('query', {}) + domain_id = params["domain_id"] + query = params.get("query", {}) # Create Default Protocol if protocol is not exited self._create_default_protocol(domain_id) @@ -290,10 +304,13 @@ def list(self, params): return self.protocol_mgr.list_protocols(query) - @transaction(append_meta={'authorization.scope': 'DOMAIN'}) - @check_required(['query', 'domain_id']) - @append_query_filter(['domain_id']) - @append_keyword_filter(['protocol_id', 'name']) + @transaction( + permission="notification:Protocol.read", + role_types=["DOMAIN_ADMIN", "WORKSPACE_OWNER", "WORKSPACE_MEMBER"], + ) + @check_required(["query", "domain_id"]) + @append_query_filter(["domain_id"]) + @append_keyword_filter(["protocol_id", "name"]) def stat(self, params): """ Args: @@ -307,89 +324,113 @@ def stat(self, params): total_count (int) """ - query = params.get('query', {}) + query = params.get("query", {}) return self.protocol_mgr.stat_protocols(query) def _get_plugin(self, plugin_info, domain_id): - plugin_id = plugin_info['plugin_id'] + plugin_id = plugin_info["plugin_id"] - repo_mgr: RepositoryManager = self.locator.get_manager('RepositoryManager') + repo_mgr: RepositoryManager = self.locator.get_manager("RepositoryManager") plugin_info = repo_mgr.get_plugin(plugin_id, domain_id) - if version := plugin_info.get('version'): + if version := plugin_info.get("version"): repo_mgr.check_plugin_version(plugin_id, version, domain_id) return plugin_info def _init_plugin(self, plugin_info, domain_id): - options = plugin_info.get('options', {}) + options = plugin_info.get("options", {}) - plugin_mgr: PluginManager = self.locator.get_manager('PluginManager') + plugin_mgr: PluginManager = self.locator.get_manager("PluginManager") endpoint_info = plugin_mgr.initialize(plugin_info, domain_id) metadata = plugin_mgr.init_plugin(options) return metadata, endpoint_info def check_existed_channel_using_protocol(self, protocol_vo): - project_channel_mgr: ProjectChannelManager = self.locator.get_manager('ProjectChannelManager') - user_channel_mgr: UserChannelManager = self.locator.get_manager('UserChannelManager') - - query = {'filter': [{'k': 'protocol_id', 'v': protocol_vo.protocol_id, 'o': 'eq'}]} + project_channel_mgr: ProjectChannelManager = self.locator.get_manager( + "ProjectChannelManager" + ) + user_channel_mgr: UserChannelManager = self.locator.get_manager( + "UserChannelManager" + ) + + query = { + "filter": [{"k": "protocol_id", "v": protocol_vo.protocol_id, "o": "eq"}] + } - project_channel_vos, prj_ch_total_count = project_channel_mgr.list_project_channels(query) - user_channel_vos, user_ch_total_count = user_channel_mgr.list_user_channels(query) + ( + project_channel_vos, + prj_ch_total_count, + ) = project_channel_mgr.list_project_channels(query) + user_channel_vos, user_ch_total_count = user_channel_mgr.list_user_channels( + query + ) if prj_ch_total_count > 0 or user_ch_total_count > 0: - raise EROR_DELETE_PROJECT_EXITED_CHANNEL(protocol_id=protocol_vo.protocol_id) + raise EROR_DELETE_PROJECT_EXITED_CHANNEL( + protocol_id=protocol_vo.protocol_id + ) - @cache.cacheable(key='default-protocol:{domain_id}', expire=300) + @cache.cacheable(key="default-protocol:{domain_id}", expire=300) def _create_default_protocol(self, domain_id): - _LOGGER.debug(f'[_create_default_protocol] domain_id: {domain_id}') + _LOGGER.debug(f"[_create_default_protocol] domain_id: {domain_id}") - query = {'filter': [{'k': 'domain_id', 'v': domain_id, 'o': 'eq'}]} + query = {"filter": [{"k": "domain_id", "v": domain_id, "o": "eq"}]} protocol_vos, total_count = self.protocol_mgr.list_protocols(query) installed_protocol_names = [protocol_vo.name for protocol_vo in protocol_vos] - _LOGGER.debug(f'[_create_default_protocol] Installed Plugins : {installed_protocol_names}') + _LOGGER.debug( + f"[_create_default_protocol] Installed Plugins : {installed_protocol_names}" + ) for _protocol_data in DEFAULT_INTERNAL_PROTOCOLS: - if _protocol_data['name'] not in installed_protocol_names: + if _protocol_data["name"] not in installed_protocol_names: _LOGGER.debug(f'Create default protocol: {_protocol_data["name"]}') - _protocol_data['domain_id'] = domain_id + _protocol_data["domain_id"] = domain_id self.protocol_mgr.create_protocol(_protocol_data) return True - @cache.cacheable(key='init-protocol:{domain_id}', expire=300) + @cache.cacheable(key="init-protocol:{domain_id}", expire=300) def _initialize_protocols(self, domain_id): - _LOGGER.debug(f'[_initialize_protocol] domain_id: {domain_id}') + _LOGGER.debug(f"[_initialize_protocol] domain_id: {domain_id}") - query = {'filter': [{'k': 'domain_id', 'v': domain_id, 'o': 'eq'}]} + query = {"filter": [{"k": "domain_id", "v": domain_id, "o": "eq"}]} protocol_vos, total_count = self.protocol_mgr.list_protocols(query) - installed_protocol_ids = [protocol_vo.plugin_info.plugin_id for protocol_vo in protocol_vos] - _LOGGER.debug(f'[_initialize_protocol] Installed Plugins : {installed_protocol_ids}') + installed_protocol_ids = [ + protocol_vo.plugin_info.plugin_id for protocol_vo in protocol_vos + ] + _LOGGER.debug( + f"[_initialize_protocol] Installed Plugins : {installed_protocol_ids}" + ) global_conf = config.get_global() - for _protocol_data in global_conf.get('INSTALLED_PROTOCOL_PLUGINS', []): - if _protocol_data['plugin_info']['plugin_id'] not in installed_protocol_ids: + for _protocol_data in global_conf.get("INSTALLED_PROTOCOL_PLUGINS", []): + if _protocol_data["plugin_info"]["plugin_id"] not in installed_protocol_ids: try: - _LOGGER.debug(f'[_initialize_protocol] Create init protocol: {_protocol_data["plugin_info"]["plugin_id"]}') - _protocol_data['domain_id'] = domain_id + _LOGGER.debug( + f'[_initialize_protocol] Create init protocol: {_protocol_data["plugin_info"]["plugin_id"]}' + ) + _protocol_data["domain_id"] = domain_id self._create(_protocol_data) except Exception as e: - _LOGGER.error(f'[_initialize_protocol] {e}') + _LOGGER.error(f"[_initialize_protocol] {e}") return True @staticmethod def _check_plugin_info(plugin_info_params): - if 'plugin_id' not in plugin_info_params: - raise ERROR_REQUIRED_PARAMETER(key='plugin_info.plugin_id') - - if 'secret_data' in plugin_info_params and 'schema' not in plugin_info_params: - raise ERROR_REQUIRED_PARAMETER(key='plugin_info.schema') - - if 'upgrade_mode' in plugin_info_params and plugin_info_params['upgrade_mode'] == 'MANUAL': - if 'version' not in plugin_info_params: - raise ERROR_REQUIRED_PARAMETER(key='plugin_info.version') + if "plugin_id" not in plugin_info_params: + raise ERROR_REQUIRED_PARAMETER(key="plugin_info.plugin_id") + + if "secret_data" in plugin_info_params and "schema" not in plugin_info_params: + raise ERROR_REQUIRED_PARAMETER(key="plugin_info.schema") + + if ( + "upgrade_mode" in plugin_info_params + and plugin_info_params["upgrade_mode"] == "MANUAL" + ): + if "version" not in plugin_info_params: + raise ERROR_REQUIRED_PARAMETER(key="plugin_info.version") diff --git a/src/spaceone/notification/service/user_channel_service.py b/src/spaceone/notification/service/user_channel_service.py index f38307d..7dbef44 100644 --- a/src/spaceone/notification/service/user_channel_service.py +++ b/src/spaceone/notification/service/user_channel_service.py @@ -16,18 +16,24 @@ @mutation_handler @event_handler class UserChannelService(BaseService): + resource = "UserChannel" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - self.user_channel_mgr: UserChannelManager = self.locator.get_manager('UserChannelManager') - self.identity_mgr: IdentityManager = self.locator.get_manager('IdentityManager') - self.protocol_mgr: ProtocolManager = self.locator.get_manager('ProtocolManager') - self.secret_mgr: SecretManager = self.locator.get_manager('SecretManager') - - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['protocol_id', 'name', 'data', 'user_id', 'domain_id']) + self.user_channel_mgr: UserChannelManager = self.locator.get_manager( + "UserChannelManager" + ) + self.identity_mgr: IdentityManager = self.locator.get_manager("IdentityManager") + self.protocol_mgr: ProtocolManager = self.locator.get_manager("ProtocolManager") + self.secret_mgr: SecretManager = self.locator.get_manager("SecretManager") + + @transaction( + permission="notification:UserChannel.write", + role_types=["USER"], + ) + @check_required(["protocol_id", "name", "data", "user_id", "domain_id"]) def create(self, params): - """ Create user Channel + """Create user Channel Args: params (dict): { @@ -47,54 +53,54 @@ def create(self, params): user_channel_vo (object) """ - protocol_id = params['protocol_id'] - domain_id = params['domain_id'] - data = params['data'] - user_id = params['user_id'] - is_subscribe = params.get('is_subscribe', False) - is_scheduled = params.get('is_scheduled', False) + protocol_id = params["protocol_id"] + domain_id = params["domain_id"] + data = params["data"] + user_id = params["user_id"] + is_subscribe = params.get("is_subscribe", False) + is_scheduled = params.get("is_scheduled", False) if not is_subscribe: - params['subscriptions'] = [] + params["subscriptions"] = [] if is_scheduled: - validate_schedule(params.get('schedule', {})) + validate_schedule(params.get("schedule", {})) else: - params['schedule'] = None + params["schedule"] = None - self.identity_mgr.get_resource(user_id, 'identity.User', domain_id) + self.identity_mgr.get_resource(user_id, "identity.User", domain_id) protocol_vo: Protocol = self.protocol_mgr.get_protocol(protocol_id, domain_id) - if protocol_vo.state == 'DISABLED': + if protocol_vo.state == "DISABLED": raise ERROR_PROTOCOL_DISABLED() - if protocol_vo.protocol_type == 'INTERNAL': + if protocol_vo.protocol_type == "INTERNAL": raise ERROR_PROTOCOL_INTERNVAL() metadata = protocol_vo.plugin_info.metadata - validate_json_schema(metadata.get('data', {}).get('schema', {}), data) + validate_json_schema(metadata.get("data", {}).get("schema", {}), data) - if metadata['data_type'] == 'SECRET': + if metadata["data_type"] == "SECRET": new_secret_parameters = { - 'name': utils.generate_id('user-ch', 4), - 'secret_type': 'CREDENTIALS', - 'data': data, - 'domain_id': domain_id + "name": utils.generate_id("user-ch", 4), + "secret_type": "CREDENTIALS", + "data": data, + "domain_id": domain_id, } user_channel_secret = self.secret_mgr.create_secret(new_secret_parameters) - params.update({ - 'secret_id': user_channel_secret['secret_id'], - 'data': {} - }) + params.update({"secret_id": user_channel_secret["secret_id"], "data": {}}) return self.user_channel_mgr.create_user_channel(params) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['user_channel_id', 'domain_id']) + @transaction( + permission="notification:UserChannel.write", + role_types=["USER"], + ) + @check_required(["user_channel_id", "domain_id"]) def update(self, params): - """ Update user channel + """Update user channel Args: params (dict): { @@ -109,25 +115,30 @@ def update(self, params): user_channel_vo (object) """ - user_channel_id = params['user_channel_id'] - domain_id = params['domain_id'] + user_channel_id = params["user_channel_id"] + domain_id = params["domain_id"] - user_channel_vo: UserChannel = self.user_channel_mgr.get_user_channel(user_channel_id, domain_id) + user_channel_vo: UserChannel = self.user_channel_mgr.get_user_channel( + user_channel_id, domain_id + ) - if 'data' in params and user_channel_vo.secret_id: + if "data" in params and user_channel_vo.secret_id: secret_params = { - 'secret_id': user_channel_vo.secret_id, - 'data': params['data'], - 'domain_id': domain_id + "secret_id": user_channel_vo.secret_id, + "data": params["data"], + "domain_id": domain_id, } self.secret_mgr.update_secret_data(secret_params) - params['data'] = {} + params["data"] = {} return self.user_channel_mgr.update_user_channel_by_vo(params, user_channel_vo) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['user_channel_id', 'domain_id']) + @transaction( + permission="notification:UserChannel.write", + role_types=["USER"], + ) + @check_required(["user_channel_id", "domain_id"]) def set_schedule(self, params): """ set_schedule @@ -143,22 +154,24 @@ def set_schedule(self, params): user_channel_vo (object) """ - user_channel_vo = self.user_channel_mgr.get_user_channel(params['user_channel_id'], params['domain_id']) + user_channel_vo = self.user_channel_mgr.get_user_channel( + params["user_channel_id"], params["domain_id"] + ) - is_scheduled = params.get('is_scheduled', False) + is_scheduled = params.get("is_scheduled", False) if is_scheduled: - validate_schedule(params.get('schedule', {})) + validate_schedule(params.get("schedule", {})) else: - params.update({ - 'is_scheduled': False, - 'schedule': None - }) + params.update({"is_scheduled": False, "schedule": None}) return self.user_channel_mgr.update_user_channel_by_vo(params, user_channel_vo) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['user_channel_id', 'domain_id']) + @transaction( + permission="notification:UserChannel.write", + role_types=["USER"], + ) + @check_required(["user_channel_id", "domain_id"]) def set_subscription(self, params): """ set_subscription @@ -173,18 +186,18 @@ def set_subscription(self, params): Returns: user_channel_vo (object) """ - if not params.get('is_subscribe', False): - params.update({ - 'is_subscribe': False, - 'subscriptions': [] - }) + if not params.get("is_subscribe", False): + params.update({"is_subscribe": False, "subscriptions": []}) return self.user_channel_mgr.update_user_channel(params) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['user_channel_id', 'domain_id']) + @transaction( + permission="notification:UserChannel.write", + role_types=["USER"], + ) + @check_required(["user_channel_id", "domain_id"]) def delete(self, params): - """ Delete user channel + """Delete user channel Args: params (dict): { @@ -195,20 +208,27 @@ def delete(self, params): Returns: None """ - user_channel_id = params['user_channel_id'] - domain_id = params['domain_id'] + user_channel_id = params["user_channel_id"] + domain_id = params["domain_id"] - user_channel_vo = self.user_channel_mgr.get_user_channel(user_channel_id, domain_id) + user_channel_vo = self.user_channel_mgr.get_user_channel( + user_channel_id, domain_id + ) if secret_id := user_channel_vo.secret_id: - self.secret_mgr.delete_secret({'secret_id': secret_id, 'domain_id': domain_id}) + self.secret_mgr.delete_secret( + {"secret_id": secret_id, "domain_id": domain_id} + ) self.user_channel_mgr.delete_user_channel_by_vo(user_channel_vo) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['user_channel_id', 'domain_id']) + @transaction( + permission="notification:UserChannel.write", + role_types=["USER"], + ) + @check_required(["user_channel_id", "domain_id"]) def enable(self, params): - """ Enable user channel + """Enable user channel Args: params (dict): { @@ -220,12 +240,17 @@ def enable(self, params): user_channel_vo (object) """ - return self.user_channel_mgr.enable_user_channel(params['user_channel_id'], params['domain_id']) + return self.user_channel_mgr.enable_user_channel( + params["user_channel_id"], params["domain_id"] + ) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['user_channel_id', 'domain_id']) + @transaction( + permission="notification:UserChannel.write", + role_types=["USER"], + ) + @check_required(["user_channel_id", "domain_id"]) def disable(self, params): - """ Disable user channel + """Disable user channel Args: params (dict): { @@ -237,12 +262,17 @@ def disable(self, params): user_channel_vo (object) """ - return self.user_channel_mgr.disable_user_channel(params['user_channel_id'], params['domain_id']) + return self.user_channel_mgr.disable_user_channel( + params["user_channel_id"], params["domain_id"] + ) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['user_channel_id', 'domain_id']) + @transaction( + permission="notification:UserChannel.read", + role_types=["USER"], + ) + @check_required(["user_channel_id", "domain_id"]) def get(self, params): - """ Get User Channel + """Get User Channel Args: params (dict): { @@ -254,15 +284,30 @@ def get(self, params): user_channel_vo (object) """ - return self.user_channel_mgr.get_user_channel(params['user_channel_id'], params['domain_id'], params.get('only')) - - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['domain_id']) - @append_query_filter(['user_channel_id', 'name', 'state', 'secret_id', 'protocol_id', 'user_id', 'domain_id', - 'user_self']) - @append_keyword_filter(['user_channel_id', 'name']) + return self.user_channel_mgr.get_user_channel( + params["user_channel_id"], params["domain_id"], params.get("only") + ) + + @transaction( + permission="notification:UserChannel.read", + role_types=["USER"], + ) + @check_required(["domain_id"]) + @append_query_filter( + [ + "user_channel_id", + "name", + "state", + "secret_id", + "protocol_id", + "user_id", + "domain_id", + "user_self", + ] + ) + @append_keyword_filter(["user_channel_id", "name"]) def list(self, params): - """ List User Channels + """List User Channels Args: params (dict): { @@ -281,13 +326,16 @@ def list(self, params): total_count (int) """ - query = params.get('query', {}) + query = params.get("query", {}) return self.user_channel_mgr.list_user_channels(query) - @transaction(append_meta={'authorization.scope': 'USER'}) - @check_required(['query', 'domain_id']) - @append_query_filter(['domain_id', 'user_self']) - @append_keyword_filter(['user_channel_id', 'name']) + @transaction( + permission="notification:UserChannel.read", + role_types=["USER"], + ) + @check_required(["query", "domain_id"]) + @append_query_filter(["domain_id", "user_self"]) + @append_keyword_filter(["user_channel_id", "name"]) def stat(self, params): """ Args: @@ -301,5 +349,5 @@ def stat(self, params): total_count (int) """ - query = params.get('query', {}) + query = params.get("query", {}) return self.user_channel_mgr.stat_user_channels(query)