Skip to content

Commit

Permalink
Merge pull request #34 from astronomy-commons/sam/fsspec
Browse files Browse the repository at this point in the history
Sam/fsspec
  • Loading branch information
swyatt7 authored Oct 23, 2023
2 parents 0d64473 + c78365c commit 4748fc2
Show file tree
Hide file tree
Showing 15 changed files with 1,153 additions and 77 deletions.
629 changes: 629 additions & 0 deletions cloud_tests/.pylintrc

Large diffs are not rendered by default.

63 changes: 63 additions & 0 deletions cloud_tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# LSDB Cloud Tests

### Performing LSDB cloud tests
The only currently implemented cloud platform is abfs. In order to run the tests, you will need to export the following environment variables on the command line:
```bash
export ABFS_LINCCDATA_ACCOUNT_NAME=lincc_account_name
export ABFS_LINCCDATA_ACCOUNT_KEY=lincc_account_key
```
Then to run the tests:
```bash
pytest cloud_tests/ --timeout 10 --cloud abfs
```
The timeout needs to be increased to account for latency in contacting cloud buckets and performing heavier I/O computations.


### How are we connecting to the cloud resources?

We have abstracted our entire i/o infrastructure to be read through the python [fsspec](https://filesystem-spec.readthedocs.io/en/latest/index.html) library. All that needs to be provided is a valid protocol pathway, and storage options for the cloud interface.


### Adding tests for a new cloud interface protocol

There are various steps to have tests run on another cloud bucket provider (like s3 or gcs).

* 1.) You will have to create the container/bucket
* 2.) You will have to edit `cloud_tests/conftest.py` in multiple places:
```python
...
#...line 38...
@pytest.fixture
def example_cloud_path(cloud):
if cloud == "abfs":
return "abfs:///hipscat/pytests/"

#your new addition
elif cloud == "new_protocol":
return "new_protocol:///path/to/pytest/"

else:
raise NotImplementedError("Cloud format not implemented for hipscat tests!")

@pytest.fixture
def example_cloud_storage_options(cloud):
if cloud == "abfs":
storage_options = {
"account_key" : os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
"account_name" : os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME")
}
return storage_options

#your new addition
elif cloud == "new_protocol":
storage_options = {
"valid_storage_option_param1" : os.environ.get("NEW_PROTOCOL_PARAM1"),
"valid_storage_option_param2" : os.environ.get("NEW_PROTOCOL_PARAM2"),
...
}

return {}
```

* 3.) Finally, you will need to copy the entire `/tests/data/` directory into your newly created bucket. This can be accomplished by running the `copy_data_to_fs.py` script in the `cloud_tests/` directory.
* 4.) Before running the tests, you will need to export your `valid_storage_option_param` into the environment.
129 changes: 129 additions & 0 deletions cloud_tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import os

import hipscat as hc
import hipscat.io.file_io.file_io as file_io
import pandas as pd
import pytest

import lsdb

# Name of the data directory.
# NOTE(review): the fixtures in this file spell out "data" literally instead of
# using this constant -- confirm whether it is still needed.
DATA_DIR_NAME = "data"
# Catalog directory names; the *_dir_cloud fixtures join these onto the cloud
# test data directory.
SMALL_SKY_DIR_NAME = "small_sky"
SMALL_SKY_XMATCH_NAME = "small_sky_xmatch"
SMALL_SKY_NO_METADATA_DIR_NAME = "small_sky_no_metadata"
SMALL_SKY_ORDER1_DIR_NAME = "small_sky_order1"
# CSV file names; the xmatch_*_cloud fixtures join these onto the small sky
# xmatch directory and load them.
XMATCH_CORRECT_FILE = "xmatch_correct.csv"
XMATCH_CORRECT_005_FILE = "xmatch_correct_0_005.csv"
XMATCH_CORRECT_3n_2t_NO_MARGIN_FILE = "xmatch_correct_3n_2t_no_margin.csv"
XMATCH_MOCK_FILE = "xmatch_mock.csv"
# Directory that contains this conftest file.
TEST_DIR = os.path.dirname(__file__)


