Skip to content

Commit

Permalink
Merge pull request #2 from stvoutsin/bugfix/import-paths
Browse files Browse the repository at this point in the history
Small bugfixes (Import paths, schema definitions to list & folder path / schema rearrange)
  • Loading branch information
NigelHambly authored May 24, 2022
2 parents dc86006 + f27a525 commit 5e2ca42
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
18 changes: 9 additions & 9 deletions gaiadmpsetup/gaiadmpsetup.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession

import gaiaedr3_pyspark_schema_structures as edr3
import gaiadr3_pyspark_schema_structures as dr3
from gaiadmpstore import *
from . import gaiaedr3_pyspark_schema_structures as edr3
from . import gaiadr3_pyspark_schema_structures as dr3
from .gaiadmpstore import *

spark = SparkSession.builder.getOrCreate()

Expand All @@ -25,7 +25,6 @@ def tablesExist():
return check

if not tablesExist():

# database name to create
database = "gaiaedr3"

Expand All @@ -35,15 +34,15 @@ def tablesExist():

# create the tables against their corresponding file sets and schema
for table_key in edr3.table_dict.keys():
folder_path = edr3.table_dict[table_key][0]
schema = edr3.table_dict[table_key][1]
reattachParquetFileResourceToSparkContext(table_key, data_store + folder_path, *schema)
folder_path = edr3.table_dict[table_key][1]
schemas = edr3.table_dict[table_key][0]
reattachParquetFileResourceToSparkContext(table_key, data_store + folder_path, schemas)

# ... similarly for Gaia DR3
database = "gaiadr3"
spark.sql("create database " + database)
spark.sql("use " + database)

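# TODO create the tables against their corresponding file sets and schema
# (NB: table_dict values are (list of schemas, folder path), so the folder path is index [1], not [0] as in the draft below)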
#for table_key in dr3.table_dict.keys():
# folder_path = dr3.table_dict[table_key][0]
Expand All @@ -52,3 +51,4 @@ def tablesExist():


GaiaDMPSetup.setup()

3 changes: 2 additions & 1 deletion gaiadmpsetup/gaiadmpstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def saveToBinnedParquet(df, outputParquetPath, name, mode = "error", buckets = N
.option("path", outputParquetPath) \
.saveAsTable(name)

def reattachParquetFileResourceToSparkContext(table_name, file_path, *schema_structures, cluster_key = default_key, sort_key = default_key, buckets = NUM_BUCKETS):
def reattachParquetFileResourceToSparkContext(table_name, file_path, schema_structures, cluster_key = default_key, sort_key = default_key, buckets = NUM_BUCKETS):
"""
Creates a Spark (in-memory) meta-record for the table resource specified for querying
through the PySpark SQL API.
Expand Down Expand Up @@ -204,3 +204,4 @@ def cast_all_arrays(data_frame : DataFrame, data_structure : StructType):
# finally reorder according to the original specification
return reorder_columns(data_frame, data_structure)


11 changes: 6 additions & 5 deletions gaiadmpsetup/gaiadr3_pyspark_schema_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -1619,19 +1619,20 @@
#'vari_rad_vel_statistics',
#'vari_short_timescale',
'xp_continuous_mean_spectrum' :
((xp_continuous_mean_spectrum_schema), release_folder + '/GDR3_XP_CONTINUOUS_MEAN_SPECTRUM'),
([xp_continuous_mean_spectrum_schema], release_folder + '/GDR3_XP_CONTINUOUS_MEAN_SPECTRUM'),
'xp_sampled_mean_spectrum' :
((xp_sampled_mean_spectrum_schema), release_folder + '/GDR3_XP_SAMPLED_MEAN_SPECTRUM'),
([xp_sampled_mean_spectrum_schema], release_folder + '/GDR3_XP_SAMPLED_MEAN_SPECTRUM'),
'xp_summary' :
((xp_summary_schema), release_folder + '/GDR3_XP_SUMMARY'),
([xp_summary_schema], release_folder + '/GDR3_XP_SUMMARY'),
#'commanded_scan_law',
#'agn_cross_id',
#'frame_rotator_source',
#'gaia_crf3_xm',
'gaia_source_simulation' :
((gaia_source_simulation_schema), release_folder + '/GDR3_GAIA_SOURCE_SIMULATION'),
([gaia_source_simulation_schema], release_folder + '/GDR3_GAIA_SOURCE_SIMULATION'),
'gaia_universe_model' :
((gaia_universe_model_schema), release_folder + '/GDR3_UNIVERSE_MODEL'),
([gaia_universe_model_schema], release_folder + '/GDR3_UNIVERSE_MODEL'),
}
# ... small tables commented out: TODO decide later what to include.


9 changes: 5 additions & 4 deletions gaiadmpsetup/gaiaedr3_pyspark_schema_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,12 +577,13 @@
# dictionary of all tables: key is table name, value = tuple(list of schema(s), subfolder containing parquet files)
table_dict = {
'gaia_source' :
((gaia_source_schema), release_folder + '/GEDR3_GAIASOURCE'),
([gaia_source_schema], release_folder + '/GEDR3_GAIASOURCE'),
'gaia_source_tmasspsc_best_neighbours' :
((tmasspscxsc_best_neighbour_schema, twomass_psc_schema), release_folder + '/GEDR3_2MASSPSC_BEST_NEIGHBOURS'),
([tmasspscxsc_best_neighbour_schema, twomass_psc_schema], release_folder + '/GEDR3_2MASSPSC_BEST_NEIGHBOURS'),
'gaia_source_allwise_best_neighbours' :
((allwise_best_neighbour_schema, twomass_psc_schema), release_folder + '/GEDR3_ALLWISE_BEST_NEIGHBOURS'),
([allwise_best_neighbour_schema, twomass_psc_schema], release_folder + '/GEDR3_ALLWISE_BEST_NEIGHBOURS'),
'gaia_source_ps1_best_neighbours' :
((panstarrs1_best_neighbour_schema, panstarrs_dr1_otmo_schema), release_folder + '/GEDR3_PS1_BEST_NEIGHBOURS')
([panstarrs1_best_neighbour_schema, panstarrs_dr1_otmo_schema], release_folder + '/GEDR3_PS1_BEST_NEIGHBOURS')
}


0 comments on commit 5e2ca42

Please sign in to comment.