Merge pull request bluesky#1835 from callumforrester/1834-fix-linting
Migrate code en-masse to fulfill new ruff rules
tacaswell authored Nov 1, 2024
2 parents 59c87bb + 13f5ed8 commit 8ad45bf
Showing 23 changed files with 150 additions and 171 deletions.
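Every hunk below follows the same mechanical pattern: concrete container annotations imported from `typing` (`List`, `Dict`, `Tuple`, `Set`, `Deque`, `FrozenSet`) become the built-in generics standardised by PEP 585, and abstract collection types such as `Iterable` and `Mapping` are imported from `collections.abc` rather than `typing`. The commit message only says "new ruff rules"; the changes are consistent with ruff's pyupgrade-style checks (UP006/UP035), though the diff itself does not name the selected rules. A minimal sketch of the pattern, with illustrative variable names rather than code from this repository:

```python
# Old spelling, now flagged by the linter:
#     from typing import Deque, Dict, List, Tuple
# New spelling, as applied throughout this PR:
from collections import deque
from typing import Optional  # Optional/Union stay in typing; only the container aliases move

counts: dict[str, int] = {}                  # was Dict[str, int]
history: deque[tuple[str, float]] = deque()  # was Deque[Tuple[str, float]]
tags: list[str] = []                         # was List[str]
current: Optional[str] = None                # unchanged by this migration
```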
12 changes: 6 additions & 6 deletions .github/pages/make_switcher.py
@@ -3,28 +3,28 @@
from argparse import ArgumentParser
from pathlib import Path
from subprocess import CalledProcessError, check_output
-from typing import List, Optional
+from typing import Optional


-def report_output(stdout: bytes, label: str) -> List[str]:
+def report_output(stdout: bytes, label: str) -> list[str]:
ret = stdout.decode().strip().split("\n")
print(f"{label}: {ret}")
return ret


-def get_branch_contents(ref: str) -> List[str]:
+def get_branch_contents(ref: str) -> list[str]:
"""Get the list of directories in a branch."""
stdout = check_output(["git", "ls-tree", "-d", "--name-only", ref])
return report_output(stdout, "Branch contents")


-def get_sorted_tags_list() -> List[str]:
+def get_sorted_tags_list() -> list[str]:
"""Get a list of sorted tags in descending order from the repository."""
stdout = check_output(["git", "tag", "-l", "--sort=-v:refname"])
return report_output(stdout, "Tags list")


-def get_versions(ref: str, add: Optional[str]) -> List[str]:
+def get_versions(ref: str, add: Optional[str]) -> list[str]:
"""Generate the file containing the list of all GitHub Pages builds."""
# Get the directories (i.e. builds) from the GitHub Pages branch
try:
@@ -41,7 +41,7 @@ def get_versions(ref: str, add: Optional[str]) -> List[str]:
tags = get_sorted_tags_list()

# Make the sorted versions list from main branches and tags
-versions: List[str] = []
+versions: list[str] = []
for version in ["master", "main"] + tags:
if version in builds:
versions.append(version)
53 changes: 27 additions & 26 deletions src/bluesky/bundlers.py
@@ -2,7 +2,8 @@
import inspect
import time as ttime
from collections import defaultdict, deque
-from typing import Any, Callable, Deque, Dict, FrozenSet, Iterable, List, Optional, Tuple, Union
+from collections.abc import Iterable
+from typing import Any, Callable, Optional, Union

from event_model import (
ComposeDescriptorBundle,
@@ -48,7 +49,7 @@
short_uid,
)

-ObjDict = Dict[Any, Dict[str, T]]
+ObjDict = dict[Any, dict[str, T]]
ExternalAssetDoc = Union[Datum, Resource, StreamDatum, StreamResource]


@@ -60,26 +61,26 @@ def __init__(self, md, record_interruptions, emit, emit_sync, log, *, strict_pre
self.bundling = False # if we are in the middle of bundling readings
self._bundle_name = None # name given to event descriptor
self._run_start_uid = None # The (future) runstart uid
-self._objs_read: Deque[HasName] = deque()  # objects read in one Event
-self._read_cache: Deque[Dict[str, Reading]] = deque()  # cache of obj.read() in one Event
+self._objs_read: deque[HasName] = deque()  # objects read in one Event
+self._read_cache: deque[dict[str, Reading]] = deque()  # cache of obj.read() in one Event
self._asset_docs_cache = deque() # cache of obj.collect_asset_docs()
self._describe_cache: ObjDict[DataKey] = dict() # cache of all obj.describe() output # noqa: C408
-self._describe_collect_cache: ObjDict[Dict[str, DataKey]] = dict()  # noqa: C408 # cache of all obj.describe() output
+self._describe_collect_cache: ObjDict[dict[str, DataKey]] = dict()  # noqa: C408 # cache of all obj.describe() output

self._config_desc_cache: ObjDict[DataKey] = dict() # " obj.describe_configuration() # noqa: C408
self._config_values_cache: ObjDict[Any] = dict() # " obj.read_configuration() values # noqa: C408
self._config_ts_cache: ObjDict[Any] = dict() # " obj.read_configuration() timestamps # noqa: C408
# cache of {name: (doc, compose_event, compose_event_page)}
-self._descriptors: Dict[Any, ComposeDescriptorBundle] = dict()  # noqa: C408
-self._descriptor_objs: Dict[str, Dict[HasName, Dict[str, DataKey]]] = dict()  # noqa: C408
+self._descriptors: dict[Any, ComposeDescriptorBundle] = dict()  # noqa: C408
+self._descriptor_objs: dict[str, dict[HasName, dict[str, DataKey]]] = dict()  # noqa: C408
# cache of {obj: {objs_frozen_set: (doc, compose_event, compose_event_page)}
-self._local_descriptors: Dict[Any, Dict[FrozenSet[str], ComposeDescriptorBundle]] = dict()  # noqa: C408
+self._local_descriptors: dict[Any, dict[frozenset[str], ComposeDescriptorBundle]] = dict()  # noqa: C408
# a seq_num counter per stream
-self._sequence_counters: Dict[Any, int] = dict()  # noqa: C408
-self._sequence_counters_copy: Dict[Any, int] = dict()  # for if we redo data-points # noqa: C408
-self._monitor_params: Dict[Subscribable, Tuple[Callback, Dict]] = dict()  # noqa: C408 # cache of {obj: (cb, kwargs)}
+self._sequence_counters: dict[Any, int] = dict()  # noqa: C408
+self._sequence_counters_copy: dict[Any, int] = dict()  # for if we redo data-points # noqa: C408
+self._monitor_params: dict[Subscribable, tuple[Callback, dict]] = dict()  # noqa: C408 # cache of {obj: (cb, kwargs)}
# a cache of stream_resource uid to the data_keys that stream_resource collects for
-self._stream_resource_data_keys: Dict[str, Iterable[str]] = dict()  # noqa: C408
+self._stream_resource_data_keys: dict[str, Iterable[str]] = dict()  # noqa: C408
self.run_is_open = False
self._uncollected = set() # objects after kickoff(), before collect()
# we expect the RE to take care of the composition
@@ -92,7 +93,7 @@ def __init__(self, md, record_interruptions, emit, emit_sync, log, *, strict_pre
self.emit_sync = emit_sync
self.log = log
# Map of set of collect objects to list of stream names that they can be collected into
-self._declared_stream_names: Dict[FrozenSet, List[str]] = {}
+self._declared_stream_names: dict[frozenset, list[str]] = {}

async def open_run(self, msg):
self.run_is_open = True
@@ -174,14 +175,14 @@ async def close_run(self, msg):
async def _prepare_stream(
self,
desc_key: str,
-objs_dks: Dict[HasName, Dict[str, DataKey]],
+objs_dks: dict[HasName, dict[str, DataKey]],
):
# We do not have an Event Descriptor for this set
# so one must be created.
data_keys = {}
config = {}
object_keys = {}
-hints: Dict[str, Any] = {}
+hints: dict[str, Any] = {}

for obj, dks in objs_dks.items():
maybe_update_hints(hints, obj)
@@ -248,7 +249,7 @@ async def declare_stream(self, msg):
for obj in objs:
if collect:
data_keys = self._describe_collect_cache[obj]
-streams_and_data_keys: List[Tuple[str, Dict[str, Any]]] = (
+streams_and_data_keys: list[tuple[str, dict[str, Any]]] = (
self._maybe_format_datakeys_with_stream_name(data_keys, message_stream_name=stream_name)
)

@@ -411,7 +412,7 @@ async def monitor(self, msg):
stream_bundle = await self._prepare_stream(name, {obj: self._describe_cache[obj]})
compose_event = stream_bundle[1]

-def emit_event(readings: Optional[Dict[str, Reading]] = None, *args, **kwargs):
+def emit_event(readings: Optional[dict[str, Reading]] = None, *args, **kwargs):
if readings is not None:
# We were passed something we can use, but check no args or kwargs
assert (
@@ -645,9 +646,9 @@ async def kickoff(self, msg):
# seperate places so it could be two seperate methods for each dictionary type.
def _maybe_format_datakeys_with_stream_name(
self,
-describe_collect_dict: Union[Dict[str, DataKey], Dict[str, Dict[str, DataKey]]],
+describe_collect_dict: Union[dict[str, DataKey], dict[str, dict[str, DataKey]]],
message_stream_name: Optional[str] = None,
-) -> List[Tuple[str, Dict[str, DataKey]]]:
+) -> list[tuple[str, dict[str, DataKey]]]:
"""
Check if the dictionary returned by describe collect is a dict
`{str: DataKey}` or a `{str: {str: DataKey}}`.
@@ -718,7 +719,7 @@ async def _describe_collect(self, collect_object: Flyable):
describe_collect = self._describe_collect_cache[collect_object]
describe_collect_items = list(self._maybe_format_datakeys_with_stream_name(describe_collect))

-local_descriptors: Dict[Any, Dict[FrozenSet[str], ComposeDescriptorBundle]] = {}
+local_descriptors: dict[Any, dict[frozenset[str], ComposeDescriptorBundle]] = {}

# Check that singly nested stuff should have been pre-declared
def is_data_key(obj: Any) -> bool:
@@ -729,7 +730,7 @@ def is_data_key(obj: Any) -> bool:
), "Single nested data keys should be pre-decalred"

# Make sure you can't use identidal data keys in multiple streams
-duplicates: Dict[str, DataKey] = defaultdict(dict)
+duplicates: dict[str, DataKey] = defaultdict(dict)
for stream, data_keys in describe_collect.items():
for key, stuff in data_keys.items():
for other_stream, other_data_keys in describe_collect.items():
@@ -792,7 +793,7 @@ async def _pack_seq_nums_into_stream_datum(

# message strem name here?
async def _pack_external_assets(
-self, asset_docs: Iterable[Tuple[str, ExternalAssetDoc]], message_stream_name: Optional[str]
+self, asset_docs: Iterable[tuple[str, ExternalAssetDoc]], message_stream_name: Optional[str]
):
"""Packs some external asset documents with relevant information from the run."""

@@ -864,7 +865,7 @@ async def _pack_external_assets(

return stream_datum_previous_indices_difference

-def get_external_data_keys(self, data_keys: Dict[str, DataKey]) -> List[DataKey]:
+def get_external_data_keys(self, data_keys: dict[str, DataKey]) -> list[DataKey]:
"""Get the external data keys from the descriptor data_keys dictionary"""
return [x for x in data_keys if ("external" in data_keys[x] and data_keys[x]["external"] == "STREAM:")]

@@ -876,7 +877,7 @@ async def _collect_events(
message_stream_name: Optional[str],
):
payload = []
-pages: Dict[FrozenSet[str], List[Event]] = defaultdict(list)
+pages: dict[frozenset[str], list[Event]] = defaultdict(list)

if message_stream_name:
compose_event = self._descriptors[message_stream_name].compose_event
@@ -991,7 +992,7 @@ async def collect(self, msg):

# Get references to get_index methods if we have more than one collect object
# raise error if collect_objects don't obey WritesStreamAssests protocol
-indices: List[Callable[[None], SyncOrAsync[int]]] = []
+indices: list[Callable[[None], SyncOrAsync[int]]] = []
if len(collect_objects) > 1:
indices = [check_supports(obj, WritesStreamAssets).get_index for obj in collect_objects]

@@ -1062,7 +1063,7 @@ async def collect(self, msg):
# Make event pages for an object which is EventCollectable or EventPageCollectable
# objects that are EventCollectable will now group the Events and Emit an Event Page
if len(collect_objects) == 1 and not isinstance(collect_objects[0], WritesStreamAssets):
-local_descriptors: Dict[Any, Dict[FrozenSet[str], ComposeDescriptorBundle]] = {}
+local_descriptors: dict[Any, dict[frozenset[str], ComposeDescriptorBundle]] = {}
collect_obj = collect_objects[0]

# If the single collect object is singly nested, gather descriptors
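The bundlers.py changes above carry most of the churn and show the two halves of the migration side by side: concrete classes (`dict`, `deque`, `frozenset`, `tuple`) are subscripted directly, while `Iterable` now comes from `collections.abc` because the `typing` alias is deprecated. A small self-contained sketch of that split, using hypothetical names rather than the real bundler attributes:

```python
from collections import deque
from collections.abc import Iterable  # typing.Iterable is a deprecated alias
from typing import Any, Optional

# Concrete classes have been subscriptable at annotation time since Python 3.9:
read_cache: deque[dict[str, Any]] = deque()
declared_streams: dict[frozenset[str], list[str]] = {}


def collect_names(objs: Iterable[Any], prefix: Optional[str] = None) -> list[str]:
    """Return a name per object; purely illustrative of the annotation style."""
    return [f"{prefix or ''}{getattr(obj, 'name', repr(obj))}" for obj in objs]
```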
4 changes: 2 additions & 2 deletions src/bluesky/callbacks/best_effort.py
@@ -11,11 +11,11 @@
import threading
import time
import weakref
+from collections.abc import Mapping
from datetime import datetime
from functools import partial
from io import StringIO
from pprint import pformat
-from typing import List, Mapping
from warnings import warn

import matplotlib.pyplot as plt
@@ -553,7 +553,7 @@ class LivePlotPlusPeaks(LivePlot):
# Track state of axes, which may share instances of LivePlotPlusPeaks.
__labeled: Mapping[Axis, bool] = weakref.WeakKeyDictionary() # map ax to True/False
__visible: Mapping[Axis, bool] = weakref.WeakKeyDictionary() # map ax to True/False
-__instances: Mapping[Axis, List["LivePlotPlusPeaks"]] = (
+__instances: Mapping[Axis, list["LivePlotPlusPeaks"]] = (
weakref.WeakKeyDictionary()
) # map ax to list of instances

16 changes: 8 additions & 8 deletions src/bluesky/callbacks/tiled_writer.py
@@ -1,5 +1,5 @@
import copy
-from typing import Any, Dict, Optional, Tuple, Union
+from typing import Any, Optional, Union

import pandas as pd
from event_model import RunRouter
@@ -60,12 +60,12 @@ class _RunWriter(CallbackBase):
def __init__(self, client: BaseClient):
self.client = client
self.root_node: Union[None, Container] = None
-self._desc_nodes: Dict[str, Container] = {}  # references to descriptor containers by their uid's
-self._sres_nodes: Dict[str, BaseClient] = {}
-self._docs_cache: Dict[str, Union[Datum, Resource, StreamResource]] = {}
-self._handlers: Dict[str, ConsolidatorBase] = {}
-self.data_keys_int: Dict[str, Dict[str, Any]] = {}
-self.data_keys_ext: Dict[str, Dict[str, Any]] = {}
+self._desc_nodes: dict[str, Container] = {}  # references to descriptor containers by their uid's
+self._sres_nodes: dict[str, BaseClient] = {}
+self._docs_cache: dict[str, Union[Datum, Resource, StreamResource]] = {}
+self._handlers: dict[str, ConsolidatorBase] = {}
+self.data_keys_int: dict[str, dict[str, Any]] = {}
+self.data_keys_ext: dict[str, dict[str, Any]] = {}

def _ensure_resource_backcompat(self, doc: StreamResource) -> StreamResource:
"""Kept for back-compatibility with old StreamResource schema from event_model<1.20.0
@@ -218,7 +218,7 @@ def resource(self, doc: Resource):
def stream_resource(self, doc: StreamResource):
self._docs_cache[doc["uid"]] = self._ensure_resource_backcompat(doc)

-def get_sres_node(self, sres_uid: str, desc_uid: Optional[str] = None) -> Tuple[BaseClient, ConsolidatorBase]:
+def get_sres_node(self, sres_uid: str, desc_uid: Optional[str] = None) -> tuple[BaseClient, ConsolidatorBase]:
"""Get Stream Resource node from Tiled, if it already exists, or register it from a cached SR document"""

if sres_uid in self._sres_nodes.keys():
20 changes: 10 additions & 10 deletions src/bluesky/consolidators.py
@@ -1,7 +1,7 @@
import collections
import dataclasses
import enum
-from typing import Any, Dict, List, Optional, Set, Tuple
+from typing import Any, Optional

import numpy as np
from event_model.documents import EventDescriptor, StreamDatum, StreamResource
@@ -47,7 +47,7 @@ class DataSource:
id: Optional[int] = None
mimetype: Optional[str] = None
parameters: dict = dataclasses.field(default_factory=dict)
-assets: List[Asset] = dataclasses.field(default_factory=list)
+assets: list[Asset] = dataclasses.field(default_factory=list)
management: Management = Management.writable


@@ -75,14 +75,14 @@ class ConsolidatorBase:
automated discovery of the subclassed Consolidator.
"""

-supported_mimetypes: Set[str] = set()
+supported_mimetypes: set[str] = set()

def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor):
self.mimetype = self.get_supported_mimetype(stream_resource)

self.data_key = stream_resource["data_key"]
self.uri = stream_resource["uri"]
-self.assets: List[Asset] = []
+self.assets: list[Asset] = []
self._sres_parameters = stream_resource["parameters"]

# Find data shape and machine dtype; dtype_numpy, dtype_str take precedence if specified
@@ -109,7 +109,7 @@ def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor)

self._num_rows: int = 0 # Number of rows in the Data Source (all rows, includung skips)
self._has_skips: bool = False
-self._seqnums_to_indices_map: Dict[int, int] = {}
+self._seqnums_to_indices_map: dict[int, int] = {}

@classmethod
def get_supported_mimetype(cls, sres):
@@ -118,7 +118,7 @@ def get_supported_mimetype(cls, sres):
return sres["mimetype"]

@property
-def shape(self) -> Tuple[int]:
+def shape(self) -> tuple[int]:
"""Native shape of the data stored in assets
This includes the leading (0-th) dimension corresponding to the number of rows, including skipped rows, if
@@ -127,7 +127,7 @@ def shape(self) -> Tuple[int]:
return self._num_rows, *self.datum_shape

@property
-def chunks(self) -> Tuple[Tuple[int, ...], ...]:
+def chunks(self) -> tuple[tuple[int, ...], ...]:
"""Explicit (dask-style) specification of chunk sizes
The produced chunk specification is a tuple of tuples of int that specify the sizes of each chunk in each
@@ -168,7 +168,7 @@ def has_skips(self) -> bool:
return self._num_rows > len(self._seqnums_to_indices_map)

@property
-def adapter_parameters(self) -> Dict:
+def adapter_parameters(self) -> dict:
"""A dictionary of parameters passed to an Adapter
These parameters are intended to provide any additional information required to read a data source of a
@@ -262,7 +262,7 @@ def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor)
self.swmr = self._sres_parameters.get("swmr", True)

@property
-def adapter_parameters(self) -> Dict:
+def adapter_parameters(self) -> dict:
"""Parameters to be passed to the HDF5 adapter, a dictionary with the keys:
dataset: List[str] - a path to the dataset within the hdf5 file represented as list split at `/`
@@ -276,7 +276,7 @@ class TIFFConsolidator(ConsolidatorBase):

def __init__(self, stream_resource: StreamResource, descriptor: EventDescriptor):
super().__init__(stream_resource, descriptor)
-self.data_uris: List[str] = []
+self.data_uris: list[str] = []

def get_datum_uri(self, indx: int):
"""Return a full uri for a datum (an individual TIFF file) based on its index in the sequence.
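One detail the mechanical rewrite preserves from the old annotations in consolidators.py: `shape` is typed as `tuple[int]`, which to a type checker means a tuple of exactly one `int`, while the property actually returns `(num_rows, *datum_shape)`. The variable-length spelling used for `chunks`, `tuple[int, ...]`, is the form that matches such a value; the sketch below only illustrates the distinction and is not a change made in this PR:

```python
# Fixed-length vs. variable-length tuple annotations (illustrative only).
single: tuple[int] = (3,)                      # exactly one element
full_shape: tuple[int, ...] = (128, 512, 512)  # any number of ints


def shape_like(num_rows: int, datum_shape: tuple[int, ...]) -> tuple[int, ...]:
    # Mirrors the `return self._num_rows, *self.datum_shape` pattern in ConsolidatorBase.shape.
    return (num_rows, *datum_shape)
```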
5 changes: 2 additions & 3 deletions src/bluesky/magics.py
@@ -9,7 +9,6 @@
import collections
import warnings
from operator import attrgetter
-from typing import List

import numpy as np
from IPython.core.magic import Magics, line_magic, magics_class
@@ -63,8 +62,8 @@ def detectors(self, val):
)
self._detectors = val

-_positioners: List[Movable] = []
-_detectors: List[Readable] = []
+_positioners: list[Movable] = []
+_detectors: list[Readable] = []


@magics_class
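Notably, `Optional` and `Union` are kept as `typing` imports throughout, even though the container aliases are dropped. That is consistent with a codebase that still supports Python 3.9, where built-in generics such as `list[str]` work at runtime but the `str | None` union syntax does not; the diff itself does not state the project's minimum Python version, so treat that as an inference. A quick runtime check of the spellings this PR settles on (assumes Python 3.9 or newer):

```python
from typing import Optional, Union

branches: list[str] = ["master", "main"]  # PEP 585 builtin generic, fine on 3.9+
lookup: dict[str, tuple[int, int]] = {}   # likewise
ref: Optional[str] = None                 # kept from typing; `str | None` needs 3.10+
payload: Union[dict, list] = {}           # likewise kept

print(type(branches), type(lookup), ref, payload)
```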
(Diffs for the remaining 17 changed files are not shown here.)
