Skip to content

chore: remove obsolete functions #418

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 0 additions & 81 deletions tagreader/clients.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import os
from datetime import datetime, timedelta, timezone, tzinfo
from itertools import groupby
from operator import itemgetter
Expand All @@ -16,9 +15,6 @@
ReaderType,
convert_to_pydatetime,
ensure_datetime_with_tz,
find_registry_key,
find_registry_key_from_name,
is_windows,
)
from tagreader.web_handlers import (
AspenHandlerWeb,
Expand Down Expand Up @@ -119,83 +115,6 @@ def get_next_timeslice(
return start, calc_end


def get_server_address_aspen(datasource: str) -> Optional[Tuple[str, int]]:
"""Data sources are listed under
HKEY_CURRENT_USER\\Software\\AspenTech\\ADSA\\Caches\\AspenADSA\\username.
For each data source there are multiple keys with Host entries. We start by
identifying the correct key to use by locating the UUID for Aspen SQLplus
services located under Aspen SQLplus service component. Then we find the
host and port based on the path above and the UUID.
"""

# todo: is obsolete after removing ODBC

if not is_windows():
return None
import winreg

regkey_clsid = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Classes\Wow6432Node\CLSID"
)
regkey, _ = find_registry_key_from_name(
regkey_clsid, "Aspen SQLplus service component"
)
regkey_implemented_categories = winreg.OpenKeyEx(regkey, "Implemented Categories")

_, aspen_uuid = find_registry_key_from_name(
regkey_implemented_categories, "Aspen SQLplus services"
)

reg_adsa = winreg.OpenKey(
winreg.HKEY_CURRENT_USER,
r"Software\AspenTech\ADSA\Caches\AspenADSA\\" + os.getlogin(),
)

try:
reg_site_key = winreg.OpenKey(reg_adsa, datasource + "\\" + aspen_uuid)
host = winreg.QueryValueEx(reg_site_key, "Host")[0]
port = int(winreg.QueryValueEx(reg_site_key, "Port")[0])
return host, port
except FileNotFoundError:
return None


def get_server_address_pi(datasource: str) -> Optional[Tuple[str, int]]:
"""
PI data sources are listed under
HKEY_LOCAL_MACHINE\\Software\\Wow6432Node\\PISystem\\PI-SDK\\x.x\\ServerHandles or
\\Software\\PISystem\\PI-SDK\\x.x\\ServerHandles.

:param datasource: Name of data source
:return: host, port
:type: tuple(string, int)
"""
# todo: is obsolete after removing ODBC

if not is_windows():
return None
import winreg

try:
reg_key = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\Wow6432Node\PISystem\PI-SDK"
)
reg_key_handles = find_registry_key(reg_key, "ServerHandles")
reg_site_key = find_registry_key(reg_key_handles, datasource)
if reg_site_key is None:
reg_key = winreg.OpenKey(
winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\PISystem\PI-SDK"
)
reg_key_handles = find_registry_key(reg_key, "ServerHandles")
reg_site_key = find_registry_key(reg_key_handles, datasource)
if reg_site_key is not None:
host = winreg.QueryValueEx(reg_site_key, "path")[0]
port = int(winreg.QueryValueEx(reg_site_key, "port")[0])
return host, port
except FileNotFoundError:
return None