def pytest_addoption(parser):
    """Register the ``--cloud`` command line option with pytest.

    The option stores a single string value and defaults to ``"abfs"``.

    Args:
        parser: the option parser pytest passes to this hook.
    """
    parser.addoption(
        "--cloud",
        action="store",
        default="abfs",
        # Shown by ``pytest --help`` so the option is discoverable.
        help="Cloud protocol to run the cloud tests against (default: abfs).",
    )


def pytest_generate_tests(metafunc):
# This is called for every test. Only get/set command line arguments
# if the argument is specified in the list of test "fixturenames".
option_value = metafunc.config.option.cloud
if 'cloud' in metafunc.fixturenames and option_value is not None:
metafunc.parametrize("cloud", [option_value])


@pytest.fixture
def example_cloud_path(cloud):
    """Return the base path for the requested cloud protocol.

    Raises:
        NotImplementedError: if ``cloud`` is anything other than ``"abfs"``.
    """
    if cloud != "abfs":
        raise NotImplementedError("Cloud format not implemented for lsdb tests!")
    return "abfs:///hipscat/pytests/"


@pytest.fixture
def example_cloud_storage_options(cloud):
    """Build the storage options dictionary for the requested cloud protocol.

    For ``"abfs"`` the account key and account name are read from the
    ``ABFS_LINCCDATA_ACCOUNT_KEY`` and ``ABFS_LINCCDATA_ACCOUNT_NAME``
    environment variables (``None`` when unset). Any other protocol gets an
    empty dictionary.
    """
    if cloud != "abfs":
        return {}
    return {
        "account_key": os.environ.get("ABFS_LINCCDATA_ACCOUNT_KEY"),
        "account_name": os.environ.get("ABFS_LINCCDATA_ACCOUNT_NAME"),
    }


@pytest.fixture
def test_data_dir_cloud(example_cloud_path):
    """Return the ``lsdb/data`` directory beneath the cloud base path."""
    path_parts = (example_cloud_path, "lsdb", "data")
    return os.path.join(*path_parts)


@pytest.fixture
def small_sky_dir_cloud(test_data_dir_cloud):
    """Return the path of the small sky directory inside the cloud test data."""
    catalog_dir = os.path.join(test_data_dir_cloud, SMALL_SKY_DIR_NAME)
    return catalog_dir


@pytest.fixture
def small_sky_xmatch_dir_cloud(test_data_dir_cloud):
    """Return the path of the small sky xmatch directory inside the cloud test data."""
    catalog_dir = os.path.join(test_data_dir_cloud, SMALL_SKY_XMATCH_NAME)
    return catalog_dir


@pytest.fixture
def small_sky_no_metadata_dir_cloud(test_data_dir_cloud):
    """Return the path of the small sky no-metadata directory inside the cloud test data."""
    catalog_dir = os.path.join(test_data_dir_cloud, SMALL_SKY_NO_METADATA_DIR_NAME)
    return catalog_dir


@pytest.fixture
def small_sky_order1_dir_cloud(test_data_dir_cloud):
    """Return the path of the small sky order1 directory inside the cloud test data."""
    catalog_dir = os.path.join(test_data_dir_cloud, SMALL_SKY_ORDER1_DIR_NAME)
    return catalog_dir


@pytest.fixture
def small_sky_hipscat_catalog_cloud(small_sky_dir_cloud, example_cloud_storage_options):
    """Read the small sky directory with hipscat's ``Catalog.read_from_hipscat``."""
    read_catalog = hc.catalog.Catalog.read_from_hipscat
    return read_catalog(small_sky_dir_cloud, storage_options=example_cloud_storage_options)

@pytest.fixture
def small_sky_catalog_cloud(small_sky_dir_cloud, example_cloud_storage_options):
    """Read the small sky directory with ``lsdb.read_hipscat``."""
    catalog = lsdb.read_hipscat(
        small_sky_dir_cloud,
        storage_options=example_cloud_storage_options,
    )
    return catalog


@pytest.fixture
def small_sky_xmatch_catalog_cloud(small_sky_xmatch_dir_cloud, example_cloud_storage_options):
    """Read the small sky xmatch directory with ``lsdb.read_hipscat``."""
    catalog = lsdb.read_hipscat(
        small_sky_xmatch_dir_cloud,
        storage_options=example_cloud_storage_options,
    )
    return catalog


@pytest.fixture
def small_sky_order1_hipscat_catalog_cloud(small_sky_order1_dir_cloud, example_cloud_storage_options):
    """Read the small sky order1 directory with hipscat's ``Catalog.read_from_hipscat``."""
    read_catalog = hc.catalog.Catalog.read_from_hipscat
    return read_catalog(
        small_sky_order1_dir_cloud,
        storage_options=example_cloud_storage_options,
    )


@pytest.fixture
def small_sky_order1_catalog_cloud(small_sky_order1_dir_cloud, example_cloud_storage_options):
    """Read the small sky order1 directory with ``lsdb.read_hipscat``."""
    catalog = lsdb.read_hipscat(
        small_sky_order1_dir_cloud,
        storage_options=example_cloud_storage_options,
    )
    return catalog


@pytest.fixture
def xmatch_correct_cloud(small_sky_xmatch_dir_cloud, example_cloud_storage_options):
    """Load ``xmatch_correct.csv`` from the small sky xmatch directory.

    The file is read through hipscat's ``file_io.load_csv_to_pandas`` using the
    cloud storage options.
    """
    csv_path = os.path.join(small_sky_xmatch_dir_cloud, XMATCH_CORRECT_FILE)
    expected = file_io.load_csv_to_pandas(csv_path, storage_options=example_cloud_storage_options)
    return expected


@pytest.fixture
def xmatch_correct_005_cloud(small_sky_xmatch_dir_cloud, example_cloud_storage_options):
    """Load ``xmatch_correct_0_005.csv`` from the small sky xmatch directory.

    The file is read through hipscat's ``file_io.load_csv_to_pandas`` using the
    cloud storage options.
    """
    csv_path = os.path.join(small_sky_xmatch_dir_cloud, XMATCH_CORRECT_005_FILE)
    expected = file_io.load_csv_to_pandas(csv_path, storage_options=example_cloud_storage_options)
    return expected


@pytest.fixture
def xmatch_correct_3n_2t_no_margin_cloud(small_sky_xmatch_dir_cloud, example_cloud_storage_options):
    """Load ``xmatch_correct_3n_2t_no_margin.csv`` from the small sky xmatch directory.

    The file is read through hipscat's ``file_io.load_csv_to_pandas`` using the
    cloud storage options.
    """
    csv_path = os.path.join(small_sky_xmatch_dir_cloud, XMATCH_CORRECT_3n_2t_NO_MARGIN_FILE)
    expected = file_io.load_csv_to_pandas(csv_path, storage_options=example_cloud_storage_options)
    return expected


@pytest.fixture
def xmatch_mock_cloud(small_sky_xmatch_dir_cloud, example_cloud_storage_options):
    """Load ``xmatch_mock.csv`` from the small sky xmatch directory.

    The file is read through hipscat's ``file_io.load_csv_to_pandas`` using the
    cloud storage options.
    """
    csv_path = os.path.join(small_sky_xmatch_dir_cloud, XMATCH_MOCK_FILE)
    expected = file_io.load_csv_to_pandas(csv_path, storage_options=example_cloud_storage_options)
    return expected
72 changes: 72 additions & 0 deletions cloud_tests/copy_data_to_fs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import os

from hipscat.io.file_io.file_io import get_fs


def copy_tree_fs_to_fs(
    fs1_source: str, fs2_destination: str,
    storage_options1: dict = None, storage_options2: dict = None,
    verbose=False
):
    """Recursively copy a directory from one filesystem to another.

    Both locations are resolved with ``get_fs`` and the copy itself is
    delegated to ``copy_dir``.

    Args:
        fs1_source: location of the source directory to copy
        fs2_destination: location of the destination directory the source is written to
        storage_options1: dictionary of credentials for the source filesystem
        storage_options2: dictionary of credentials for the destination filesystem
        verbose: forwarded to ``copy_dir``
    """
    src_fs, src_path = get_fs(fs1_source, storage_options=storage_options1)
    dst_fs, dst_path = get_fs(fs2_destination, storage_options=storage_options2)
    copy_dir(src_fs, src_path, dst_fs, dst_path, verbose=verbose)


