Skip to content
This repository has been archived by the owner on Jun 15, 2022. It is now read-only.

Commit

Permalink
Merge pull request #3 from lsst-sqre/tickets/DM-34848
Browse files Browse the repository at this point in the history
Add debugging output and actually find all buckets
  • Loading branch information
athornton authored May 21, 2022
2 parents 6838d6e + f68e460 commit 80f44c5
Showing 1 changed file with 50 additions and 25 deletions.
75 changes: 50 additions & 25 deletions bucketmapper
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3
import asyncio
import logging
import os
from dataclasses import asdict, dataclass, field
from datetime import timedelta
Expand Down Expand Up @@ -124,11 +125,20 @@ class BucketMapper:
token: str = os.getenv("INFLUXDB_TOKEN") or "",
url: str = os.getenv("INFLUXDB_URL") or DEFAULT_URL,
org: str = os.getenv("INFLUXDB_ORG") or DEFAULT_ORG,
debug: bool = bool(os.getenv("DEBUG")) or False,
) -> None:
assert token, "INFLUXDB_TOKEN or token param must be set"
loglevel = logging.WARNING
self.debug = debug
if self.debug:
loglevel = logging.DEBUG
logging.basicConfig(
format="%(asctime)s %(levelname)s:%(message)s", level=loglevel
)
self.log = logging.getLogger(__name__)
self.log.debug("Logging established.")
self.org = org
self.api_url = url + "/api/v2"
self.loop = asyncio.get_event_loop()
self.params = {"org": self.org}
self.session = aiohttp.ClientSession()
self.session.headers.update({"Authorization": f"Token {token}"})
Expand All @@ -141,40 +151,52 @@ class BucketMapper:
await self.session.close()

async def list_buckets(self) -> List[BucketGet]:
next = f"{self.api_url}/buckets"
"""List all buckets."""
url = f"{self.api_url}/buckets"
b_list = []
while next:
resp = await self.session.get(next, params=self.params)
pagesize = 20
# I have no idea why pagesize X really gives you X-2 buckets back,
# but it does.
#
# It also doesn't set links.next. So rather than trying to
# chase links, we'll just keep asking, setting "after" to the last
# bucket we saw, until no more buckets appear.
params: Dict[str, Any] = {"limit": pagesize}
params.update(self.params)
last_id = ""
while True:
if last_id:
params.update({"after": last_id})
resp = await self.session.get(url, params=params)
obj = await resp.json()
b_list.extend(obj["buckets"])
next = ""
if "links" in obj and "next" in obj["links"]:
next = obj["links"]["next"]
return [BucketGet(**x) for x in b_list]
bucketlist = obj["buckets"]
if not bucketlist:
break
b_list.extend(bucketlist)
last_id = bucketlist[-1]["id"]
buckets = [BucketGet(**x) for x in b_list]
self.log.debug(f"Buckets -> {buckets}")
return buckets

async def list_dbrps(self) -> List[DBRPGet]:
async def get_dbrp_page(uri: str) -> List[Dict[str, Any]]:
resp = await self.session.get(uri, params=self.params)
obj = await resp.json()
return obj["content"]

"""List all DRBPs."""
# This method does not appear to be paginated
# https://docs.influxdata.com/influxdb/v2.2/api
d_list: List[Dict[str, Any]] = []
next = f"{self.api_url}/dbrps"
while next:
obj = await get_dbrp_page(next)
if not obj:
break
d_list.extend(obj)
next = ""
if "links" in obj[-1] and "next" in obj[-1]["links"]:
next = obj[-1]["links"]
return [DBRPGet(**x) for x in d_list]
url = f"{self.api_url}/dbrps"
resp = await self.session.get(url, params=self.params)
obj = await resp.json()
d_list = obj["content"]
dbrps = [DBRPGet(**x) for x in d_list]
self.log.debug(f"DBRPS -> {dbrps}")
return dbrps

async def prepare_buckets_without_dbrps(self) -> List[BucketGet]:
"""Generate a list of buckets whose corresponding DBRPs are missing"""
buckets = await self.list_buckets()
dbrps_ids = [x.bucketID for x in await self.list_dbrps()]
missing_buckets = [x for x in buckets if x.id not in dbrps_ids]
self.log.info(f"Buckets without DBRPS -> {missing_buckets}")
return missing_buckets

def make_dbrp_from_bucket(self, bucket: BucketGet) -> DBRPPost:
Expand Down Expand Up @@ -235,16 +257,19 @@ class BucketMapper:
async def prepare_new_dbrps(self) -> List[DBRPPost]:
"""Generate a list of DBRPs to create"""
# fmt: off
return [
new_dbrps = [
self.make_dbrp_from_bucket(x)
for x in await self.prepare_buckets_without_dbrps()
]
# fmt: on
self.log.debug(f"New DBRPS to create -> {new_dbrps}")
return new_dbrps

async def create_new_dbrps(self) -> None:
"""Create DBRPs in InfluxDB v2"""
dbrps = await self.prepare_new_dbrps()
payloads = [asdict(x) for x in dbrps]
self.log.debug(f"Creating DBRPS -> {payloads}")
# fmt: off
payload_futs = [
self.session.post(
Expand Down

0 comments on commit 80f44c5

Please sign in to comment.