Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,26 @@ Profile a dataset and generate a Scan Report.
```python
from nuh_helper import generate_scan_report


csv_files = [
"patients.csv",
"visits.csv",
]

generate_scan_report(csv_files, min_cell_count=5)
```

### Excluding columns from profiling

Some columns (e.g. dates, free-text notes) are not useful in a scan report and can be excluded per file using `excluded_columns`:

```python
generate_scan_report(
csv_files,
excluded_columns={
"patients.csv": ["dob", "nhs_number"],
"visits.csv": ["visit_date"],
},
)
```

Keys are CSV base filenames (e.g. `"patients.csv"`, not full paths). Excluded columns still appear in the Field Overview and as column headers in the value sheets, but no values are collected or shown for them. Tables not listed in the dict are unaffected.
22 changes: 20 additions & 2 deletions nuh_helper/profile/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,33 @@ def generate_scan_report(
csv_files: list[str],
output_path: str = SCAN_REPORT_FILE_NAME,
min_cell_count: int = 1,
excluded_columns: dict[str, list[str]] | None = None,
) -> str:
"""Generate a WhiteRabbit-compatible Scan Report from CSV files.

Args:
csv_files: Paths to the CSV files to profile.
output_path: Path for the output Excel file.
min_cell_count: Minimum frequency for a value to appear in the report.
excluded_columns: Per-file columns to skip when collecting values. Keys
are CSV filenames (e.g. ``"patients.csv"``), values are lists of
column names. The columns still appear in the Field Overview and
value sheet headers, but no values are collected for them. For
example::

excluded_columns={"patients.csv": ["dob", "nhs_number"]}
"""
logger.info("Generating scan report for %d table(s)", len(csv_files))

tables = []

for csv_file in csv_files:
csv_file = Path(csv_file)
header = read_csv_header(csv_file.as_posix())
logger.info("Scanning '%s' (%d field(s))", csv_file.name, len(header))
fields = header
logger.info("Scanning '%s' (%d field(s))", csv_file.name, len(fields))
tables.append(
{"name": csv_file.name, "path": csv_file.as_posix(), "fields": header}
{"name": csv_file.name, "path": csv_file.as_posix(), "fields": fields}
)

