Commit

Column and topic mapping refactored
rcboufleur committed Oct 17, 2024
1 parent 0bd52d1 commit 284b0df
Showing 1 changed file with 14 additions and 22 deletions.
36 changes: 14 additions & 22 deletions python/lsst/consdb/efd_transform/transform.py
@@ -115,6 +115,7 @@ def process_interval(
             "instrument": instrument,
         }
 
+        # map all topics and fields to perform a single query per topic
         # map all topics and fields to perform a single query per topic
         topics_columns_map = {}
         for column in self.config["columns"]:
@@ -128,32 +129,23 @@ def process_interval(
                 "columns": [],
             }
             for field in values["fields"]:
-                # print(field["name"])
-                topics_columns_map[topic]["fields"].append(field)
+                topics_columns_map[topic]["fields"].append(field["name"])
 
+            # remove duplicate fields per topic
+            topics_columns_map[topic]["fields"] = list(set(topics_columns_map[topic]["fields"]))
+
+            # Append packed_series to the list
             topics_columns_map[topic]["packed_series"].append(column["packed_series"])
             topics_columns_map[topic]["columns"].append(column)
 
-        # Remove duplicated fields in topic fields
-        for key, value in topics_columns_map.items():
-            # Removing duplicates from 'fields'
-            seen_fields = []
-            unique_fields = []
-            for field in value["fields"]:
-                if field not in seen_fields:
-                    seen_fields.append(field)
-                    unique_fields.append(field)
-            value["fields"] = unique_fields
-
-            # 'packed_series' should be a boolean (all elements should be True
-            # or False)
-            value["packed_series"] = all(value["packed_series"])
+            # Add a new key to store if any series is packed
+            topics_columns_map[topic]["is_packed"] = any(topics_columns_map[topic]["packed_series"])
 
         # Iterates over topics to perform the transformation
         for key, topic in topics_columns_map.items():
             # query the topic
             self.log.info(f"Querying the Topic: {topic['name']}")
-            topic_series = self.get_efd_values(topic, topic_interval, topic["packed_series"])
+            topic_series = self.get_efd_values(topic, topic_interval, topic["is_packed"])
 
             # process the columns in that topic:
             for column in topic["columns"]:
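Taken together, the two hunks above drop the old post-hoc de-duplication loop in favor of an inline list(set(...)) call, and collapse the per-column packed_series flags into a single is_packed flag with any(). Below is a minimal standalone sketch of the resulting pattern; the config layout and the example column/topic names are assumptions, since the diff truncates the lines that unpack topic and values.

# Hypothetical config: the real layout of self.config["columns"] is
# truncated in the diff above, so this structure is an assumption.
config = {
    "columns": [
        {
            "name": "temperature_mean",
            "packed_series": False,
            "topics": [{"name": "lsst.sal.ESS.temperature", "fields": [{"name": "temperatureItem0"}]}],
        },
        {
            "name": "temperature_max",
            "packed_series": True,
            "topics": [{"name": "lsst.sal.ESS.temperature", "fields": [{"name": "temperatureItem0"}]}],
        },
    ]
}

topics_columns_map = {}
for column in config["columns"]:
    for values in column["topics"]:
        topic = values["name"]
        if topic not in topics_columns_map:
            topics_columns_map[topic] = {"name": topic, "fields": [], "packed_series": [], "columns": []}
        for field in values["fields"]:
            # keep only the field name, not the whole field dict
            topics_columns_map[topic]["fields"].append(field["name"])
        # de-duplicate field names per topic in a single pass
        topics_columns_map[topic]["fields"] = list(set(topics_columns_map[topic]["fields"]))
        topics_columns_map[topic]["packed_series"].append(column["packed_series"])
        topics_columns_map[topic]["columns"].append(column)
        # the topic is queried as packed if any contributing column is packed
        topics_columns_map[topic]["is_packed"] = any(topics_columns_map[topic]["packed_series"])

print(topics_columns_map["lsst.sal.ESS.temperature"]["is_packed"])  # True

One trade-off worth noting: list(set(...)) does not preserve field order, unlike the removed seen_fields loop; if downstream code depends on query order, list(dict.fromkeys(...)) would de-duplicate while keeping it.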
@@ -200,9 +192,7 @@ def process_interval(
                         )
 
                         result_exp[exposure["id"]][column["name"]] = column_value
 
-                        # if column["function"] == "proccess_column_value":
-                        #     print(result_exp)
 
             if "VisitEFD" in column["tables"]:
                 for visit in visits:
@@ -222,6 +212,8 @@ def process_interval(
             results.append(result_exp[result_row])
 
         df_exposures = pandas.DataFrame(results)
+
+        df_exposures.to_csv('exposures.csv')
         self.log.info(f"Exposure results to be inserted into the database: {len(df_exposures)}")
 
         exp_dao = ExposureEfdDao(db_uri=self.db_uri)
@@ -377,7 +369,7 @@ def get_efd_values(
             topic.get("window", 0.0), format="sec"
         )  # Time window around the interval
 
-        fields = [f["name"] for f in topic["fields"]]  # List of field names to query
+        fields = [f for f in topic["fields"]]  # List of field names to query
 
         # Define the chunk size for querying to manage large numbers of fields
         chunk_size = 100  # Adjust as necessary based on system capabilities
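Since fields is now a plain list of names (matching the mapping change above), get_efd_values can slice it directly into query chunks of chunk_size. A hedged sketch of that chunking pattern; query_fn is a hypothetical stand-in for the EFD client call, which the diff truncates.

import pandas

def query_in_chunks(query_fn, fields, chunk_size=100):
    # Issue one query per chunk of at most chunk_size fields,
    # then join the partial results column-wise.
    frames = []
    for i in range(0, len(fields), chunk_size):
        chunk = fields[i : i + chunk_size]
        df = query_fn(chunk)
        if not df.empty:
            frames.append(df)
    return pandas.concat(frames, axis=1) if frames else pandas.DataFrame()

# Stub standing in for the real EFD query:
stub = lambda chunk: pandas.DataFrame({name: [0.0] for name in chunk})
print(query_in_chunks(stub, [f"field{i}" for i in range(250)]).shape)  # (1, 250)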
@@ -411,7 +403,7 @@ def get_efd_values(
                 # only if not empty
             except Exception as e:
                 # Log any errors encountered during querying
-                self.log.debug(e)
+                self.log.warning(f"An unexpected error occurred: {e}")
                 # Optional: you might want to include a placeholder DataFrame
                 # with the same columns here if needed
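The surviving comments in this hunk suggest returning a placeholder DataFrame when a chunk query fails. One way that suggestion could look, sketched with a hypothetical query_fn; this is not the commit's actual implementation.

import logging

import pandas

def safe_query(query_fn, chunk, log):
    try:
        return query_fn(chunk)
    except Exception as e:
        log.warning(f"An unexpected error occurred: {e}")
        # placeholder with the expected columns so downstream joins still work
        return pandas.DataFrame(columns=chunk)

def failing(chunk):
    # simulate an EFD outage
    raise RuntimeError("EFD unavailable")

log = logging.getLogger("efd_transform")
print(safe_query(failing, ["field0", "field1"], log).columns.tolist())  # ['field0', 'field1']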