def copy_dir(source_fs, source_fp, destination_fs, desintation_fp, verbose=False, chunksize=1024*1024):
    """Recursively copy a directory and its contents between filesystems.

    A folder named after the last ``/``-separated component of ``source_fp``
    is created inside ``desintation_fp`` when it does not exist yet. Every
    file directly inside ``source_fp`` is streamed into that folder in
    ``chunksize``-byte pieces, and every sub-directory is copied by a
    recursive call.

    Args:
        source_fs: filesystem object for the source; its ``listdir`` and
            ``open`` methods are used
        source_fp: source directory whose contents are copied
        destination_fs: filesystem object for the destination; its ``exists``,
            ``makedirs`` and ``open`` methods are used
        desintation_fp: destination directory that receives the copied folder
        verbose: when true, print each folder that is created and each file
            that is copied
        chunksize: number of bytes read from the source per write
    """
    destination_folder = os.path.join(desintation_fp, source_fp.split("/")[-1])
    if destination_folder[-1] != "/":
        destination_folder += "/"
    if not destination_fs.exists(destination_folder):
        if verbose:
            print(f"Creating destination folder: {destination_folder}")
        destination_fs.makedirs(destination_folder, exist_ok=True)

    # List the source directory once and derive both files and sub-directories
    # from that single listing: this avoids a second remote call and keeps the
    # two views consistent with each other.
    dir_contents = source_fs.listdir(source_fp)
    files = [entry for entry in dir_contents if entry["type"] == "file"]
    dirs = [entry for entry in dir_contents if entry["type"] == "directory"]

    for _file in files:
        destination_fname = os.path.join(destination_folder, _file["name"].split("/")[-1])
        if verbose:
            print(f'Copying file {_file["name"]} to {destination_fname}')
        with source_fs.open(_file["name"], "rb") as source_file:
            with destination_fs.open(destination_fname, "wb") as destination_file:
                # Stream in chunks so large files are never fully held in memory.
                while True:
                    chunk = source_file.read(chunksize)
                    if not chunk:
                        break
                    destination_file.write(chunk)

    for _dir in dirs:
        copy_dir(source_fs, _dir["name"], destination_fs, destination_folder, chunksize=chunksize, verbose=verbose)

if __name__ == "__main__":
    # Template invocation: replace the target path and storage options with
    # values that are valid for the cloud protocol being set up.

    # The source path is resolved relative to the current working directory.
    source_pw = f"{os.getcwd()}/../tests/data"
    target_pw = "new_protocol:///path/to/pytest/lsdb"

    # Each option needs its own key; a repeated key would silently drop the
    # earlier value from the dictionary.
    target_so = {
        "valid_storage_option_param1": os.environ.get("NEW_PROTOCOL_PARAM1"),
        "valid_storage_option_param2": os.environ.get("NEW_PROTOCOL_PARAM2"),
    }
    copy_tree_fs_to_fs(
        source_pw, target_pw, {}, target_so, verbose=True
    )
26 changes: 26 additions & 0 deletions cloud_tests/lsdb/catalog/test_catalog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import dask.dataframe as dd
import pandas as pd
from hipscat.pixel_math import HealpixPixel


def test_catalog_repr_equals_ddf_repr(small_sky_order1_catalog_cloud):
    """The catalog's ``repr`` must equal the ``repr`` of its ``_ddf``."""
    catalog_repr = repr(small_sky_order1_catalog_cloud)
    ddf_repr = repr(small_sky_order1_catalog_cloud._ddf)
    assert catalog_repr == ddf_repr


def test_catalog_html_repr_equals_ddf_html_repr(small_sky_order1_catalog_cloud):
    """The catalog's ``_repr_html_`` must equal that of its ``_ddf``."""
    catalog_html = small_sky_order1_catalog_cloud._repr_html_()
    ddf_html = small_sky_order1_catalog_cloud._ddf._repr_html_()
    assert catalog_html == ddf_html


