Skip to content

preprocessors

This module contains the preprocessors used in the pyaki package.

CreatininePreProcessor

Bases: Preprocessor

Preprocessor for processing the creatinine dataset.

Parameters:

Name Type Description Default
stay_identifier str

The column name that identifies stays or admissions in the dataset.

"stay_id"
time_identifier str

The column name that identifies the timestamp or time variable in the dataset.

"charttime"
creatinine_column str

The column name that represents the creatinine values in the dataset.

"creat"
ffill bool

Flag indicating whether to perform forward filling on missing values.

True
threshold int

The maximum number of consecutive hourly rows that are forward filled after the last available value.

72
Source code in pyaki/preprocessors.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
class CreatininePreProcessor(Preprocessor):
    """
    Preprocessor for processing the creatinine dataset.

    The data is resampled to hourly means per stay. When forward filling is
    enabled, rows whose creatinine value is 0 are blanked and the resulting
    gaps are filled with the previous value of the same stay.

    Parameters
    ----------
    stay_identifier : str, default: "stay_id"
        The column name that identifies stays or admissions in the dataset.
    time_identifier : str, default: "charttime"
        The column name that identifies the timestamp or time variable in the dataset.
    creatinine_column : str, default: "creat"
        The column name that represents the creatinine values in the dataset.
    ffill : bool, default: True
        Flag indicating whether to perform forward filling on missing values.
    threshold : int or None, default: 72
        The maximum number of consecutive hourly rows to forward fill.
        ``None`` removes the limit.
    """

    def __init__(
        self,
        stay_identifier: str = "stay_id",
        time_identifier: str = "charttime",
        creatinine_column: str = "creat",
        ffill: bool = True,
        threshold: Optional[int] = 72,
    ) -> None:
        super().__init__(stay_identifier, time_identifier)

        self._ffill: bool = ffill
        self._threshold: Optional[int] = threshold
        self._creatinine_column: str = creatinine_column

    @dataset_as_df(df=DatasetType.CREATININE)
    @df_to_dataset(DatasetType.CREATININE)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Resample the creatinine dataset to hourly means and forward fill gaps per stay.

        Parameters
        ----------
        df : pd.DataFrame
            The input creatinine dataset as a pandas DataFrame.
            NOTE(review): ``resample`` needs a datetime-like index; presumably
            the time index has been created beforehand - confirm against caller.

        Returns
        -------
        pd.DataFrame
            The hourly dataset, indexed by stay and time. If forward filling is
            disabled the resampled frame is returned unchanged.
        """
        hourly = df.groupby(self._stay_identifier).resample("1h").mean()  # type: ignore
        if not self._ffill:
            return hourly

        # Rows with a creatinine value of 0 are blanked (all columns) so they get filled below.
        hourly[hourly[self._creatinine_column] == 0] = None

        # Fill within each stay only. Level 0 of the index is the stay identifier
        # introduced by the groupby above; a frame-wide ffill would carry the last
        # value of one stay into the leading gap of the next stay.
        return hourly.groupby(level=0).ffill(limit=self._threshold)

process(df)

Process the creatinine dataset by resampling and performing forward filling on missing values.

Parameters:

Name Type Description Default
df DataFrame

The input creatinine dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed creatinine dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
@dataset_as_df(df=DatasetType.CREATININE)
@df_to_dataset(DatasetType.CREATININE)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Resample the creatinine dataset to hourly means and forward fill gaps per stay.

    Parameters
    ----------
    df : pd.DataFrame
        The input creatinine dataset as a pandas DataFrame.
        NOTE(review): ``resample`` needs a datetime-like index; presumably
        the time index has been created beforehand - confirm against caller.

    Returns
    -------
    pd.DataFrame
        The hourly dataset, indexed by stay and time. If forward filling is
        disabled the resampled frame is returned unchanged.
    """
    hourly = df.groupby(self._stay_identifier).resample("1h").mean()  # type: ignore
    if not self._ffill:
        return hourly

    # Rows with a creatinine value of 0 are blanked (all columns) so they get filled below.
    hourly[hourly[self._creatinine_column] == 0] = None

    # Fill within each stay only. Level 0 of the index is the stay identifier
    # introduced by the groupby above; a frame-wide ffill would carry the last
    # value of one stay into the leading gap of the next stay.
    return hourly.groupby(level=0).ffill(limit=self._threshold)

DemographicsPreProcessor

Bases: Preprocessor

Preprocessor for processing the demographics dataset.

