xpypact: FISPACT output to Polars or DuckDB datasets converter

Aggregate results of FISPACT runs to efficient datasets.


Note

This document is in progress.

The package loads FISPACT JSON output files and converts them to Polars dataframes with minor data normalization. The dataframes can be stored either as Parquet files or in a DuckDB database, which allows efficient data aggregation and analysis. Multiple JSON files from different FISPACT runs can be combined using simple additional identification: so far we use just a two-dimensional identification by material and case, where the case usually identifies a certain neutron flux energy distribution.

Features

  • export to DuckDB
  • export to Parquet files
  • neutron flux presentation conversion
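
A minimal quick-start sketch, assuming a FISPACT JSON output file and using only the API shown in this document (the file name and the ids are illustrative): load a single run, collect it, and dump the result to Parquet files.

from pathlib import Path

from xpypact import FullDataCollector, Inventory

collector = FullDataCollector()
inventory = Inventory.from_json(Path("fispact-run.json"))  # illustrative file name
collector.append(inventory, material_id=1, case_id=1)
collected = collector.get_result()
collected.save_to_parquets(Path.cwd() / "parquets")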

Installation

From PyPI

pip install xpypact

As dependency

poetry add xpypact

From source

pip install git+https://github.com/MC-kit/xpypact.git

Usage

from concurrent import futures
from pathlib import Path

from xpypact import FullDataCollector, Inventory

def get_material_id(p: Path) -> int:
    ...

def get_case_id(p: Path) -> int:
    ...

jsons = [path1, path2, ...]
material_ids = {p: get_material_id(p) for p in jsons}
case_ids = {p: get_case_id(p) for p in jsons}

collector = FullDataCollector()

if sequential_load:  # user-defined flag: load files one by one or in threads
    for json in jsons:
        inventory = Inventory.from_json(json)
        collector.append(inventory, material_id=material_ids[json], case_id=case_ids[json])

else:  # multithreading is allowed for collector as well

    task_list = ...  # list of (case_id, directory, tasks_sequence) tuples,
    # where tasks_sequence is a comma-separated string of task names
    threads = 16  # number of worker threads, tune as needed

    # Assumed naming conventions (adjust to your layout): inventory files are
    # named like "inventory-<material_id>", case directories like "case-<case_id>".
    _LEN_INVENTORY = len("inventory-")
    _LEN_CASE = len("case-")

    class FindPathError(ValueError):
        """Raised when a JSON path or its ids cannot be resolved."""

    def _find_path(arg) -> tuple[int, int, Path]:
        _case, path, inventory = arg
        json_path: Path = (Path(path) / inventory).with_suffix(".json")
        if not json_path.exists():
            msg = f"Cannot find file {json_path}"
            raise FindPathError(msg)
        try:
            material_id = int(inventory[_LEN_INVENTORY:])
            case_str = json_path.parent.parts[-1]
            case_id = int(case_str[_LEN_CASE:])
        except (ValueError, IndexError) as x:
            msg = f"Cannot define material_id and case_id from {json_path}"
            raise FindPathError(msg) from x
        if case_id != _case:
            msg = f"Contradicting values of case_id in case path and database: {case_id} != {_case}"
            raise FindPathError(msg)
        return material_id, case_id, json_path

    with futures.ThreadPoolExecutor(max_workers=threads) as executor:
        mcp_futures = [
            executor.submit(_find_path, arg)
            for arg in (
                (task_case[0], task_case[1], task)
                for task_case in task_list
                for task in task_case[2].split(",")
                if task.startswith("inventory-")
            )
        ]

    mips = [x.result() for x in futures.as_completed(mcp_futures)]
    mips.sort(key=lambda x: x[0:2])  # sort by material_id, case_id

    def _load_json(arg) -> None:
        collector, material_id, case_id, json_path = arg
        # Parse the JSON text and append it under its material and case ids;
        # Inventory.from_json is used for consistency with the sequential branch.
        collector.append(
            Inventory.from_json(json_path.read_text(encoding="utf8")),
            material_id,
            case_id,
        )

    with futures.ThreadPoolExecutor(max_workers=threads) as executor:
        executor.map(_load_json, ((collector, *mip) for mip in mips))


collected = collector.get_result()

# save to parquet files

collected.save_to_parquets(Path.cwd() / "parquets")
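
# A hedged follow-up sketch: the saved datasets can be read back with Polars.
# The file name "time_steps.parquet" is illustrative; check the "parquets"
# directory for the actual file names produced by save_to_parquets.
import polars as pl

time_steps = pl.read_parquet(Path.cwd() / "parquets" / "time_steps.parquet")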

# or use DuckDB database

from xpypact.dao import save
import duckdb as db

con = db.connect()
save(con, collected)

gamma_from_db = con.sql(
    """
    select
    g, rate
    from timestep_gamma
    where material_id = 1 and case_id = 54 and time_step_number = 7
    order by g
    """,
).fetchall()
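
# A hedged alternative, assuming a recent DuckDB version: fetch the same
# relation directly as a Polars dataframe via DuckDB's relation API.
gamma_df = con.sql(
    """
    select g, rate
    from timestep_gamma
    where material_id = 1 and case_id = 54 and time_step_number = 7
    order by g
    """,
).pl()
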
Contributing

Just follow ordinary practice: