Skip to content

preprocessors

This module contains the preprocessors used in the pyaki package.

CreatininePreProcessor

Bases: Preprocessor

Preprocessor for processing the creatinine dataset.

Parameters:

Name Type Description Default
stay_identifier str

The column name that identifies stays or admissions in the dataset.

"stay_id"
time_identifier str

The column name that identifies the timestamp or time variable in the dataset.

"charttime"
ffill bool

Flag indicating whether to perform forward filling on missing values.

True
threshold int

The maximum number of consecutive missing hourly rows that are forward filled (passed as `limit` to the forward fill).

72
Source code in pyaki/preprocessors.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
class CreatininePreProcessor(Preprocessor):
    """
    Preprocessor for processing the creatinine dataset.

    Parameters
    ----------
    stay_identifier : str, default: "stay_id"
        The column name that identifies stays or admissions in the dataset.
    time_identifier : str, default: "charttime"
        The column name that identifies the timestamp or time variable in the dataset.
    ffill : bool, default: True
        Flag indicating whether to perform forward filling on missing values.
    threshold : int, default: 72
        The maximum number of consecutive missing hourly rows that are forward
        filled within a stay.
    """

    def __init__(
        self,
        stay_identifier: str = "stay_id",
        time_identifier: str = "charttime",
        ffill: bool = True,
        threshold: int = 72,
    ) -> None:
        super().__init__(stay_identifier, time_identifier)

        self._ffill: bool = ffill
        self._threshold: Optional[int] = threshold

    @dataset_as_df(df=DatasetType.CREATININE)
    @df_to_dataset(DatasetType.CREATININE)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Resample the creatinine dataset to hourly means per stay and optionally forward fill gaps.

        Parameters
        ----------
        df : pd.DataFrame
            The input creatinine dataset as a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The hourly creatinine dataset. When forward filling is enabled, rows whose
            ``creat`` value is 0 are blanked and at most ``threshold`` consecutive
            missing rows are filled from the previous value of the same stay.
        """
        df = df.groupby(self._stay_identifier).resample("1h").mean()  # type: ignore
        if not self._ffill:
            return df

        # A creatinine value of 0 is treated as missing; this blanks the whole row.
        df[df["creat"] == 0] = None

        # Fill per stay (the group key is index level 0 after the grouped resample).
        # A plain ``df.ffill`` would carry the last value of one stay into leading
        # missing rows of the following stay.
        return df.groupby(level=0).ffill(limit=self._threshold)

process(df)

Process the creatinine dataset by resampling and performing forward filling on missing values.

Parameters:

Name Type Description Default
df DataFrame

The input creatinine dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed creatinine dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
@dataset_as_df(df=DatasetType.CREATININE)
@df_to_dataset(DatasetType.CREATININE)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Resample the creatinine dataset to hourly means per stay and optionally forward fill gaps.

    Parameters
    ----------
    df : pd.DataFrame
        The input creatinine dataset as a pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The hourly creatinine dataset. When forward filling is enabled, rows whose
        ``creat`` value is 0 are blanked and at most ``threshold`` consecutive
        missing rows are filled from the previous value of the same stay.
    """
    df = df.groupby(self._stay_identifier).resample("1h").mean()  # type: ignore
    if not self._ffill:
        return df

    # A creatinine value of 0 is treated as missing; this blanks the whole row.
    df[df["creat"] == 0] = None

    # Fill per stay (the group key is index level 0 after the grouped resample).
    # A plain ``df.ffill`` would carry the last value of one stay into leading
    # missing rows of the following stay.
    return df.groupby(level=0).ffill(limit=self._threshold)

DemographicsPreProcessor

Bases: Preprocessor

Preprocessor for processing the demographics dataset.

Source code in pyaki/preprocessors.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
class DemographicsPreProcessor(Preprocessor):
    """Preprocessor that reduces the demographics dataset to one row per stay."""

    @dataset_as_df(df=DatasetType.DEMOGRAPHICS)
    @df_to_dataset(DatasetType.DEMOGRAPHICS)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Aggregate the demographics dataset per stay, keeping the last entry of each column.

        Parameters
        ----------
        df : pd.DataFrame
            The input demographics dataset as a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The demographics dataset indexed by the stay identifier, one row per stay.
        """
        per_stay = df.groupby(self._stay_identifier)
        return per_stay.last()

process(df)

Process the demographics dataset by aggregating the data based on stay identifiers.

Parameters:

Name Type Description Default
df DataFrame

The input demographics dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed demographics dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@dataset_as_df(df=DatasetType.DEMOGRAPHICS)
@df_to_dataset(DatasetType.DEMOGRAPHICS)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate the demographics dataset per stay, keeping the last entry of each column.

    Parameters
    ----------
    df : pd.DataFrame
        The input demographics dataset as a pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The demographics dataset indexed by the stay identifier, one row per stay.
    """
    per_stay = df.groupby(self._stay_identifier)
    return per_stay.last()

Preprocessor

Bases: ABC

Abstract base class for preprocessors.

Parameters:

Name Type Description Default
stay_identifier str

The column name that identifies stays or admissions in the dataset.

"stay_id"
time_identifier str

The column name that identifies the timestamp or time variable in the dataset.

"charttime"
Source code in pyaki/preprocessors.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Preprocessor(ABC):
    """
    Common base class of all preprocessors.

    It only stores the column names shared by every preprocessor; the actual
    work is done by the ``process`` implementation of each subclass.

    Parameters
    ----------
    stay_identifier : str, default: "stay_id"
        The column name that identifies stays or admissions in the dataset.
    time_identifier : str, default: "charttime"
        The column name that identifies the timestamp or time variable in the dataset.
    """

    def __init__(self, stay_identifier: str = "stay_id", time_identifier: str = "charttime") -> None:
        super().__init__()

        self._time_identifier: str = time_identifier
        self._stay_identifier: str = stay_identifier

    def process(self, datasets: list[Dataset]) -> list[Dataset]:
        """
        Process the given datasets; subclasses must override this method.

        Parameters
        ----------
        datasets : list[Dataset]
            The list of datasets to be processed.

        Returns
        -------
        list[Dataset]
            The processed datasets.

        Raises
        ------
        NotImplementedError
            Always, because the base class provides no implementation.
        """
        raise NotImplementedError

process(datasets)

Process the given list of datasets and return the processed datasets.

Parameters:

Name Type Description Default
datasets list[Dataset]

The list of datasets to be processed.

required

Returns:

Type Description
list[Dataset]

The processed datasets.

Source code in pyaki/preprocessors.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def process(self, datasets: list[Dataset]) -> list[Dataset]:
    """
    Process the given datasets; subclasses must override this method.

    Parameters
    ----------
    datasets : list[Dataset]
        The list of datasets to be processed.

    Returns
    -------
    list[Dataset]
        The processed datasets.

    Raises
    ------
    NotImplementedError
        Always, because the base class provides no implementation.
    """
    raise NotImplementedError

RRTPreProcessor

Bases: Preprocessor

Preprocessor for processing the RRT dataset.

Source code in pyaki/preprocessors.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
class RRTPreProcessor(Preprocessor):
    """Preprocessor for processing the RRT dataset."""

    @dataset_as_df(df=DatasetType.RRT)
    @df_to_dataset(DatasetType.RRT)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Upsample the RRT dataset to hourly rows per stay and forward fill the last value.

        We expect the dataframe to contain a 1 for RRT in progress, and 0 for RRT not in progress.

        Parameters
        ----------
        df : pd.DataFrame
            The input RRT dataset as a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The hourly RRT dataset, with missing rows filled from the previous
            value of the same stay.
        """
        df = df.groupby(self._stay_identifier).resample("1h").last()  # type: ignore

        # Fill per stay (the group key is index level 0 after the grouped resample).
        # A plain ``df.ffill`` would carry the RRT status of one stay into leading
        # missing rows of the following stay.
        return df.groupby(level=0).ffill()

process(df)

Process the RRT dataset by upsampling the data and forward filling the last value. We expect the dataframe to contain a 1 for RRT in progress, and 0 for RRT not in progress.

Parameters:

Name Type Description Default
df DataFrame

The input RRT dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed RRT dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
@dataset_as_df(df=DatasetType.RRT)
@df_to_dataset(DatasetType.RRT)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Upsample the RRT dataset to hourly rows per stay and forward fill the last value.

    We expect the dataframe to contain a 1 for RRT in progress, and 0 for RRT not in progress.

    Parameters
    ----------
    df : pd.DataFrame
        The input RRT dataset as a pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The hourly RRT dataset, with missing rows filled from the previous
        value of the same stay.
    """
    df = df.groupby(self._stay_identifier).resample("1h").last()  # type: ignore

    # Fill per stay (the group key is index level 0 after the grouped resample).
    # A plain ``df.ffill`` would carry the RRT status of one stay into leading
    # missing rows of the following stay.
    return df.groupby(level=0).ffill()

TimeIndexCreator

Bases: Preprocessor

Preprocessor for creating a time index in the datasets.

Attributes:

Name Type Description
DATASETS list[DatasetType]

The list of dataset types that require a time index.

Source code in pyaki/preprocessors.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class TimeIndexCreator(Preprocessor):
    """
    Preprocessor for creating a time index in the datasets.

    Attributes
    ----------
    DATASETS : list[DatasetType]
        The list of dataset types that require a time index.
    """

    DATASETS: list[DatasetType] = [
        DatasetType.CREATININE,
        DatasetType.URINEOUTPUT,
        DatasetType.RRT,
    ]

    def process(self, datasets: list[Dataset]) -> list[Dataset]:
        """
        Set the time column as index on every dataset whose type requires a time index.

        Datasets whose type is not listed in ``DATASETS``, or that do not contain the
        time column, are passed through unchanged. The input DataFrames are never
        modified; a converted copy is used when the time column has to be parsed.

        Parameters
        ----------
        datasets : list[Dataset]
            The list of datasets to be processed.

        Returns
        -------
        list[Dataset]
            The processed datasets, in the same order as the input.
        """
        time_col = self._time_identifier
        processed: list[Dataset] = []
        for dtype, df in datasets:
            if dtype in self.DATASETS and time_col in df.columns:
                if not is_datetime64_any_dtype(df[time_col]):
                    # ``assign`` returns a new frame, so the caller's DataFrame keeps
                    # its original (unparsed) time column instead of being mutated.
                    df = df.assign(**{time_col: pd.to_datetime(df[time_col])})
                df = df.set_index(time_col)

            processed.append(Dataset(dtype, df))

        return processed

process(datasets)

Process the datasets by creating a time index if the dataset type requires it.

Parameters:

Name Type Description Default
datasets list[Dataset]

The list of datasets to be processed.

required

Returns:

Type Description
list[Dataset]

The processed datasets.

Source code in pyaki/preprocessors.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def process(self, datasets: list[Dataset]) -> list[Dataset]:
    """
    Set the time column as index on every dataset whose type requires a time index.

    Datasets whose type is not listed in ``DATASETS``, or that do not contain the
    time column, are passed through unchanged. The input DataFrames are never
    modified; a converted copy is used when the time column has to be parsed.

    Parameters
    ----------
    datasets : list[Dataset]
        The list of datasets to be processed.

    Returns
    -------
    list[Dataset]
        The processed datasets, in the same order as the input.
    """
    time_col = self._time_identifier
    processed: list[Dataset] = []
    for dtype, df in datasets:
        if dtype in self.DATASETS and time_col in df.columns:
            if not is_datetime64_any_dtype(df[time_col]):
                # ``assign`` returns a new frame, so the caller's DataFrame keeps
                # its original (unparsed) time column instead of being mutated.
                df = df.assign(**{time_col: pd.to_datetime(df[time_col])})
            df = df.set_index(time_col)

        processed.append(Dataset(dtype, df))

    return processed

UrineOutputPreProcessor

Bases: Preprocessor

Preprocessor for processing the urine output dataset.

Parameters:

Name Type Description Default
stay_identifier str

The column name that identifies stays or admissions in the dataset.

"stay_id"
time_identifier str

The column name that identifies the timestamp or time variable in the dataset.

"charttime"
interpolate bool

Flag indicating whether to perform interpolation on missing values.

True
threshold int

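The maximum number of consecutive missing hourly rows over which a measured urine output value is distributed (used both to cap the divisor and as the back-fill `limit`).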

6
Source code in pyaki/preprocessors.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class UrineOutputPreProcessor(Preprocessor):
    """
    Preprocessor for processing the urine output dataset.

    Parameters
    ----------
    stay_identifier : str, default: "stay_id"
        The column name that identifies stays or admissions in the dataset.
    time_identifier : str, default: "charttime"
        The column name that identifies the timestamp or time variable in the dataset.
    interpolate : bool, default: True
        Flag indicating whether to perform interpolation on missing values.
    threshold : int, default: 6
        The maximum number of consecutive missing hourly rows over which a measured
        value is distributed.
    """

    def __init__(
        self,
        stay_identifier: str = "stay_id",
        time_identifier: str = "charttime",
        interpolate: bool = True,
        threshold: int = 6,
    ) -> None:
        super().__init__(stay_identifier, time_identifier)
        self._interpolate: bool = interpolate
        self._threshold: int = threshold

    @dataset_as_df(df=DatasetType.URINEOUTPUT)
    @df_to_dataset(DatasetType.URINEOUTPUT)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Resample the urine output to hourly sums per stay and spread values over preceding gaps.

        Each measured value is divided by the number of hours it covers (itself plus
        the preceding missing hours, capped at ``threshold``) and then back filled
        into those missing hours. All gap bookkeeping is done per stay.

        Parameters
        ----------
        df : pd.DataFrame
            The input urine output dataset as a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The processed urine output dataset as a pandas DataFrame.
        """

        df = df.groupby(self._stay_identifier).resample("1h").sum()  # type: ignore
        # Hourly sums of 0 are treated as missing; this blanks the whole row.
        df[df["urineoutput"] == 0] = None

        if not self._interpolate:
            return df

        missing = df["urineoutput"].isnull()

        # Length of the current run of missing rows, restarted for every stay
        # (the group key is index level 0 after the grouped resample).
        running = missing.astype(int).groupby(level=0).cumsum()
        baseline = running.where(~missing).groupby(level=0).ffill().fillna(0)
        gap_length = running - baseline

        # For each row: number of missing rows directly before it in the same stay.
        # Shifting per stay keeps a trailing gap of one stay from diluting the first
        # value of the next stay.
        preceding_gap = gap_length.groupby(level=0).shift(1)
        divisor = preceding_gap.clip(upper=self._threshold).add(1).fillna(1)
        df["urineoutput"] /= divisor

        # Back fill per stay so values never leak into the previous stay.
        return df.groupby(level=0).bfill(limit=self._threshold)

process(df)

Process the urine output dataset by resampling, interpolating missing values, and applying threshold-based adjustments.

Parameters:

Name Type Description Default
df DataFrame

The input urine output dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed urine output dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@dataset_as_df(df=DatasetType.URINEOUTPUT)
@df_to_dataset(DatasetType.URINEOUTPUT)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Resample the urine output to hourly sums per stay and spread values over preceding gaps.

    Each measured value is divided by the number of hours it covers (itself plus
    the preceding missing hours, capped at ``threshold``) and then back filled
    into those missing hours. All gap bookkeeping is done per stay.

    Parameters
    ----------
    df : pd.DataFrame
        The input urine output dataset as a pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The processed urine output dataset as a pandas DataFrame.
    """

    df = df.groupby(self._stay_identifier).resample("1h").sum()  # type: ignore
    # Hourly sums of 0 are treated as missing; this blanks the whole row.
    df[df["urineoutput"] == 0] = None

    if not self._interpolate:
        return df

    missing = df["urineoutput"].isnull()

    # Length of the current run of missing rows, restarted for every stay
    # (the group key is index level 0 after the grouped resample).
    running = missing.astype(int).groupby(level=0).cumsum()
    baseline = running.where(~missing).groupby(level=0).ffill().fillna(0)
    gap_length = running - baseline

    # For each row: number of missing rows directly before it in the same stay.
    # Shifting per stay keeps a trailing gap of one stay from diluting the first
    # value of the next stay.
    preceding_gap = gap_length.groupby(level=0).shift(1)
    divisor = preceding_gap.clip(upper=self._threshold).add(1).fillna(1)
    df["urineoutput"] /= divisor

    # Back fill per stay so values never leak into the previous stay.
    return df.groupby(level=0).bfill(limit=self._threshold)