def get_handler(
imstype: Optional[IMSType],
datasource: str,
Expand Down
35 changes: 0 additions & 35 deletions tagreader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,41 +33,6 @@ def is_linux() -> bool:
import subprocess


def find_registry_key(base_key, search_key_name: str):
search_key_name = search_key_name.lower()
if base_key is not None:
num_keys, _, _ = winreg.QueryInfoKey(base_key)
for i in range(0, num_keys):
key_name = winreg.EnumKey(base_key, i)
if key_name.lower() == search_key_name:
return winreg.OpenKey(base_key, key_name)
else:
key = find_registry_key(
winreg.OpenKey(base_key, key_name), search_key_name
)
if key is not None:
return key
return None


def find_registry_key_from_name(base_key, search_key_name: str):
search_key_name = search_key_name.lower()
num_keys, _, _ = winreg.QueryInfoKey(base_key)
key = key_string = None
for i in range(0, num_keys):
try:
key_string = winreg.EnumKey(base_key, i)
key = winreg.OpenKey(base_key, key_string)
_, num_vals, _ = winreg.QueryInfoKey(key)
if num_vals > 0:
(_, key_name, _) = winreg.EnumValue(key, 0)
if str(key_name).lower() == search_key_name:
break
except Exception as err:
logging.error("{}: {}".format(i, err))
return key, key_string


def convert_to_pydatetime(date_stamp: Union[datetime, str, pd.Timestamp]) -> datetime:
if isinstance(date_stamp, datetime):
return date_stamp
Expand Down
33 changes: 1 addition & 32 deletions tagreader/web_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from tagreader.cache import BucketCache, SmartCache
from tagreader.logger import logger
from tagreader.utils import IMSType, ReaderType, is_mac, is_windows, urljoin
from tagreader.utils import ReaderType, is_mac, is_windows, urljoin


class MD4:
Expand Down Expand Up @@ -258,11 +258,6 @@ def __init__(
self._max_rows = options.get("max_rows", 100000)
self._connection_string = "" # Used for raw SQL queries

@staticmethod
def generate_connection_string(host, *_, **__):
# todo: is obsolete after removing ODBC
raise NotImplementedError

@staticmethod
def generate_search_query(
tag: Optional[str],
Expand Down Expand Up @@ -670,27 +665,6 @@ def generate_sql_query(
connection_string += f"<![CDATA[{query}]]></SQL>"
return connection_string

def initialize_connection_string(
# todo: is obsolete after removing ODBC
self,
host: Optional[str] = None,
port: int = 10014,
connection_string: Optional[str] = None,
):
# todo: is obsolete after removing ODBC
if connection_string:
self._connection_string = connection_string
else:
if host is None:
from tagreader.clients import get_server_address_aspen

host = get_server_address_aspen(self.datasource)
self._connection_string = (
f"DRIVER=AspenTech SQLPlus;HOST={host};"
f"PORT={port};CHARINT=N;CHARFLOAT=N;"
"CHARTIME=N;CONVERTERRORS=N"
)

def query_sql(self, query: str, parse: bool = True) -> Union[str, pd.DataFrame]:
url = urljoin(self.base_url, "SQL")
if self._connection_string is None:
Expand Down Expand Up @@ -758,11 +732,6 @@ def __init__(
def _time_to_UTC_string(time: datetime) -> str:
return time.astimezone(pytz.UTC).strftime("%d-%b-%y %H:%M:%S")

@staticmethod
def generate_connection_string(host, *_, **__):
# todo: is obsolete after removing ODBC
raise NotImplementedError

@staticmethod
def escape(s: str) -> str:
# https://techsupport.osisoft.com/Documentation/PI-Web-API/help/topics/search-queries.html
Expand Down
16 changes: 0 additions & 16 deletions tests/test_AspenHandlerREST.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,19 +232,3 @@ def test_generate_sql_query(aspen_handler: AspenHandlerWeb) -> None:
"<![CDATA[myquery]]></SQL>"
)
assert res == expected


def test_initialize_connection_string(aspen_handler: AspenHandlerWeb) -> None:
# todo: is obsolete after removing ODBC
aspen_handler.initialize_connection_string(
host="my_host", port=999, connection_string="my_connection_string"
)
assert aspen_handler._connection_string == "my_connection_string"
aspen_handler.initialize_connection_string(
host="my_host",
port=999,
)
assert aspen_handler._connection_string == (
"DRIVER=AspenTech SQLPlus;HOST=my_host;PORT=999;"
"CHARINT=N;CHARFLOAT=N;CHARTIME=N;CONVERTERRORS=N"
)
2 changes: 1 addition & 1 deletion tests/test_clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import pytz

from tagreader.clients import IMSClient, get_missing_intervals, get_next_timeslice
from tagreader.utils import IMSType, ReaderType, is_windows
from tagreader.utils import IMSType, ReaderType


def test_init_client_without_cache() -> None:
Expand Down