def test_catalog_compute_equals_ddf_compute(small_sky_order1_catalog_cloud):
    """Computing the catalog must give the same frame as computing its ``_ddf``."""
    catalog_frame = small_sky_order1_catalog_cloud.compute()
    ddf_frame = small_sky_order1_catalog_cloud._ddf.compute()
    pd.testing.assert_frame_equal(catalog_frame, ddf_frame)


def test_get_catalog_partition_gets_correct_partition(small_sky_order1_catalog_cloud):
    """``get_partition`` must return the ``_ddf`` partition mapped to each pixel.

    For every row of the catalog structure's pixel table, the partition
    returned for that order/pixel is compared with the partition found via
    ``_ddf_pixel_map``.
    """
    catalog = small_sky_order1_catalog_cloud
    pixel_table = catalog.hc_structure.get_pixels()
    for _, pixel_row in pixel_table.iterrows():
        order, npix = pixel_row["Norder"], pixel_row["Npix"]
        actual = catalog.get_partition(order, npix)
        healpix_pixel = HealpixPixel(order=order, pixel=npix)
        expected_index = catalog._ddf_pixel_map[healpix_pixel]
        expected = catalog._ddf.partitions[expected_index]
        dd.utils.assert_eq(actual, expected)
48 changes: 48 additions & 0 deletions cloud_tests/lsdb/catalog/test_cone_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import pytest
from astropy.coordinates import SkyCoord


def test_cone_search_filters_correct_points(small_sky_order1_catalog_cloud):
    """Cone search must keep exactly the rows within ``radius`` of the center.

    Every row of the full catalog is checked: rows whose separation from the
    center is at most ``radius`` must appear once in the cone search result,
    all others must not appear at all.
    """
    ra, dec, radius = 0, -80, 20
    center_coord = SkyCoord(ra, dec, unit='deg')
    cone_search_catalog = small_sky_order1_catalog_cloud.cone_search(ra, dec, radius).compute()
    print(len(cone_search_catalog))
    for _, row in small_sky_order1_catalog_cloud.compute().iterrows():
        catalog_info = small_sky_order1_catalog_cloud.hc_structure.catalog_info
        point = SkyCoord(row[catalog_info.ra_column], row[catalog_info.dec_column], unit='deg')
        separation = point.separation(center_coord)
        expected_matches = 1 if separation.degree <= radius else 0
        matches = cone_search_catalog.loc[cone_search_catalog["id"] == row["id"]]
        assert len(matches) == expected_matches


def test_cone_search_filters_partitions(small_sky_order1_catalog_cloud):
    """Cone search must keep the same pixels as the structure's ``filter_by_cone``.

    The pixel count of the filtered structure must match both the pixel count
    and the partition count of the cone search catalog, and every filtered
    pixel must be present in the catalog's ``_ddf_pixel_map``.
    """
    ra, dec, radius = 0, -80, 20
    catalog = small_sky_order1_catalog_cloud
    hc_conesearch = catalog.hc_structure.filter_by_cone(ra, dec, radius)
    consearch_catalog = catalog.cone_search(ra, dec, radius)
    expected_pixels = hc_conesearch.get_healpix_pixels()
    assert len(expected_pixels) == len(consearch_catalog.hc_structure.get_pixels())
    assert len(expected_pixels) == consearch_catalog._ddf.npartitions
    print(expected_pixels)
    for pixel in expected_pixels:
        assert pixel in consearch_catalog._ddf_pixel_map


def test_negative_radius_errors(small_sky_order1_catalog_cloud):
    """A cone search with a negative radius must raise ``ValueError``."""
    ra, dec, negative_radius = 0, 0, -1
    with pytest.raises(ValueError):
        small_sky_order1_catalog_cloud.cone_search(ra, dec, negative_radius)


def test_invalid_ra_dec(small_sky_order1_catalog_cloud):
    """Cone searches centered on these ra/dec values must raise ``ValueError``."""
    invalid_centers = [(-200, 0), (200, 0), (0, -100), (0, 100)]
    for ra, dec in invalid_centers:
        with pytest.raises(ValueError):
            small_sky_order1_catalog_cloud.cone_search(ra, dec, 1)
Loading

0 comments on commit 4748fc2

Please sign in to comment.