Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Collector service #4

Merged
merged 1 commit into from
Nov 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/spaceone/inventory_v2/interface/grpc/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from spaceone.core.pygrpc.server import GRPCServer
from .region import Region
from .collector import Collector

_all_ = ["app"]

app = GRPCServer()
app.add_service(Region)
app.add_service(Region)
app.add_service(Collector)
66 changes: 66 additions & 0 deletions src/spaceone/inventory_v2/interface/grpc/collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from google.protobuf.json_format import ParseDict
from spaceone.api.inventory_v2.v1 import collector_pb2, collector_pb2_grpc
from spaceone.api.inventory_v2.v1 import job_pb2
from spaceone.core.pygrpc import BaseAPI

from spaceone.inventory_v2.service.collector_service import CollectorService


class Collector(BaseAPI, collector_pb2_grpc.CollectorServicer):
pb2 = collector_pb2
pb2_grpc = collector_pb2_grpc

def create(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.create(params)
return self.dict_to_message(response)

def update(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.update(params)
return self.dict_to_message(response)

def delete(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
collector_svc.delete(params)
return self.empty()

def get(self, request, context):
params, metadata = self.parse_request(request, context)

collector_svc = CollectorService(metadata)
response: dict = collector_svc.get(params)
return self.dict_to_message(response)

def list(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.list(params)
return self.dict_to_message(response)

def stat(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.stat(params)
return self.dict_to_message(response)

def collect(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.collect(params)
return ParseDict(response, job_pb2.JobInfo())

def update_plugin(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
response: dict = collector_svc.update_plugin(params)
return self.dict_to_message(response)

def verify_plugin(self, request, context):
params, metadata = self.parse_request(request, context)
collector_svc = CollectorService(metadata)
collector_svc.verify_plugin(params)
return self.empty()
100 changes: 100 additions & 0 deletions src/spaceone/inventory_v2/manager/collection_state_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
from typing import Union, Tuple, List

from spaceone.core.model.mongo_model import QuerySet
from spaceone.core.manager import BaseManager
from spaceone.inventory_v2.model.collection_state.database import CollectionState

_LOGGER = logging.getLogger(__name__)


class CollectionStateManager(BaseManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.collector_id = self.transaction.get_meta("collector_id")
self.job_task_id = self.transaction.get_meta("job_task_id")
self.secret_id = self.transaction.get_meta("secret.secret_id")
self.collection_state_model = CollectionState

def create_collection_state(self, cloud_service_id: str, domain_id: str) -> None:
def _rollback(vo: CollectionState):
_LOGGER.info(
f"[ROLLBACK] Delete collection state: cloud_service_id = {vo.cloud_service_id}, "
f"collector_id = {vo.collector_id}"
)
vo.terminate()

if self.collector_id and self.job_task_id and self.secret_id:
state_data = {
"collector_id": self.collector_id,
"job_task_id": self.job_task_id,
"secret_id": self.secret_id,
"cloud_service_id": cloud_service_id,
"domain_id": domain_id,
}

state_vo = self.collection_state_model.create(state_data)
self.transaction.add_rollback(_rollback, state_vo)

def update_collection_state_by_vo(
self, params: dict, state_vo: CollectionState
) -> CollectionState:
def _rollback(old_data):
_LOGGER.info(
f"[ROLLBACK] Revert collection state : cloud_service_id = {state_vo.cloud_service_id}, "
f"collector_id = {state_vo.collector_id}"
)
state_vo.update(old_data)

self.transaction.add_rollback(_rollback, state_vo.to_dict())
return state_vo.update(params)

def reset_collection_state(self, state_vo: CollectionState) -> None:
if self.job_task_id:
params = {"disconnected_count": 0, "job_task_id": self.job_task_id}

self.update_collection_state_by_vo(params, state_vo)

def get_collection_state(
self, cloud_service_id: str, domain_id: str
) -> Union[CollectionState, None]:
if self.collector_id and self.secret_id:
state_vos = self.collection_state_model.filter(
collector_id=self.collector_id,
secret_id=self.secret_id,
cloud_service_id=cloud_service_id,
domain_id=domain_id,
)

if state_vos.count() > 0:
return state_vos[0]

return None

def filter_collection_states(self, **conditions) -> QuerySet:
return self.collection_state_model.filter(**conditions)

def list_collection_states(self, query: dict) -> Tuple[QuerySet, int]:
return self.collection_state_model.query(**query)

def delete_collection_state_by_cloud_service_id(
self, resource_id: str, domain_id: str
) -> None:
state_vos = self.filter_collection_states(
cloud_service_id=resource_id, domain_id=domain_id
)
state_vos.delete()

def delete_collection_state_by_cloud_service_ids(
self, cloud_service_ids: List[str]
) -> None:
state_vos = self.filter_collection_states(cloud_service_id=cloud_service_ids)
state_vos.delete()

def delete_collection_state_by_collector_id(
self, collector_id: str, domain_id: str
) -> None:
state_vos = self.filter_collection_states(
collector_id=collector_id, domain_id=domain_id
)
state_vos.delete()
90 changes: 90 additions & 0 deletions src/spaceone/inventory_v2/manager/collector_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import logging
from typing import Tuple, Union
from datetime import datetime
from spaceone.core.manager import BaseManager
from spaceone.core.model.mongo_model import QuerySet
from spaceone.inventory_v2.model.collector.database import Collector

__ALL__ = ["CollectorManager"]

_LOGGER = logging.getLogger(__name__)


class CollectorManager(BaseManager):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.collector_model = Collector()

def create_collector(self, params: dict) -> Collector:
def _rollback(vo: Collector):
_LOGGER.info(f"[ROLLBACK] Delete collector : {vo.name} ({vo.collector_id})")
vo.delete()

collector_vo: Collector = self.collector_model.create(params)
self.transaction.add_rollback(_rollback, collector_vo)
return collector_vo

def update_collector_by_vo(
self, params: dict, collector_vo: Collector
) -> Collector:
def _rollback(old_data):
_LOGGER.info(f"[ROLLBACK] Revert Data : {old_data.get('collector_id')}")
collector_vo.update(old_data)

self.transaction.add_rollback(_rollback, collector_vo.to_dict())
return collector_vo.update(params)

def enable_collector(
self, collector_id: str, domain_id: str, workspace_id: str = None
):
collector_vo: Collector = self.collector_model.get(
collector_id=collector_id, domain_id=domain_id, workspace_id=workspace_id
)

return self.update_collector_by_vo({"state": "ENABLED"}, collector_vo)

def disable_collector(
self, collector_id: str, domain_id: str, workspace_id: str = None
):
collector_vo: Collector = self.collector_model.get(
collector_id=collector_id, domain_id=domain_id, workspace_id=workspace_id
)

return self.update_collector_by_vo({"state": "DISABLED"}, collector_vo)

@staticmethod
def delete_collector_by_vo(collector_vo: Collector) -> None:
collector_vo.delete()

def get_collector(
self,
collector_id: str,
domain_id: str,
workspace_id: Union[list, str, None] = None,
) -> Collector:
conditions = {
"collector_id": collector_id,
"domain_id": domain_id,
}

if workspace_id:
conditions.update({"workspace_id": workspace_id})

return self.collector_model.get(**conditions)

def filter_collector(self, **conditions) -> QuerySet:
return self.collector_model.filter(**conditions)

def list_collectors(self, query: dict) -> Tuple[QuerySet, int]:
return self.collector_model.query(**query)

def stat_collectors(self, query: dict) -> dict:
return self.collector_model.stat(**query)

def update_last_collected_time(self, collector_vo: Collector):
_LOGGER.debug(
f"[update_last_collected_time] updated collected at: {collector_vo.collector_id}"
)
self.update_collector_by_vo(
{"last_collected_at": datetime.utcnow()}, collector_vo
)
49 changes: 49 additions & 0 deletions src/spaceone/inventory_v2/manager/collector_plugin_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import logging
from typing import Generator, Union
from spaceone.core.manager import BaseManager
from spaceone.core.connector.space_connector import SpaceConnector

__ALL__ = ["CollectorPluginManager"]

_LOGGER = logging.getLogger(__name__)


class CollectorPluginManager(BaseManager):
def init_plugin(self, endpoint: str, options: dict) -> dict:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)
return plugin_connector.dispatch("Collector.init", {"options": options})

def verify_plugin(self, endpoint: str, options: dict, secret_data: dict) -> None:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)
params = {"options": options, "secret_data": secret_data}
plugin_connector.dispatch("Collector.verify", params)

def collect(
self,
endpoint: str,
options: dict,
secret_data: dict,
task_options: dict = None,
) -> Generator[dict, None, None]:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)

params = {"options": options, "secret_data": secret_data, "filter": {}}

if task_options:
params["task_options"] = task_options

return plugin_connector.dispatch("Collector.collect", params)

def get_tasks(self, endpoint: str, secret_data: dict, options: dict) -> dict:
plugin_connector: SpaceConnector = self.locator.get_connector(
SpaceConnector, endpoint=endpoint, token="NO_TOKEN"
)

params = {"options": options, "secret_data": secret_data}
return plugin_connector.dispatch("Job.get_tasks", params)
Loading
Loading