diff --git a/tap_gitlab/client.py b/tap_gitlab/client.py index 6ed581a..eb9ccb0 100644 --- a/tap_gitlab/client.py +++ b/tap_gitlab/client.py @@ -24,15 +24,21 @@ class GitLabStream(RESTStream): records_jsonpath = "$[*]" next_page_token_jsonpath = "$.X-Next-Page" + extra_url_params: dict = {} + bookmark_param_name = "since" @property def url_base(self) -> str: """Return the API URL root, configurable via tap settings.""" return self.config.get("api_url", DEFAULT_API_URL) + @property + def schema_filename(self) -> str: + return f"{self.name}.json" + @property def schema_filepath(self) -> Path: - return SCHEMAS_DIR / f"{self.name}.json" + return SCHEMAS_DIR / self.schema_filename @property def authenticator(self) -> APIKeyAuthenticator: @@ -56,12 +62,17 @@ def get_url_params( self, context: Optional[dict], next_page_token: Optional[Any] ) -> Dict[str, Any]: """Return a dictionary of values to be used in URL parameterization.""" - params: dict = {} + # If the class has extra default params, start with those: + params: dict = self.extra_url_params + if next_page_token: params["page"] = next_page_token if self.replication_key: params["sort"] = "asc" params["order_by"] = self.replication_key + if self.is_timestamp_replication_key: + params[self.bookmark_param_name] = self.get_starting_timestamp(context) + return params def get_next_page_token( diff --git a/tap_gitlab/streams.py b/tap_gitlab/streams.py index 601d4a1..b74cde3 100644 --- a/tap_gitlab/streams.py +++ b/tap_gitlab/streams.py @@ -1,18 +1,225 @@ """Stream type classes for tap-gitlab.""" -from pathlib import Path -from typing import Any, Dict, Optional, Union, List, Iterable +from typing import Any, Dict, Optional -from singer_sdk import typing as th # JSON Schema typing helpers +from tap_gitlab.client import GitLabStream, ProjectBasedStream -from tap_gitlab.client import GitLabStream -SCHEMAS_DIR = Path(__file__).parent / Path("./schemas") +class ProjectsStream(ProjectBasedStream): + """Gitlab Projects stream.""" - -class ProjectsStream(GitLabStream): - """Define custom stream.""" name = "projects" - path = "/projects" + path = "/projects/{project_id}" primary_keys = ["id"] replication_key = "last_activity_at" + is_sorted = True + extra_url_params = {"statistics": 1} + + +class GroupProjectsStream(ProjectBasedStream): + """Gitlab Projects stream.""" + + name = "group_projects" + path = "/groups/{group_id}/projects" + primary_keys = ["id"] + + +class ReleasesStream(ProjectBasedStream): + """Gitlab Releases stream.""" + + name = "releases" + path = "/projects/{project_id}/releases" + primary_keys = ["project_id", "commit_id", "tag_name"] + replication_key = None + + +class IssuesStream(ProjectBasedStream): + """Gitlab Issues stream.""" + + name = "issues" + path = "/projects/{project_id}/issues" + primary_keys = ["id"] + replication_key = "updated_at" + bookmark_param_name = "updated_after" + is_sorted = True + extra_url_params = {"scope": "all"} + + +class CommitsStream(ProjectBasedStream): + """Gitlab Commits stream.""" + + name = "commits" + path = "/projects/{project_id}/repository/commits" + primary_keys = ["id"] + replication_key = "created_at" + is_sorted = False + extra_url_params = {"with_stats": "true"} + + +class MergeRequestCommitsStream(ProjectBasedStream): + """Gitlab Commits stream.""" + + name = "merge_request_commits" + path = "/projects/{project_id}/merge_requests/{secondary_id}/commits" + primary_keys = ["project_id", "merge_request_iid", "commit_id"] + + +class EpicsStream(ProjectBasedStream): + """Gitlab Epics stream.""" + + name = "epics" + path = "/groups/{group_id}/epics" + primary_keys = ["id"] + replication_key = "updated_at" + bookmark_param_name = "updated_after" + + def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + """Perform post processing, including queuing up any child stream types.""" + # Ensure child state record(s) are created + return { + "group_id": record["group_id"], + "epic_id": record["id"], + "epic_iid": record["iid"], + } + + +class EpicIssuesStream(GitLabStream): + """EpicIssues stream class.""" + + name = "epic_issues" + path = "/groups/{group_id}/epics/{epic_iid}/issues" + primary_keys = ["group_id", "epic_iid", "epic_issue_id"] + parent_stream_type = EpicsStream # Stream should wait for parents to complete. + + def get_url_params( + self, context: Optional[dict], next_page_token: Optional[Any] + ) -> Dict[str, Any]: + """Return a dictionary of values to be used in parameterization.""" + result = super().get_url_params(context, next_page_token) + if not context or "epic_id" not in context: + raise ValueError("Cannot sync epic issues without already known epic IDs.") + return result + + +class BranchesStream(ProjectBasedStream): + name = "branches" + path = "/projects/{project_id}/repository/branches" + primary_keys = ["project_id", "name"] + + +class PipelinesStream(ProjectBasedStream): + name = "pipelines" + path = "/projects/{project_id}/pipelines" + primary_keys = ["id"] + replication_key = "updated_at" + bookmark_param_name = "updated_after" + + def get_child_context(self, record: dict, context: Optional[dict]) -> dict: + context = super().get_child_context(record, context) + context["pipeline_id"] = record["id"] + return context + + +class PipelinesExtendedStream(ProjectBasedStream): + name = "pipelines_extended" + path = "/projects/{project_id}/pipelines/{pipeline_id}" + primary_keys = ["id"] + parent_stream_type = PipelinesStream + + +class JobsStream(ProjectBasedStream): + name = "jobs" + path = "/projects/{project_id}/pipelines/{pipeline_id}/jobs" + primary_keys = ["id"] + parent_stream_type = PipelinesStream # Stream should wait for parents to complete. + + +class ProjectMilestonesStream(ProjectBasedStream): + name = "project_milestones" + path = "/projects/{project_id}/milestones" + primary_keys = ["id"] + schema_filename = "milestones.json" + + +class ProjectMergeRequestsStream(ProjectBasedStream): + name = "merge_requests" + path = "/projects/{project_id}/merge_requests" + primary_keys = ["id"] + replication_key = "updated_at" + bookmark_param_name = "updated_after" + parent_stream_type = None # Stream should wait for parents to complete. + extra_url_params = {"scope": "all"} + + +class UsersStream(ProjectBasedStream): + name = "users" + path = "/projects/{project_id}/users" + primary_keys = ["id"] + + +class SiteUsersStream(GitLabStream): + name = "site_users" + path = "/users" + primary_keys = ["id"] + schema_filename = "users.json" + + +class GroupsStream(GitLabStream): + name = "groups" + path = "/groups/{group_id}" + primary_keys = ["id"] + + +class GroupMilestonesStream(ProjectBasedStream): + name = "group_milestones" + path = "/groups/{group_id}/milestones" + primary_keys = ["id"] + schema_filename = "milestones.json" + + +class GroupMembersStream(ProjectBasedStream): + name = "group_members" + path = "/groups/{group_id}/members" + primary_keys = ["group_id", "id"] + + +class ProjectMembersStream(ProjectBasedStream): + name = "project_members" + path = "/projects/{project_id}/members" + primary_keys = ["project_id", "id"] + + +class TagsStream(ProjectBasedStream): + name = "tags" + path = "/projects/{project_id}/repository/tags" + primary_keys = ["project_id", "commit_id", "name"] + + +class GroupLabelsStream(ProjectBasedStream): + name = "group_labels" + path = "/groups/{group_id}/labels" + primary_keys = ["group_id", "id"] + + +class ProjectLabelsStream(ProjectBasedStream): + name = "project_labels" + path = "/projects/{project_id}/labels" + primary_keys = ["project_id", "id"] + + +class VulnerabilitiesStream(ProjectBasedStream): + name = "vulnerabilities" + path = "/projects/{project_id}/vulnerabilities" + primary_keys = ["id"] + + +class GroupVariablesStream(ProjectBasedStream): + name = "group_variables" + path = "/groups/{group_id}/variables" + primary_keys = ["project_id", "key"] + + +class ProjectVariablesStream(ProjectBasedStream): + name = "project_variables" + path = "/projects/{project_id}/variables" + primary_keys = ["group_id", "key"] diff --git a/tap_gitlab/tap.py b/tap_gitlab/tap.py index 0b9d67a..40c9039 100644 --- a/tap_gitlab/tap.py +++ b/tap_gitlab/tap.py @@ -1,24 +1,27 @@ """GitLab tap class.""" +import inspect from typing import List from singer_sdk import Tap, Stream from singer_sdk import typing as th # JSON schema typing helpers from tap_gitlab.caching import setup_requests_cache -from tap_gitlab.streams import ( - # TODO: Import your custom stream types here: - GitLabStream, - ProjectsStream, -) -STREAM_TYPES = [ - # TODO: Compile a list of custom stream types here: - ProjectsStream, +from tap_gitlab.streams import GitLabStream, ProjectBasedStream +from tap_gitlab import streams + +OPTIN_CLASS_NAMES = [ + "merge_request_commits", + "pipelines_extended", + "group_variables", + "project_variables", ] +ULTIMATE_LICENSE_CLASS_NAMES = ["epics", "epic_issues"] class TapGitLab(Tap): """GitLab tap class.""" + name = "tap-gitlab" config_jsonschema = th.PropertiesList( @@ -26,64 +29,85 @@ class TapGitLab(Tap): "api_url", th.StringType, required=False, - description="Overrides the base URL.", + description="Optionally overrides the default base URL for the Gitlab API.", ), th.Property( "private_token", th.StringType, required=True, - description="TODO", + description="An access token to use when calling to the Gitlab API.", ), th.Property( "groups", th.StringType, required=False, - description="TODO", + description=( + "A space delimited list of group ids, e.g. " + "'orgname1 orgname2 orgname3'" + ), ), th.Property( "projects", th.StringType, required=False, - description="TODO", + description=( + "A space delimited list of project ids, e.g. " + "'orgname/projectname1 orgname/projectname2" + ), ), th.Property( "start_date", th.DateTimeType, required=False, - description="TODO", + description=( + "Optional. If provided, this is the furthest date for which " + "data will be retrieved." + ), ), th.Property( "ultimate_license", th.BooleanType, required=False, - description="TODO", + description=( + "If not set to 'true', the following streams will be ignored: " + "'epics' and 'epic_issues'." + ), ), th.Property( "fetch_merge_request_commits", th.BooleanType, required=False, - description="TODO", + description=( + "If not set to 'true', the 'merge_request_commits' stream will be " + "ignored." + ), default=False, ), th.Property( "fetch_pipelines_extended", th.BooleanType, required=False, - description="TODO", + description=( + "If not set to 'true', the 'pipelines_extended' stream will be ignored." + ), default=False, ), th.Property( "fetch_group_variables", th.BooleanType, required=False, - description="TODO", + description=( + "If not set to 'true', the 'group_variables' stream will be ignored." + ), default=False, ), th.Property( "fetch_project_variables", th.BooleanType, required=False, - description="TODO", + description=( + "If not set to 'true', the 'project_variables' stream will be ignored." + ), default=False, ), th.Property( @@ -113,5 +137,29 @@ class TapGitLab(Tap): def discover_streams(self) -> List[Stream]: """Return a list of discovered streams.""" - setup_requests_cache(self.config) - return [stream_class(tap=self) for stream_class in STREAM_TYPES] + setup_requests_cache(dict(self.config)) + + stream_types: List[type] = [] + for class_name, module_class in inspect.getmembers(streams, inspect.isclass): + class_name = module_class.__name__ + if not issubclass(module_class, (GitLabStream, ProjectBasedStream)): + continue # Not a stream class. + + if class_name in ["GitLabStream", "ProjectBasedStream"]: + continue # Base classes, not streams. + + if ( + class_name in OPTIN_CLASS_NAMES + and not self.config[f"fetch_{class_name}"] + ): + continue # This is an "optin" class, and is not opted in. + + if ( + class_name in ULTIMATE_LICENSE_CLASS_NAMES + and not self.config["ultimate_license"] + ): + continue # This is an ultimate license class and will be skipped. + + stream_types.append(module_class) + + return [stream_class(tap=self) for stream_class in stream_types]