Source code in pyaki/preprocessors.py
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
class DemographicsPreProcessor(Preprocessor):
    """Preprocessor that reduces the demographics dataset to one row per stay."""

    @dataset_as_df(df=DatasetType.DEMOGRAPHICS)
    @df_to_dataset(DatasetType.DEMOGRAPHICS)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Aggregate the demographics dataset per stay, keeping the last entry of each group.

        Parameters
        ----------
        df : pd.DataFrame
            The input demographics dataset as a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The aggregated demographics dataset, indexed by the stay identifier.
        """
        per_stay = df.groupby(self._stay_identifier)
        # GroupBy.last() picks the last entry per column within every stay.
        return per_stay.last()

process(df)

Process the demographics dataset by aggregating the data based on stay identifiers.

Parameters:

Name Type Description Default
df DataFrame

The input demographics dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed demographics dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
@dataset_as_df(df=DatasetType.DEMOGRAPHICS)
@df_to_dataset(DatasetType.DEMOGRAPHICS)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Aggregate the demographics dataset per stay, keeping the last entry of each group.

    Parameters
    ----------
    df : pd.DataFrame
        The input demographics dataset as a pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The aggregated demographics dataset, indexed by the stay identifier.
    """
    per_stay = df.groupby(self._stay_identifier)
    # GroupBy.last() picks the last entry per column within every stay.
    return per_stay.last()

Preprocessor

Bases: ABC

Abstract base class for preprocessors.

Parameters:

Name Type Description Default
stay_identifier str

The column name that identifies stays or admissions in the dataset.

"stay_id"
time_identifier str

The column name that identifies the timestamp or time variable in the dataset.

"charttime"
Source code in pyaki/preprocessors.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Preprocessor(ABC):
    """
    Common base class of all preprocessors.

    It only stores the column names shared by every preprocessor; the actual
    work is done by subclasses overriding :meth:`process`.

    Parameters
    ----------
    stay_identifier : str, default: "stay_id"
        Name of the column identifying stays or admissions in the dataset.
    time_identifier : str, default: "charttime"
        Name of the column holding the timestamp or time variable in the dataset.
    """

    def __init__(self, stay_identifier: str = "stay_id", time_identifier: str = "charttime") -> None:
        super().__init__()

        # Column names are kept as private attributes for use by subclasses.
        self._time_identifier: str = time_identifier
        self._stay_identifier: str = stay_identifier

    def process(self, datasets: list[Dataset]) -> list[Dataset]:
        """
        Process the given datasets; must be overridden by subclasses.

        Parameters
        ----------
        datasets : list[Dataset]
            The datasets to process.

        Returns
        -------
        list[Dataset]
            The processed datasets.

        Raises
        ------
        NotImplementedError
            Always, because the base class provides no implementation.
        """
        raise NotImplementedError

process(datasets)

Process the given list of datasets and return the processed datasets.

Parameters:

Name Type Description Default
datasets list[Dataset]

The list of datasets to be processed.

required

Returns:

Type Description
list[Dataset]

The processed datasets.

Source code in pyaki/preprocessors.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def process(self, datasets: list[Dataset]) -> list[Dataset]:
    """
    Process the given datasets; must be overridden by subclasses.

    Parameters
    ----------
    datasets : list[Dataset]
        The datasets to process.

    Returns
    -------
    list[Dataset]
        The processed datasets.

    Raises
    ------
    NotImplementedError
        Always, because this implementation is only a placeholder.
    """
    raise NotImplementedError

RRTPreProcessor

Bases: Preprocessor

Preprocessor for processing the RRT dataset.

Source code in pyaki/preprocessors.py
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
class RRTPreProcessor(Preprocessor):
    """Preprocessor for processing the RRT dataset."""

    @dataset_as_df(df=DatasetType.RRT)
    @df_to_dataset(DatasetType.RRT)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Upsample the RRT dataset to hourly rows and forward fill the last value per stay.

        We expect the dataframe to contain a 1 for RRT in progress, and 0 for RRT not in progress.

        Parameters
        ----------
        df : pd.DataFrame
            The input RRT dataset as a pandas DataFrame.
            NOTE(review): ``resample`` needs a datetime-like index; presumably
            the time index has been created beforehand - confirm against caller.

        Returns
        -------
        pd.DataFrame
            The hourly RRT dataset, indexed by stay and time.
        """
        hourly = df.groupby(self._stay_identifier).resample("1h").last()  # type: ignore

        # Fill within each stay only. Level 0 of the index is the stay identifier
        # introduced by the groupby above; a frame-wide ffill would let the RRT
        # status of one stay spill into a leading gap of the next stay.
        return hourly.groupby(level=0).ffill()

process(df)

Process the RRT dataset by upsampling the data and forward filling the last value. We expect the dataframe to contain a 1 for RRT in progress, and 0 for RRT not in progress.

Parameters:

Name Type Description Default
df DataFrame

The input RRT dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed RRT dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
@dataset_as_df(df=DatasetType.RRT)
@df_to_dataset(DatasetType.RRT)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Upsample the RRT dataset to hourly rows and forward fill the last value per stay.

    We expect the dataframe to contain a 1 for RRT in progress, and 0 for RRT not in progress.

    Parameters
    ----------
    df : pd.DataFrame
        The input RRT dataset as a pandas DataFrame.
        NOTE(review): ``resample`` needs a datetime-like index; presumably
        the time index has been created beforehand - confirm against caller.

    Returns
    -------
    pd.DataFrame
        The hourly RRT dataset, indexed by stay and time.
    """
    hourly = df.groupby(self._stay_identifier).resample("1h").last()  # type: ignore

    # Fill within each stay only. Level 0 of the index is the stay identifier
    # introduced by the groupby above; a frame-wide ffill would let the RRT
    # status of one stay spill into a leading gap of the next stay.
    return hourly.groupby(level=0).ffill()

