Skip to content

Commit

Permalink
Add Stream Testing Utility as draft (#5)
Browse files Browse the repository at this point in the history
* Add StreamTestUtility

* Add some stream attribute tests

* Add expectation testing

* Add built-in tests

* Change class to TapTestUtility

* Flatten test naming

* Update poetry deps

* Make stream record limit explicit and fmt

* MIgrate test generation logic into test util

* Add more typing tests

* Add stream replication key handling

* Fix tests to passing and add ovservability
  • Loading branch information
stkbailey authored Nov 3, 2021
1 parent 244d4a5 commit adabada
Show file tree
Hide file tree
Showing 13 changed files with 518 additions and 200 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,5 @@ dmypy.json

.env
.meltano
.secrets
.secrets
.DS_Store
148 changes: 80 additions & 68 deletions poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[tool.poetry]
name = "tap-slack"
version = "0.1.0"
version = "0.1.1"
description = "`tap-slack` is a Singer tap for Slack, built with the Meltano SDK for Singer Taps."
authors = ["Stephen Bailey"]
license = "Apache 2.0"

[tool.poetry.dependencies]
python = "<3.9,>=3.6.2"
python = "<3.10,>=3.6.2"
requests = "^2.25.1"
singer-sdk = "^0.3.10"

Expand Down
10 changes: 9 additions & 1 deletion tap_slack/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""REST client handling, including SlackStream base class."""

import time
from typing import Any, Dict, Text, Optional
from typing import Any, Dict, Text, Optional, List

from singer_sdk.streams import RESTStream
from singer_sdk.authenticators import BearerTokenAuthenticator
Expand All @@ -24,6 +24,14 @@ def authenticator(self) -> BearerTokenAuthenticator:
token=self.config.get("api_key"),
)

@property
def expectations(self) -> List[str]:
return [
"stream__returns_record",
"stream__record_schema_matches_catalog",
"stream__primary_key",
]

def get_url_params(
self, context: Optional[dict], next_page_token: Optional[Any]
) -> Dict[str, Any]:
Expand Down
8 changes: 6 additions & 2 deletions tap_slack/schemas/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@

schema = th.PropertiesList(
th.Property("channel_id", th.StringType, required=True),
th.Property("message_ts", th.DateTimeType, required=True),
th.Property("ts", th.DateTimeType, required=True),
th.Property(
"ts",
th.NumberType,
required=True,
description="Epoch timestamp of when the thread reply was posted.",
),
th.Property(
"blocks",
th.ArrayType(
Expand Down
19 changes: 17 additions & 2 deletions tap_slack/schemas/threads.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,18 @@

schema = th.PropertiesList(
th.Property("channel_id", th.StringType, required=True),
th.Property("ts", th.DateTimeType),
th.Property("thread_ts", th.DateTimeType, required=True),
th.Property(
"thread_ts",
th.NumberType,
required=True,
description="Epoch timestamp of when the thread parent message was posted.",
),
th.Property(
"ts",
th.NumberType,
required=True,
description="Epoch timestamp of when the thread reply was posted.",
),
th.Property("client_msg_id", th.StringType),
th.Property("type", th.StringType),
th.Property("text", th.StringType),
Expand All @@ -12,7 +22,12 @@
th.Property("edited", th.StringType),
th.Property("files", th.StringType),
th.Property("upload", th.StringType),
th.Property("parent_user_id", th.StringType),
th.Property("display_as_bot", th.StringType),
th.Property("upload", th.StringType),
th.Property("edited", th.StringType),
th.Property("is_locked", th.BooleanType),
th.Property("files", th.StringType),
th.Property(
"blocks",
th.ArrayType(
Expand Down
2 changes: 1 addition & 1 deletion tap_slack/schemas/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
th.Property("is_restricted", th.BooleanType),
th.Property("is_ultra_restricted", th.BooleanType),
th.Property("is_bot", th.BooleanType),
th.Property("updated", th.StringType),
th.Property("updated", th.NumberType),
th.Property("is_app_user", th.BooleanType),
th.Property("is_email_confirmed", th.BooleanType),
th.Property("who_can_share_contact_card", th.StringType),
Expand Down
59 changes: 34 additions & 25 deletions tap_slack/streams.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Stream type classes for tap-slack."""
import requests
import pendulum

from datetime import datetime, timezone, timedelta
from typing import Any, Dict, Optional, Iterable
from typing import Any, Dict, Optional, Iterable, cast
from singer_sdk.helpers.jsonpath import extract_jsonpath

from tap_slack.client import SlackStream
Expand Down Expand Up @@ -38,20 +39,20 @@ def _join_channel(self, channel_id: str) -> requests.Response:
url = f"{self.url_base}/conversations.join"
params = {"channel": channel_id}
response = self.requests_session.post(
url=url,
params=params,
headers=self.authenticator.auth_headers
url=url, params=params, headers=self.authenticator.auth_headers
)
if not response.json().get("ok"):
self.logger.warning("Error joining channel %s: %s", response.json().get("error"))
self.logger.info("Successfully joined channel: %s", channel_id)
self.logger.warning(
"Error joining channel %s: %s", response.json().get("error")
)
self.logger.info("Successfully joined channel: %s", channel_id)


class ChannelMembersStream(SlackStream):
name = "channel_members"
parent_stream_type = ChannelsStream
path = "/conversations.members"
primary_keys = ["channel_id", "id"]
primary_keys = ["channel_id", "member_id"]
records_jsonpath = "members.[*]"
schema = schemas.channel_members

Expand Down Expand Up @@ -81,32 +82,36 @@ class MessagesStream(SlackStream):
max_requests_per_minute = 50

@property
def threads_stream_starting_timestamp(self):
lookback_days = timedelta(self.config["thread_lookback_days"])
return datetime.now(tz=timezone.utc) - lookback_days
def threads_stream_start(self):
lookback_days = timedelta(days=self.config["thread_lookback_days"])
start_date = datetime.now(tz=timezone.utc) - lookback_days
return start_date.timestamp()

def get_url_params(self, context, next_page_token):
"""Augment default to implement incremental syncing."""
params = super().get_url_params(context, next_page_token)
start_time = self.get_starting_timestamp(context)
if start_time:
params["oldest"] = start_time.strftime("%s")
start_timestamp = self.get_starting_replication_key_value(context)
if start_timestamp:
params["oldest"] = start_timestamp
return params

def post_process(self, row: dict, context: Optional[dict]) -> dict:
"""
Directly invoke the threads stream sync on relevant messages,
and filter out messages that have already been synced before.
"""
messages_stream_starting_timestamp = super().get_starting_timestamp(context)
if row.get("thread_ts") and self._tap.streams["threads"].selected:
threads_context = {**context, **{"thread_ts": row["ts"]}}
self._tap.streams["threads"].sync(context=threads_context)
if row["ts"] < messages_stream_starting_timestamp.strftime("%s"):
if row["ts"] and float(row["ts"]) < self.get_starting_replication_key_value(
context
):
return None
return row

def get_starting_timestamp(self, context: Optional[dict]) -> Optional[datetime]:
def get_starting_replication_key_value(
self, context: Optional[dict]
) -> Optional[int]:
"""
Threads can continue to have messages for weeks after the original message
was posted, so we cannot assume that we have scraped all message replies
Expand All @@ -115,16 +120,20 @@ def get_starting_timestamp(self, context: Optional[dict]) -> Optional[datetime]:
(e.g. for full syncs) or the THREAD_LOOKBACK_DAYS days before the current run.
A longer THREAD_LOOKBACK_DAYS will result in longer incremental sync runs.
"""
messages_stream_starting_timestamp = super().get_starting_timestamp(context)
if not messages_stream_starting_timestamp:
return None
elif (
self.threads_stream_starting_timestamp
< messages_stream_starting_timestamp
):
return self.threads_stream_starting_timestamp
state = self.get_context_state(context)
replication_key_value = state.get("replication_key_value")
if replication_key_value:
if self.threads_stream_start < self.threads_stream_start:
return self.threads_stream_start
return replication_key_value
elif "start_date" in self.config:
start_date = cast(datetime, pendulum.parse(self.config["start_date"]))
return start_date.replace(tzinfo=timezone.utc).timestamp()
else:
return messages_stream_starting_timestamp
self.logger.info(
"Setting replication value to 0 to perform full historical sync."
)
return 0.0


class ThreadsStream(SlackStream):
Expand Down
8 changes: 8 additions & 0 deletions tap_slack/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,11 @@ class TapSlack(Tap):
def discover_streams(self) -> List[Stream]:
"""Return a list of discovered streams."""
return [stream_class(tap=self) for stream_class in STREAM_TYPES]

@property
def expectations(self):
return [
"tap__cli",
"tap__discovery",
"tap__stream_connections",
]
Loading

0 comments on commit adabada

Please sign in to comment.