
Commit

mappings
mjaquiery committed Apr 16, 2024
1 parent 8cb9655 commit ac33d3e
Showing 8 changed files with 224 additions and 53 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -55,7 +55,7 @@ If you launch the program using the commands above, you will be prompted to enter

### Using the setup wizard

First, you'll be asked for the Galv server URL.
First, you'll be asked for the [Galv server](https://github.com/Battery-Intelligence-Lab/galv-backend) URL.
This should be the URL of the Galv server you have set up.
Providing a frontend URL will not work, as the harvester needs to communicate with the backend.

45 changes: 45 additions & 0 deletions harvester/README.md
@@ -0,0 +1,45 @@
# Harvester details

## Harvest process

Harvesters communicate with a [Galv server](https://github.com/Battery-Intelligence-Lab/galv-backend) to upload data files.
The Harvester runs on a simple loop; a schematic sketch of the per-file flow is given after the list below.
In the steps that follow, **calls to the server are in bold** and *server responses are in italics*:
1. Check for settings updates, including new Monitored Paths
2. For each Monitored Path:
   1. For each file in the Monitored Path (recursively):
      1. Attempt to open the file using the appropriate parser
      2. If the file can be opened, **report the file size to the server**
      3. *The server responds with the file status.*
      4. If the status is one of 'STABLE', 'RETRY IMPORT', or 'MAP ASSIGNED', the file is opened for parsing:
         1. **Report the file metadata scraped by the parser**
         2. **Report a summary of the file (the first 10 rows of all columns)**
         3. *The server may respond with a mapping object, used to rename/rescale columns.*
         4. If no mapping object is returned, the file is set aside until a user defines a mapping on the server.
      5. The file contents are loaded into a Dask dataframe, and the mapping object is applied.
      6. **The dataframe is uploaded to the server as .parquet files.**
      7. Temporary files are deleted.
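
For orientation, here is a schematic Python sketch of that per-file flow. The `parser` and `server` objects and their methods are hypothetical stand-ins for the harvester's input-file classes and Galv API calls, not the real classes or endpoints; see `harvester/harvest.py` for the actual implementation.

```python
def process_file(path, parser, server):
    """Schematic per-file flow. `parser` and `server` are hypothetical
    stand-ins for the harvester's input-file classes and Galv API calls."""
    if not parser.can_open(path):
        return  # unsupported file type; skip
    # **Report the file size**; *the server responds with the file status*
    status = server.report_size(path)
    if status not in ("STABLE", "RETRY IMPORT", "MAP ASSIGNED"):
        return
    # **Report scraped metadata** and **a summary (first 10 rows)**;
    # *the server may respond with a mapping object*
    server.report_metadata(parser.metadata(path))
    mapping = server.report_summary(parser.first_rows(path, n=10))
    if mapping is None:
        return  # a user must define a mapping on the server; revisit later
    # Load into a Dask dataframe with the mapping applied,
    # then upload as .parquet partitions and clean up
    df = parser.as_dask_dataframe(path, mapping)
    server.upload_parquet(df)
    parser.delete_temp_files()
```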

## Mapping object

The mapping object is a dictionary with the following structure:
```json
{
"column_name_in_file": {
"new_name": "new_column_name",
"multiplier": 1.0,
"addition": 0.0,
"data_type": "bool|int|float|str|datetime64[ns]"
}
}
```
Columns will be coerced to the specified data type.
Coercion is done with `pd.Series.astype()`, except for `datetime64[ns]` columns,
which are coerced using `pd.to_datetime()`.

Numerical (int/float) columns will be rebased and rescaled according to the `multiplier` and `addition` fields.
New column values = (old column values + `addition`) * `multiplier`.

**Columns that are not in the mapping object are converted to float.**
This is to save space: while parquet files can store strings fairly well,
strings that are mostly numbers are stored inefficiently, because parquet's
dictionary encoding is suited to recurring strings.
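
As a concrete illustration (not the harvester's own code; the data, column names, and mapping below are invented), this is roughly what applying a mapping looks like in pandas. The harvester itself uses `fastnumbers` for numeric coercion; `pd.to_numeric` stands in for it here.

```python
import pandas as pd

# Invented example data: one mapped column and one unmapped column
df = pd.DataFrame({"Temp 1": ["25.0", "26.5"], "Notes": ["a", "7"]})
mapping = {
    "Temp 1": {
        "new_name": "Temperature_K",
        "multiplier": 1.0,
        "addition": 273.15,
        "data_type": "float",
    }
}

for old_name, spec in mapping.items():
    # Coerce to the requested data type
    if spec["data_type"] == "datetime64[ns]":
        col = pd.to_datetime(df[old_name])
    else:
        col = df[old_name].astype(spec["data_type"])
    # Rebase then rescale numeric columns: (old + addition) * multiplier
    if spec["data_type"] in ("int", "float"):
        col = (col + spec.get("addition", 0)) * spec.get("multiplier", 1)
    df[spec["new_name"]] = col
    df = df.drop(columns=[old_name])

# Columns not in the mapping default to float; non-numeric values become NaN
mapped_names = {spec["new_name"] for spec in mapping.values()}
for name in df.columns:
    if name not in mapped_names:
        df[name] = pd.to_numeric(df[name], errors="coerce")

print(df)  # Notes -> [NaN, 7.0], Temperature_K -> [298.15, 299.65]
```
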
218 changes: 171 additions & 47 deletions harvester/harvest.py
@@ -13,6 +13,7 @@
import json
import dask.dataframe
import requests
import fastnumbers

from . import settings
from .parse.exceptions import UnsupportedFileTypeError
@@ -42,6 +43,7 @@ class HarvestProcessor:
]

def __init__(self, file_path: str, monitored_path: dict):
self.mapping = None
self.file_path = file_path
self.monitored_path = monitored_path
for input_file_cls in self.registered_input_files:
@@ -92,15 +94,16 @@ def harvest(self):
self._report_file_metadata()
column_time = time.time()
logger.info(f"Metadata reported in {column_time - metadata_time:.2f} seconds")
self._report_column_metadata()
data_prep_time = time.time()
logger.info(f"Column metadata reported in {data_prep_time - column_time:.2f} seconds")
self._prepare_data()
upload_time = time.time()
logger.info(f"Data prepared in {upload_time - data_prep_time:.2f} seconds")
self._upload_data()
logger.info(f"Data uploaded in {time.time() - upload_time:.2f} seconds")
self._delete_temp_files()
self._report_summary()
if self.mapping is not None:
data_prep_time = time.time()
logger.info(f"Column metadata reported in {data_prep_time - column_time:.2f} seconds")
self._prepare_data()
upload_time = time.time()
logger.info(f"Data prepared in {upload_time - data_prep_time:.2f} seconds")
self._upload_data()
logger.info(f"Data uploaded in {time.time() - upload_time:.2f} seconds")
self._delete_temp_files()

def _report_file_metadata(self):
"""
@@ -132,39 +135,32 @@ def _report_file_metadata(self):
raise RuntimeError("API Error: server responded with error")
self.server_metadata = report.json()['upload_info']

def _report_column_metadata(self):
def _report_summary(self):
"""
Report the column metadata to the server.
Data include the column names, types, units, and whether they relate to recognised standard columns.
"""
columns = self.server_metadata.get('columns')
if len(columns):
mapping = {c.get('name'): c.get('id') for c in columns}
else:
mapping = self.input_file.get_file_column_to_standard_column_mapping()

# Use first row to determine column data types
columns_with_data = [c for c in self.input_file.column_info.keys() if self.input_file.column_info[c].get('has_data')]
first_row = next(self.input_file.load_data(self.input_file.file_path, columns_with_data))
column_data = {}
summary_row_count = 10
summary_data = []
iterator = self.input_file.load_data(
self.file_path,
[c for c in self.input_file.column_info.keys() if self.input_file.column_info[c].get('has_data')]
)
for row in iterator:
summary_data.append(row)
if len(summary_data) >= summary_row_count:
break

for k, v in first_row.items():
column_data[k] = {'data_type': type(v).__name__}
if k in mapping:
column_data[k]['column_type_id'] = mapping[k]
else:
column_data[k]['column_name'] = k
if 'unit' in self.input_file.column_info[k]:
column_data[k]['unit_symbol'] = self.input_file.column_info[k].get('unit')
summary = pandas.DataFrame(summary_data)

# Upload results
report = report_harvest_result(
path=self.file_path,
monitored_path_uuid=self.monitored_path.get('uuid'),
content={
'task': settings.HARVESTER_TASK_IMPORT,
'stage': settings.HARVEST_STAGE_COLUMN_METADATA,
'data': column_data
'stage': settings.HARVEST_STAGE_DATA_SUMMARY,
'data': summary.to_json()
}
)
if report is None:
@@ -177,13 +173,62 @@ def _report_column_metadata(self):
logger.error(f"Report Column Metadata - API Error: {report.status_code}")
raise RuntimeError("API Error: server responded with error")

mapping_url = report.json()['mapping']
if mapping_url is None:
logger.info("Mapping could not be automatically determined. Will revisit when user determines mapping.")
return
mapping_request = requests.get(mapping_url, headers={'Authorization': f"Harvester {get_setting('api_key')}"})
if mapping_request is None:
logger.error(f"Report Column Metadata - API Error: no response from server")
raise RuntimeError("API Error: no response from server")
if not mapping_request.ok:
try:
logger.error(f"Report Column Metadata - API responded with Error: {mapping_request.json()['error']}")
except BaseException:
logger.error(f"Report Column Metadata - API Error: {mapping_request.status_code}")
raise RuntimeError("API Error: server responded with error")
self.mapping = mapping_request.json().get('rendered_map')
if not self.mapping:
if mapping_request:
logger.error(f"Server returned mapping request but no mapping was found")
else:
logger.info("Mapping could not be automatically determined")

def _prepare_data(self):
"""
Read the data from the file and save it as a temporary .parquet file self.data_file
"""
def remap(df, mapping):
"""
Remap the columns in the dataframe according to the mapping.
"""
columns = list(df.columns)
for col_name, mapping in mapping.items():
new_name = mapping['new_name']
if mapping['data_type'] in ["bool", "str"]:
df[new_name] = df[col_name].astype(mapping["data_type"])
elif mapping['data_type'] == 'datetime64[ns]':
df[new_name] = pandas.to_datetime(df[col_name])
else:
if mapping['data_type'] == 'int':
df[new_name] = fastnumbers.try_forceint(df[col_name], map=list, on_fail=math.nan)
else:
df[new_name] = fastnumbers.try_float(df[col_name], map=list, on_fail=math.nan)

addition = mapping.get('addition', 0)
multiplier = mapping.get('multiplier', 1)
df[new_name] = df[new_name] + addition
df[new_name] = df[new_name] * multiplier
df.drop(columns=[col_name], inplace=True)
columns.pop(columns.index(col_name))
# If there are any columns left, they are not in the mapping and should be converted to floats
for col_name in columns:
df[col_name] = fastnumbers.try_float(df[col_name], map=list, on_fail=math.nan)
return df

def partition_generator(generator, partition_line_count=100_000):
def to_df(rows):
return pandas.DataFrame(rows)
return remap(pandas.DataFrame(rows), mapping=self.mapping)

stopping = False
while not stopping:
@@ -301,16 +346,98 @@ def __del__(self):
import pandas
import dask.dataframe
import shutil
import fastnumbers
import math
from harvester.settings import get_standard_units, get_standard_columns
from harvester.parse.biologic_input_file import BiologicMprInputFile
from harvester.parse.maccor_input_file import MaccorInputFile
os.system('cp .harvester/.harvester.json /harvester_files')
standard_units = get_standard_units()
standard_columns = get_standard_columns()
file_path = '.test-data/test-suite-small/adam_3_C05.mpr'
input_file = BiologicMprInputFile(file_path, standard_units=standard_units, standard_columns=standard_columns)
def partition_generator(generator, partition_line_count = 100_000_000):
file_path = '.test-data/test-suite-small/TPG1+-+Cell+15+-+002.txt'
input_file = MaccorInputFile(file_path, standard_units=standard_units, standard_columns=standard_columns)

mapping = {
"Amps": {
"new_name": "Current_A",
"data_type": "float",
"multiplier": 0.001,
"addition": 0
},
"Rec#": {
"new_name": "Sample_number",
"data_type": "int",
"multiplier": 1,
"addition": 0
},
"Step": {
"new_name": "Step_number",
"data_type": "int",
"multiplier": 1,
"addition": 0
},
"State": {
"new_name": "State",
"data_type": "str"
},
"Volts": {
"new_name": "Voltage_V",
"data_type": "float",
"multiplier": 1,
"addition": 0
},
"Temp 1": {
"new_name": "Temperature_K",
"data_type": "float",
"multiplier": 1,
"addition": 273.15
},
"DPt Time": {
"new_name": "Datetime",
"data_type": "datetime64[ns]"
},
"StepTime": {
"new_name": "Step_time_s",
"data_type": "float",
"multiplier": 1,
"addition": 0
},
"TestTime": {
"new_name": "Elapsed_time_s",
"data_type": "float",
"multiplier": 1,
"addition": 0
}
}

def remap(df, mapping):
columns = list(df.columns)
for col_name, mapping in mapping.items():
new_name = mapping['new_name']
if mapping['data_type'] in ["bool", "str"]:
df[new_name] = df[col_name].astype(mapping["data_type"])
elif mapping['data_type'] == 'datetime64[ns]':
df[new_name] = pandas.to_datetime(df[col_name])
else:
if mapping['data_type'] == 'int':
df[new_name] = fastnumbers.try_forceint(df[col_name], map=list, on_fail=math.nan)
else:
df[new_name] = fastnumbers.try_float(df[col_name], map=list, on_fail=math.nan)

addition = mapping.get('addition', 0)
multiplier = mapping.get('multiplier', 1)
df[new_name] = df[new_name] + addition
df[new_name] = df[new_name] * multiplier
df.drop(columns=[col_name], inplace=True)
columns.pop(columns.index(col_name))
# If there are any columns left, they are not in the mapping and should be converted to floats
for col_name in columns:
df[col_name] = fastnumbers.try_float(df[col_name], map=list, on_fail=math.nan)
return df

def partition_generator(generator, partition_line_count=100_000):
def to_df(rows):
return pandas.DataFrame(rows)
return remap(pandas.DataFrame(rows), mapping=mapping)

stopping = False
while not stopping:
rows = []
@@ -321,22 +448,19 @@ def to_df(rows):
stopping = True
yield to_df(rows)

generator = input_file.load_data(
partition_line_count = 10_000

reader = input_file.load_data(
file_path,
[c for c in input_file.column_info.keys() if input_file.column_info[c].get('has_data')]
)

data = dask.dataframe.from_map(pandas.DataFrame, partition_generator(generator, partition_line_count=100000))
data.compute()
print(f"Partitions: {data.npartitions}")
data.to_parquet(
"test.tmp.parquet",
write_index=False,
compute=True,
custom_metadata={
'galv-harvester-version': '0.1.0'
}
data = dask.dataframe.from_map(
pandas.DataFrame,
partition_generator(reader, partition_line_count=partition_line_count)
)

data.compute()
print(f"Rows: {data.shape[0].compute()}")
# Then we would upload the data by getting presigned URLs for each partition
shutil.rmtree("test.tmp.parquet")
2 changes: 1 addition & 1 deletion harvester/run.py
@@ -75,7 +75,7 @@ def harvest_path(monitored_path: dict):
result = result.json()
status = result['state']
logger.info(f"Server assigned status '{status}'")
if status in ['STABLE', 'RETRY IMPORT']:
if status in ['STABLE', 'RETRY IMPORT', 'MAP ASSIGNED']:
logger.info(f"Parsing file {file_path}")
try:
file.harvest()
2 changes: 1 addition & 1 deletion harvester/settings.py
@@ -95,7 +95,7 @@ def update_envvars():
HARVESTER_STATUS_SUCCESS = 'success'
HARVESTER_STATUS_ERROR = 'error'
HARVEST_STAGE_FILE_METADATA = 'file metadata'
HARVEST_STAGE_COLUMN_METADATA = 'column metadata'
HARVEST_STAGE_DATA_SUMMARY = 'data summary'
HARVEST_STAGE_UPLOAD_PARQUET = 'upload parquet partitions'
HARVEST_STAGE_UPLOAD_COMPLETE = 'upload complete'
HARVEST_STAGE_COMPLETE = 'harvest complete'
1 change: 1 addition & 0 deletions requirements.txt
@@ -2,6 +2,7 @@ click==8.1.3
requests==2.28.1
pandas==2.2.1
dask[complete]==2024.4.1
fastnumbers==5.1.0

# Filetype readers
galvani == 0.4.1
3 changes: 2 additions & 1 deletion start.py
@@ -59,6 +59,7 @@ def create_monitored_path(
api_url, api_token, harvester_uuid, specified,
team_id, monitor_path, monitor_path_regex
) -> None:
# TODO: Ensure that the team is a member of the harvester's lab
click.echo("The harvester will monitor a path on the server for changes and upload files.")
click.echo(("You must be a Team administrator to create a monitored path. "
"Note that Lab administrators are not necessarily Team administrators."))
@@ -92,7 +93,7 @@ def monitored_path_exit(error: str):
teams = teams_administered[page:page + page_size]
has_prev = page != 0
has_next = len(teams_administered) > ((page + 1) * page_size)
click.echo("Press a number for the Team that will own this Harvester.")
click.echo("Press a number for the Team that will own this Monitored Path.")
for i, r in enumerate(teams):
s = f"{i}: {r['name']}"
click.echo(s)