TimeIndexCreator

Bases: Preprocessor

Preprocessor for creating a time index in the datasets.

Attributes:

Name Type Description
DATASETS list[DatasetType]

The list of dataset types that require a time index.

Source code in pyaki/preprocessors.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class TimeIndexCreator(Preprocessor):
    """
    Preprocessor for creating a time index in the datasets.

    Attributes
    ----------
    DATASETS : list[DatasetType]
        The list of dataset types that require a time index.
    """

    DATASETS: list[DatasetType] = [
        DatasetType.CREATININE,
        DatasetType.URINEOUTPUT,
        DatasetType.RRT,
    ]

    def process(self, datasets: list[Dataset]) -> list[Dataset]:
        """
        Set the time column as index on every dataset whose type requires it.

        Datasets whose type is not listed in ``DATASETS``, or that do not have
        the time column, are passed through unchanged. The input DataFrames
        are not modified.

        Parameters
        ----------
        datasets : list[Dataset]
            The list of datasets to be processed.

        Returns
        -------
        list[Dataset]
            The processed datasets, in the same order as the input.
        """
        time_col = self._time_identifier
        processed: list[Dataset] = []
        for dtype, df in datasets:
            if dtype in self.DATASETS and time_col in df.columns:
                if not is_datetime64_any_dtype(df[time_col]):
                    # assign() returns a new frame, so the caller's DataFrame
                    # keeps its original (unconverted) time column.
                    df = df.assign(**{time_col: pd.to_datetime(df[time_col])})
                df = df.set_index(time_col)

            processed.append(Dataset(dtype, df))

        return processed

process(datasets)

Process the datasets by creating a time index if the dataset type requires it.

Parameters:

Name Type Description Default
datasets list[Dataset]

The list of datasets to be processed.

required

Returns:

Type Description
list[Dataset]

The processed datasets.

Source code in pyaki/preprocessors.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
def process(self, datasets: list[Dataset]) -> list[Dataset]:
    """
    Set the time column as index on every dataset whose type requires it.

    Datasets whose type is not listed in ``DATASETS``, or that do not have
    the time column, are passed through unchanged. The input DataFrames
    are not modified.

    Parameters
    ----------
    datasets : list[Dataset]
        The list of datasets to be processed.

    Returns
    -------
    list[Dataset]
        The processed datasets, in the same order as the input.
    """
    processed: list[Dataset] = []
    for dtype, df in datasets:
        time_col = self._time_identifier
        if dtype in self.DATASETS and time_col in df.columns:
            if not is_datetime64_any_dtype(df[time_col]):
                # assign() returns a new frame, so the caller's DataFrame
                # keeps its original (unconverted) time column.
                df = df.assign(**{time_col: pd.to_datetime(df[time_col])})
            df = df.set_index(time_col)

        processed.append(Dataset(dtype, df))

    return processed

UrineOutputPreProcessor

Bases: Preprocessor

Preprocessor for processing the urine output dataset.

Parameters:

Name Type Description Default
stay_identifier str

The column name that identifies stays or admissions in the dataset.

"stay_id"
time_identifier str

The column name that identifies the timestamp or time variable in the dataset.

"charttime"
urineoutput_column str

The column name that represents the urine output values in the dataset.

"urineoutput"
interpolate bool

Flag indicating whether to perform interpolation on missing values.

True
threshold int

The maximum number of consecutive missing hourly values over which a measurement is distributed backwards.