tables.sort(key=lambda t: t["name"])
Expand Down Expand Up @@ -112,6 +128,8 @@ def generate_scan_report(
for table in tables:
table_name_indexed = indexed_names[table["name"]]
value_data, row_count = scan_csv_values(table["path"], min_cell_count)
for col in (excluded_columns or {}).get(table["name"], []):
value_data[col] = []
table_value_data[table_name_indexed] = value_data

table_sheet.append(
Expand Down
141 changes: 141 additions & 0 deletions tests/test_profile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import csv
from pathlib import Path

import openpyxl
import pytest

from nuh_helper import generate_scan_report


@pytest.fixture
def simple_csv(tmp_path: Path) -> Path:
    """Write a small patients CSV (columns: id, name, dob, score) and return its path."""
    rows = [
        ["id", "name", "dob", "score"],
        ["1", "Alice", "1980-01-01", "10"],
        ["2", "Bob", "1990-06-15", "20"],
    ]
    path = tmp_path / "patients.csv"
    with open(path, "w", newline="") as handle:
        csv.writer(handle).writerows(rows)
    return path


@pytest.fixture
def second_csv(tmp_path: Path) -> Path:
    """Write a small visits CSV (columns: visit_id, patient_id, visit_date, result) and return its path."""
    rows = [
        ["visit_id", "patient_id", "visit_date", "result"],
        ["101", "1", "2024-01-10", "normal"],
        ["102", "2", "2024-02-20", "abnormal"],
    ]
    path = tmp_path / "visits.csv"
    with open(path, "w", newline="") as handle:
        csv.writer(handle).writerows(rows)
    return path


def field_overview_fields(wb: openpyxl.Workbook, table_name: str) -> list[str]:
    """Return the field names listed for *table_name* in the 'Field Overview' sheet.

    Rows are matched on column 0 (table name); empty field cells are skipped.
    """
    sheet = wb["Field Overview"]
    fields = []
    for row in sheet.iter_rows(min_row=2):
        if row[0].value == table_name and row[1].value:
            fields.append(row[1].value)
    return fields


def value_sheet_columns(wb: openpyxl.Workbook, sheet_name: str) -> list[str]:
    """Return the column names from a value sheet's header row.

    Value sheets lay out (value, frequency) pairs, so the column names sit in
    every second header cell — take every other entry starting at index 0.
    """
    first_row = next(wb[sheet_name].iter_rows(min_row=1, max_row=1))
    return [cell.value for cell in first_row][::2]


def value_sheet_data(
    wb: openpyxl.Workbook, sheet_name: str, column: str
) -> list[str]:
    """Return the non-empty values listed under *column* in a value sheet."""
    all_rows = list(wb[sheet_name].iter_rows(values_only=True))
    header, body = all_rows[0], all_rows[1:]
    # Locate the column by header cell; raises StopIteration if absent.
    idx = next(i for i, cell in enumerate(header) if cell == column)
    return [r[idx] for r in body if r[idx] not in (None, "")]


def test_no_excluded_columns(simple_csv: Path, tmp_path: Path) -> None:
    """Without exclusions, every column appears in the overview and value sheet."""
    report = tmp_path / "report.xlsx"
    generate_scan_report([str(simple_csv)], output_path=str(report))
    workbook = openpyxl.load_workbook(report)
    expected = ["id", "name", "dob", "score"]
    assert field_overview_fields(workbook, "patients.csv") == expected
    assert value_sheet_columns(workbook, "patients.csv") == expected


def test_excluded_column_still_in_field_overview(
    simple_csv: Path, tmp_path: Path
) -> None:
    """An excluded column must still be listed in the Field Overview sheet."""
    report = tmp_path / "report.xlsx"
    generate_scan_report(
        [str(simple_csv)],
        output_path=str(report),
        excluded_columns={"patients.csv": ["dob"]},
    )
    workbook = openpyxl.load_workbook(report)
    assert "dob" in field_overview_fields(workbook, "patients.csv")


def test_excluded_column_still_in_value_sheet_header(
    simple_csv: Path, tmp_path: Path
) -> None:
    """An excluded column keeps its header slot in the value sheet."""
    report = tmp_path / "report.xlsx"
    generate_scan_report(
        [str(simple_csv)],
        output_path=str(report),
        excluded_columns={"patients.csv": ["dob"]},
    )
    workbook = openpyxl.load_workbook(report)
    assert "dob" in value_sheet_columns(workbook, "patients.csv")


def test_excluded_column_has_no_values(simple_csv: Path, tmp_path: Path) -> None:
    """No values are collected for a column listed in excluded_columns."""
    report = tmp_path / "report.xlsx"
    generate_scan_report(
        [str(simple_csv)],
        output_path=str(report),
        excluded_columns={"patients.csv": ["dob"]},
    )
    workbook = openpyxl.load_workbook(report)
    assert not value_sheet_data(workbook, "patients.csv", "dob")


def test_non_excluded_column_still_has_values(simple_csv: Path, tmp_path: Path) -> None:
    """Columns not listed in excluded_columns are profiled as usual."""
    report = tmp_path / "report.xlsx"
    generate_scan_report(
        [str(simple_csv)],
        output_path=str(report),
        excluded_columns={"patients.csv": ["dob"]},
    )
    workbook = openpyxl.load_workbook(report)
    assert value_sheet_data(workbook, "patients.csv", "name")


def test_exclusions_are_per_table(
    simple_csv: Path, second_csv: Path, tmp_path: Path
) -> None:
    """Exclusions on one table must not affect another."""
    report = tmp_path / "report.xlsx"
    generate_scan_report(
        [str(simple_csv), str(second_csv)],
        output_path=str(report),
        excluded_columns={"patients.csv": ["dob"]},
    )
    workbook = openpyxl.load_workbook(report)
    # patients.csv had "dob" excluded; visits.csv was untouched.
    assert not value_sheet_data(workbook, "patients.csv", "dob")
    assert value_sheet_data(workbook, "visits.csv", "visit_date")


def test_excluded_nonexistent_column_is_ignored(
    simple_csv: Path, tmp_path: Path
) -> None:
    """Excluding a column that does not exist must not break the report."""
    report = tmp_path / "report.xlsx"
    generate_scan_report(
        [str(simple_csv)],
        output_path=str(report),
        excluded_columns={"patients.csv": ["nonexistent"]},
    )
    workbook = openpyxl.load_workbook(report)
    assert value_sheet_data(workbook, "patients.csv", "name")
Loading