# Databricks notebook source
# MAGIC %md # Build Surrogate Model Feature Stores
# MAGIC
# MAGIC ### Goal
# MAGIC Transform surrogate model features (building metadata and weather) and write to feature store.
# MAGIC
# MAGIC ### Process
# MAGIC * Transform building metadata into features and subset to features of interest
# MAGIC * Apply upgrade logic to building metadata features
# MAGIC * Pivot weather data into wide vector format with pkey `weather_file_city` and an 8760-length timeseries vector for each weather feature column
# MAGIC * Write building metadata features and weather features to feature store tables
# MAGIC
# MAGIC ### I/Os
# MAGIC
# MAGIC ##### Inputs:
# MAGIC - `ml.surrogate_model.building_simulation_outputs_annual`: Building simulation outputs indexed by (building_id, upgrade_id)
# MAGIC - `ml.surrogate_model.building_metadata`: Building metadata indexed by (building_id)
# MAGIC - `ml.surrogate_model.weather_data_hourly`: Hourly weather data indexed by (weather_file_city, hour datetime)
# MAGIC
# MAGIC ##### Outputs:
# MAGIC - `ml.surrogate_model.building_features`: Building metadata features indexed by (building_id)
# MAGIC - `ml.surrogate_model.weather_features_hourly`: Weather features indexed by (weather_file_city) with an 8760-length timeseries vector for each weather feature column
# MAGIC
# MAGIC ---
# MAGIC Cluster/User Requirements
# MAGIC - Access Mode: Single User or Shared (not "No Isolation Shared")
# MAGIC - Runtime: Databricks Runtime 13.2 for ML or above (or Databricks Runtime 13.2 or above with `%pip install databricks-feature-engineering`)
# MAGIC - `USE CATALOG` and `CREATE SCHEMA` privileges on the `ml` Unity Catalog (ask Miki for access if permission is denied)
# COMMAND ----------
# MAGIC %load_ext autoreload
# MAGIC %autoreload 2
# COMMAND ----------
# DBTITLE 1,Imports
import re
from functools import reduce
import pyspark.sql.functions as F
from databricks.feature_engineering import FeatureEngineeringClient
from pyspark.sql import DataFrame
from pyspark.sql.types import StringType
from src.dmutils import qa_utils
from src import feature_utils
# COMMAND ----------
# MAGIC %md ## Feature Transformation
# COMMAND ----------
# MAGIC %md #### Baseline
# MAGIC
# MAGIC Refer to [Notion Page](https://www.notion.so/rewiringamerica/Features-Upgrades-c8239f52a100427fbf445878663d7135?pvs=4#086a1d050b8c4094ad10e2275324668b) and [options.tsv](https://github.com/NREL/resstock/blob/run/euss/resources/options_lookup.tsv).
# MAGIC
# COMMAND ----------
# DBTITLE 1,Transform building metadata
baseline_building_metadata_transformed = feature_utils.transform_building_features('ml.surrogate_model.building_metadata')
# COMMAND ----------
# MAGIC %md #### Upgrades
# MAGIC
# MAGIC Refer to [Notion Page](https://www.notion.so/rewiringamerica/Features-Upgrades-c8239f52a100427fbf445878663d7135?pvs=4#3141dfeeb07144da9fe983b2db13b6d3), [ResStock docs](https://oedi-data-lake.s3.amazonaws.com/nrel-pds-building-stock/end-use-load-profiles-for-us-building-stock/2022/EUSS_ResRound1_Technical_Documentation.pdf), and [upgrade.yml](https://github.com/NREL/resstock/blob/run/euss/EUSS-project-file_2018_10k.yml).
# COMMAND ----------
# DBTITLE 1,Build metadata table for all samples and upgrades
building_metadata_upgrades = feature_utils.build_upgrade_metadata_table(baseline_building_metadata_transformed)
# COMMAND ----------
# DBTITLE 1,Drop rows where upgrade was not applied
building_metadata_applicable_upgrades = feature_utils.drop_non_upgraded_samples(
    building_metadata_upgrades,
    check_applicability_logic=True)
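# COMMAND ----------
# MAGIC %md
# MAGIC As an optional sanity check (not part of the original pipeline), the cell below counts the remaining rows per `upgrade_id` after non-applicable upgrades are dropped; it uses only standard PySpark and the `upgrade_id` key already used in this notebook.
# COMMAND ----------
# DBTITLE 1,Optional: sample counts by upgrade
building_metadata_applicable_upgrades.groupBy("upgrade_id").count().orderBy("upgrade_id").show()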
# COMMAND ----------
# MAGIC %md #### Summary
# COMMAND ----------
# DBTITLE 1,Make sure there are no Null Features
n_building_upgrade_samples = building_metadata_applicable_upgrades.count()
print(n_building_upgrade_samples)
non_null_df = building_metadata_applicable_upgrades.dropna()
assert non_null_df.count() == n_building_upgrade_samples, "Null values present, run qa_utils.check_for_null_values(building_metadata_upgrades)"
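# COMMAND ----------
# MAGIC %md
# MAGIC If the assert above fails, the optional cell below (not part of the original pipeline) shows a plain-PySpark way to count nulls per column; `qa_utils.check_for_null_values` presumably reports something similar.
# COMMAND ----------
# DBTITLE 1,Optional: per-column null counts
null_counts = building_metadata_applicable_upgrades.select(
    [
        F.count(F.when(F.col(c).isNull(), c)).alias(c)
        for c in building_metadata_applicable_upgrades.columns
    ]
)
null_counts.show(vertical=True)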
# COMMAND ----------
# DBTITLE 1,Print out feature counts
pkey_cols = ["weather_file_city", "upgrade_id", "building_id"]
string_columns = [
    field.name
    for field in building_metadata_upgrades.schema.fields
    if isinstance(field.dataType, StringType) and field.name not in pkey_cols
]
non_string_columns = [
    field.name
    for field in building_metadata_upgrades.schema.fields
    if not isinstance(field.dataType, StringType) and field.name not in pkey_cols
]
# count how many distinct values there are for each categorical feature
distinct_string_counts = building_metadata_upgrades.select(
    [F.countDistinct(c).alias(c) for c in string_columns]
)
# Collect the results as a dictionary
distinct_string_counts_dict = distinct_string_counts.collect()[0].asDict()
# print the total number of features:
print(f"Building Metadata Features: {len(non_string_columns + string_columns)}")
print(f"\tNumeric Features: {len(non_string_columns)}")
print(f"\tCategorical Features: {len(string_columns)}")
print(
    f"\tFeature Dimensionality: {len(non_string_columns) + sum(distinct_string_counts_dict.values())}"
)
# COMMAND ----------
# MAGIC %md ### Weather Features
# COMMAND ----------
# DBTITLE 1,Weather feature transformation function
def transform_weather_features() -> DataFrame:
    """
    Read and transform the weather timeseries table. Pivot from long format indexed by
    (weather_file_city, hour) to a table indexed by weather_file_city with an 8760-length
    timeseries array for each weather feature column.

    Returns:
        DataFrame: wide(ish) format dataframe indexed by weather_file_city with a timeseries array for each weather feature
    """
    weather_df = spark.read.table("ml.surrogate_model.weather_data_hourly")
    weather_pkeys = ["weather_file_city"]

    # collect each hourly weather column into an array per weather_file_city
    weather_data_arrays = weather_df.groupBy(weather_pkeys).agg(
        *[
            F.collect_list(c).alias(c)
            for c in weather_df.columns
            if c not in weather_pkeys + ["datetime_formatted"]
        ]
    )
    return weather_data_arrays
# COMMAND ----------
# DBTITLE 1,Transform weather features
weather_features = transform_weather_features()
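# COMMAND ----------
# MAGIC %md
# MAGIC The optional cell below (not part of the original pipeline) is a small sanity check that each collected weather array has the expected 8760 hourly entries; it uses only standard PySpark functions and one representative non-key column chosen by position.
# COMMAND ----------
# DBTITLE 1,Optional: check weather array lengths
# pick the first non-key column as a representative weather feature array
weather_array_cols = [c for c in weather_features.columns if c != "weather_file_city"]
n_bad_lengths = weather_features.filter(F.size(F.col(weather_array_cols[0])) != 8760).count()
assert n_bad_lengths == 0, f"Some cities do not have 8760 hourly values for {weather_array_cols[0]}"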
# COMMAND ----------
# DBTITLE 1,Add weather file city index
# fit the string indexer on the weather feature df
weather_file_city_indexer = feature_utils.fit_weather_city_index(df_to_fit=weather_features)
# apply indexer to weather feature df to get a weather_file_city_index column
weather_features_indexed = feature_utils.transform_weather_city_index(
    df_to_transform=weather_features,
    weather_file_city_indexer=weather_file_city_indexer)
# apply indexer to building metadata feature df to get a weather_file_city_index column
building_metadata_with_weather_index = feature_utils.transform_weather_city_index(
    df_to_transform=building_metadata_applicable_upgrades,
    weather_file_city_indexer=weather_file_city_indexer)
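# COMMAND ----------
# MAGIC %md
# MAGIC As a hedged sketch only: the `feature_utils` helpers above follow a fit/transform pattern that likely resembles the standard `pyspark.ml` `StringIndexer` usage below. The column names and behavior shown here are illustrative assumptions, not the actual `feature_utils` implementation.
# COMMAND ----------
# DBTITLE 1,Illustrative sketch of the fit/transform indexing pattern
from pyspark.ml.feature import StringIndexer

# fit an indexer that maps each weather_file_city string to a numeric index (assumed column names)
_example_indexer = StringIndexer(
    inputCol="weather_file_city", outputCol="weather_file_city_index"
).fit(weather_features)
# applying the same fitted indexer to any dataframe with a weather_file_city column
# adds a consistent weather_file_city_index column
_example_indexed_df = _example_indexer.transform(weather_features)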
# COMMAND ----------
# MAGIC %md ## Create Feature Store
# MAGIC
# COMMAND ----------
# MAGIC %md ### Create/use a schema in a catalog in the Unity Catalog metastore
# MAGIC
# MAGIC To use an existing catalog, you must have the `USE CATALOG` privilege on the catalog.
# MAGIC To create a new schema in the catalog, you must have the `CREATE SCHEMA` privilege on the catalog.
# COMMAND ----------
# DBTITLE 1,Check if you have access on ml catalog
# MAGIC %sql
# MAGIC -- if `ml` is not listed here, you do not have the required permissions
# MAGIC SHOW CATALOGS
# COMMAND ----------
# DBTITLE 1,Set up catalog and schema
# MAGIC %sql
# MAGIC -- Use existing catalog:
# MAGIC USE CATALOG ml;
# MAGIC -- Create a new schema
# MAGIC CREATE SCHEMA IF NOT EXISTS surrogate_model;
# MAGIC USE SCHEMA surrogate_model;
# COMMAND ----------
# MAGIC %md ### Create/modify the feature stores
# COMMAND ----------
# MAGIC %sql
# MAGIC -- the code below will upsert; to overwrite instead, uncomment the following line to drop the table first
# MAGIC -- DROP TABLE ml.surrogate_model.building_features
# COMMAND ----------
# DBTITLE 1,Create a FeatureEngineeringClient
fe = FeatureEngineeringClient()
# COMMAND ----------
# DBTITLE 1,Write out building metadata feature store
table_name = "ml.surrogate_model.building_features"
df = building_metadata_with_weather_index
if spark.catalog.tableExists(table_name):
    fe.write_table(name=table_name, df=df, mode="merge")
else:
    fe.create_table(
        name=table_name,
        primary_keys=["building_id", "upgrade_id", "weather_file_city"],
        df=df,
        schema=df.schema,
        description="building metadata features",
    )
# COMMAND ----------
# DBTITLE 1,Write out weather data feature store
table_name = "ml.surrogate_model.weather_features_hourly"
df = weather_features_indexed
if spark.catalog.tableExists(table_name):
    fe.write_table(
        name=table_name,
        df=df,
        mode="merge",
    )
else:
    fe.create_table(
        name=table_name,
        primary_keys=["weather_file_city"],
        df=df,
        schema=df.schema,
        description="hourly weather timeseries array features",
    )
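# COMMAND ----------
# MAGIC %md
# MAGIC As an optional final check (not part of the original pipeline), the cell below reads both feature tables back through the catalog and prints their row counts, using only `spark.table` so it does not depend on additional feature-store APIs.
# COMMAND ----------
# DBTITLE 1,Optional: verify feature store tables
for feature_table in [
    "ml.surrogate_model.building_features",
    "ml.surrogate_model.weather_features_hourly",
]:
    print(feature_table, spark.table(feature_table).count())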