6
Source code in pyaki/preprocessors.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
class UrineOutputPreProcessor(Preprocessor):
    """
    Preprocessor for the urine output dataset.

    The data is resampled to hourly sums per stay. Optionally, each
    measurement that follows a run of missing hours is divided evenly over
    that run (up to ``threshold`` hours) and back-filled into it.

    Parameters
    ----------
    stay_identifier : str, default: "stay_id"
        Name of the column identifying stays or admissions in the dataset.
    time_identifier : str, default: "charttime"
        Name of the column holding the timestamp or time variable in the dataset.
    urineoutput_column : str, default: "urineoutput"
        Name of the column holding the urine output values in the dataset.
    interpolate : bool, default: True
        Whether missing values are interpolated as described above.
    threshold : int, default: 6
        Upper bound for the number of missing hours a measurement is spread over.
    """

    def __init__(
        self,
        stay_identifier: str = "stay_id",
        time_identifier: str = "charttime",
        urineoutput_column: str = "urineoutput",
        interpolate: bool = True,
        threshold: int = 6,
    ) -> None:
        super().__init__(stay_identifier, time_identifier)
        self._urineoutput_column: str = urineoutput_column
        self._threshold: int = threshold
        self._interpolate: bool = interpolate

    @dataset_as_df(df=DatasetType.URINEOUTPUT)
    @df_to_dataset(DatasetType.URINEOUTPUT)
    def process(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        Resample the urine output to hourly sums and optionally spread values over preceding gaps.

        Parameters
        ----------
        df : pd.DataFrame
            The input urine output dataset as a pandas DataFrame.

        Returns
        -------
        pd.DataFrame
            The hourly urine output dataset, indexed by stay and time. Without
            interpolation, hours with a sum of 0 are left missing.
        """
        column = self._urineoutput_column
        limit = self._threshold

        df = df.groupby(self._stay_identifier).resample("1h").sum()  # type: ignore
        # Rows whose hourly sum is 0 are blanked in every column; this includes
        # hours without any measurement, because an empty sum is 0.
        df[df[column] == 0] = None

        if self._interpolate:
            missing = df[column].isnull()
            missing_so_far = missing.cumsum()
            # Running count of missing rows as it stood at the most recent non-missing row.
            count_at_last_value = missing_so_far.where(~missing).ffill().fillna(0)
            # Length of the run of missing rows directly before each row.
            # NOTE(review): these running counts span the whole frame, i.e. they
            # are not reset at stay boundaries - confirm this is intended.
            gap_before = (missing_so_far - count_at_last_value).shift(1)
            # A value following k missing hours is divided by k + 1 (k capped at the threshold).
            divisor = gap_before.clip(upper=limit).add(1).fillna(1)
            df[column] /= divisor
            # Copy the reduced value back into the preceding gap.
            df = df.bfill(limit=limit)

        return df

process(df)

Process the urine output dataset by resampling, interpolating missing values, and applying threshold-based adjustments.

Parameters:

Name Type Description Default
df DataFrame

The input urine output dataset as a pandas DataFrame.

required

Returns:

Type Description
DataFrame

The processed urine output dataset as a pandas DataFrame.

Source code in pyaki/preprocessors.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@dataset_as_df(df=DatasetType.URINEOUTPUT)
@df_to_dataset(DatasetType.URINEOUTPUT)
def process(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Resample the urine output to hourly sums and optionally spread values over preceding gaps.

    Parameters
    ----------
    df : pd.DataFrame
        The input urine output dataset as a pandas DataFrame.

    Returns
    -------
    pd.DataFrame
        The hourly urine output dataset, indexed by stay and time. Without
        interpolation, hours with a sum of 0 are left missing.
    """
    column = self._urineoutput_column
    limit = self._threshold

    df = df.groupby(self._stay_identifier).resample("1h").sum()  # type: ignore
    # Rows whose hourly sum is 0 are blanked in every column; this includes
    # hours without any measurement, because an empty sum is 0.
    df[df[column] == 0] = None

    if self._interpolate:
        missing = df[column].isnull()
        missing_so_far = missing.cumsum()
        # Running count of missing rows as it stood at the most recent non-missing row.
        count_at_last_value = missing_so_far.where(~missing).ffill().fillna(0)
        # Length of the run of missing rows directly before each row.
        # NOTE(review): these running counts span the whole frame, i.e. they
        # are not reset at stay boundaries - confirm this is intended.
        gap_before = (missing_so_far - count_at_last_value).shift(1)
        # A value following k missing hours is divided by k + 1 (k capped at the threshold).
        divisor = gap_before.clip(upper=limit).add(1).fillna(1)
        df[column] /= divisor
        # Copy the reduced value back into the preceding gap.
        df = df.bfill(limit=limit)

    return df