kdigo

This module contains the analysis class for processing AKI stages from time series data.

Analyser

Class for data analysis using probes and preprocessors.

This class provides functionality for analyzing data using a collection of probes and preprocessors. It processes the input data through the specified preprocessors and applies the probes to perform the analysis. The analysis results are returned as a DataFrame.

Parameters:

Name Type Description Default
data list[Dataset]

A list of Dataset objects containing the input data.

required
probes list[Probe]

A list of Probe objects representing the analysis probes to apply. If not provided, the default probes UrineOutputProbe, AbsoluteCreatinineProbe, RelativeCreatinineProbe, and RRTProbe are used.

None
preprocessors list[Preprocessor]

A list of Preprocessor objects to apply to the input data. If not provided, the default preprocessors TimeIndexCreator, UrineOutputPreProcessor, CreatininePreProcessor, DemographicsPreProcessor, and RRTPreProcessor are used.

None
stay_identifier str

The column name in the input data representing the stay identifier.

"stay_id"
time_identifier str

The column name in the input data representing the time identifier.

"charttime"

Examples:

Instantiate the Analyser class with custom data, probes, and preprocessors

>>> analyser = Analyser(data=my_datasets, probes=[MyProbe()], preprocessors=[MyPreprocessor()])

Process stays and obtain the analysis results

>>> result_df = analyser.process_stays()
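The `stage` column in the result is the row-wise maximum of the per-probe stage columns. A minimal pandas sketch with hypothetical column names (mirroring `df["stage"] = df.filter(like="stage").max(axis=1)` in `process_stay`):

```python
import pandas as pd

# Hypothetical per-probe stage columns for one stay; the overall KDIGO stage
# is the row-wise maximum across all "*stage*" columns.
df = pd.DataFrame(
    {
        "urineoutput_stage": [0, 1, 1],
        "abs_creatinine_stage": [0, 0, 2],
    }
)
df["stage"] = df.filter(like="stage").max(axis=1)
print(df["stage"].tolist())  # [0, 1, 2]
```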

Source code in pyaki/kdigo.py
class Analyser:
    """
    Class for data analysis using probes and preprocessors.

    This class provides functionality for analyzing data using a collection of probes and preprocessors.
    It processes the input data through the specified preprocessors and applies the probes to perform
    the analysis. The analysis results are returned as a DataFrame.

    Parameters
    ----------
    data : list[Dataset]
        A list of Dataset objects containing the input data.
    probes : list[Probe], optional
        A list of Probe objects representing the analysis probes to apply. If not provided, the
        default probes UrineOutputProbe, AbsoluteCreatinineProbe, RelativeCreatinineProbe, and
        RRTProbe are used.
    preprocessors : list[Preprocessor], optional
        A list of Preprocessor objects to apply to the input data. If not provided, the default
        preprocessors TimeIndexCreator, UrineOutputPreProcessor, CreatininePreProcessor,
        DemographicsPreProcessor, and RRTPreProcessor are used.
    stay_identifier : str, default: "stay_id"
        The column name in the input data representing the stay identifier.
    time_identifier : str, default: "charttime"
        The column name in the input data representing the time identifier.

    Examples
    --------
    Instantiate the Analyser class with custom data, probes, and preprocessors
    ```pycon
    >>> analyser = Analyser(data=my_datasets, probes=[MyProbe()], preprocessors=[MyPreprocessor()])
    ```

    Process stays and obtain the analysis results
    ```pycon
    >>> result_df = analyser.process_stays()
    ```
    """

    def __init__(
        self,
        data: list[Dataset],
        probes: Optional[list[Probe]] = None,
        preprocessors: Optional[list[Preprocessor]] = None,
        stay_identifier: str = "stay_id",
        time_identifier: str = "charttime",
    ) -> None:
        if probes is None:  # apply default probes if not provided
            probes = [
                UrineOutputProbe(),
                AbsoluteCreatinineProbe(),
                RelativeCreatinineProbe(),
                RRTProbe(),
            ]
        if preprocessors is None:  # apply default preprocessors if not provided
            preprocessors = [
                TimeIndexCreator(stay_identifier=stay_identifier, time_identifier=time_identifier),
                UrineOutputPreProcessor(stay_identifier=stay_identifier, time_identifier=time_identifier),
                CreatininePreProcessor(stay_identifier=stay_identifier, time_identifier=time_identifier),
                DemographicsPreProcessor(stay_identifier=stay_identifier),
                RRTPreProcessor(stay_identifier=stay_identifier, time_identifier=time_identifier),
            ]

        # validate datasets
        self.validate_data(data)

        # apply preprocessors to the input data
        logger.info("Start preprocessing")
        for preprocessor in preprocessors:
            data = preprocessor.process(data)

        logger.info("Finish preprocessing")

        self._data: list[Dataset] = data
        self._probes: list[Probe] = probes
        self._stay_identifier: str = stay_identifier

    def validate_data(self, datasets: list[Dataset]) -> None:
        """
        Validate the input data for negative values.

        Parameters
        ----------
        datasets : list[Dataset]
            A list of Dataset objects containing the input data.

        Raises
        ------
        ValueError
            If any of the datasets contain negative values.
        """
        for dtype, df in datasets:
            try:
                if (df < 0).values.any():
                    raise ValueError(f"Dataset of Type {dtype} contains negative data")
            except TypeError:
                continue

    def process_stays(self) -> pd.DataFrame:
        """
        Process all stays in the input data.

        This method processes all stays in the input data by applying the configured probes.
        The analysis results for all stays are concatenated and returned as a single DataFrame.

        Returns
        -------
        pd.DataFrame
            The analysis results for all stays.
        """
        logger.info("Start probing")

        (_, df), *datasets = self._data
        stay_ids: pd.Index = df.index.get_level_values(self._stay_identifier).unique()
        for _, df in datasets:
            # Index.join returns a new index; reassign to accumulate ids from every dataset
            stay_ids = stay_ids.union(df.index.get_level_values(self._stay_identifier).unique())

        data: pd.DataFrame = self.process_stay(stay_ids.values[0])
        for stay_id in stay_ids.values[1:]:
            data = pd.concat([data, self.process_stay(stay_id)])

        logger.info("Finish probing")
        return data

    def process_stay(self, stay_id: str) -> pd.DataFrame:
        """
        Process a specific stay in the input data by its stay identifier.

        This method processes a specific stay by applying the configured probes.
        The analysis results for the stay are returned as a DataFrame.

        Parameters
        ----------
        stay_id : str
            The identifier of the stay to process.

        Returns
        -------
        pd.DataFrame
            The analysis results for the specific stay.
        """
        logger.debug("Processing stay with id: %s", stay_id)

        datasets: list[Dataset] = [
            Dataset(dtype, data.loc[stay_id])  # type: ignore
            for dtype, data in self._data
            if stay_id in data.index
        ]

        for probe in self._probes:
            datasets = probe.probe(datasets)

        (_, df), *datasets = datasets
        for _, _df in datasets:
            if isinstance(_df, pd.Series):
                _df = pd.DataFrame([_df], index=df.index)
            columns = set(_df.columns) - set(df.columns)
            df = df.merge(_df[[*columns]], how="outer", left_index=True, right_index=True)

        df["stage"] = df.filter(like="stage").max(axis=1)
        return df.set_index(
            pd.MultiIndex.from_arrays(
                [[stay_id] * len(df), df.index.values],
                names=(self._stay_identifier, df.index.name),
            )
        )

process_stay(stay_id)

Process a specific stay in the input data by its stay identifier.

This method processes a specific stay by applying the configured probes. The analysis results for the stay are returned as a DataFrame.
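Internally, each probe's output frame is merged onto the first dataset's frame on the shared time index, keeping only columns not already present. A small sketch with hypothetical column names:

```python
import pandas as pd

# Hypothetical per-probe frames sharing a time index; only new columns are
# merged in, as in process_stay:
#   columns = set(_df.columns) - set(df.columns)
base = pd.DataFrame({"urineoutput_stage": [0, 1]}, index=[0, 1])
probe_out = pd.DataFrame(
    {"abs_creatinine_stage": [2, 0], "urineoutput_stage": [9, 9]}, index=[0, 1]
)
new_cols = set(probe_out.columns) - set(base.columns)
merged = base.merge(probe_out[[*new_cols]], how="outer", left_index=True, right_index=True)
print(sorted(merged.columns))  # ['abs_creatinine_stage', 'urineoutput_stage']
```

Note that the existing `urineoutput_stage` values in `base` are preserved; the duplicate column from the probe output is never merged.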

Parameters:

Name Type Description Default
stay_id str

The identifier of the stay to process.

required

Returns:

Type Description
DataFrame

The analysis results for the specific stay.

Source code in pyaki/kdigo.py
def process_stay(self, stay_id: str) -> pd.DataFrame:
    """
    Process a specific stay in the input data by its stay identifier.

    This method processes a specific stay by applying the configured probes.
    The analysis results for the stay are returned as a DataFrame.

    Parameters
    ----------
    stay_id : str
        The identifier of the stay to process.

    Returns
    -------
    pd.DataFrame
        The analysis results for the specific stay.
    """
    logger.debug("Processing stay with id: %s", stay_id)

    datasets: list[Dataset] = [
        Dataset(dtype, data.loc[stay_id])  # type: ignore
        for dtype, data in self._data
        if stay_id in data.index
    ]

    for probe in self._probes:
        datasets = probe.probe(datasets)

    (_, df), *datasets = datasets
    for _, _df in datasets:
        if isinstance(_df, pd.Series):
            _df = pd.DataFrame([_df], index=df.index)
        columns = set(_df.columns) - set(df.columns)
        df = df.merge(_df[[*columns]], how="outer", left_index=True, right_index=True)

    df["stage"] = df.filter(like="stage").max(axis=1)
    return df.set_index(
        pd.MultiIndex.from_arrays(
            [[stay_id] * len(df), df.index.values],
            names=(self._stay_identifier, df.index.name),
        )
    )

process_stays()

Process all stays in the input data.

This method processes all stays in the input data by applying the configured probes. The analysis results for all stays are concatenated and returned as a single DataFrame.

Returns:

Type Description
DataFrame

The analysis results for all stays.
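The returned frame carries a `(stay_id, time)` MultiIndex: each per-stay result is tagged with its stay identifier (as in `process_stay`) and the frames are concatenated. A sketch with hypothetical stay ids and a hypothetical `period` time index:

```python
import pandas as pd

# Mirror of the index construction in process_stay plus the concat loop in
# process_stays, using made-up stay ids 101 and 102.
def tag_stay(df: pd.DataFrame, stay_id: int) -> pd.DataFrame:
    return df.set_index(
        pd.MultiIndex.from_arrays(
            [[stay_id] * len(df), df.index.values],
            names=("stay_id", df.index.name),
        )
    )

a = tag_stay(pd.DataFrame({"stage": [0, 1]}, index=pd.Index([0, 1], name="period")), 101)
b = tag_stay(pd.DataFrame({"stage": [2]}, index=pd.Index([0], name="period")), 102)
result = pd.concat([a, b])
print(result.index.get_level_values("stay_id").unique().tolist())  # [101, 102]
```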

Source code in pyaki/kdigo.py
def process_stays(self) -> pd.DataFrame:
    """
    Process all stays in the input data.

    This method processes all stays in the input data by applying the configured probes.
    The analysis results for all stays are concatenated and returned as a single DataFrame.

    Returns
    -------
    pd.DataFrame
        The analysis results for all stays.
    """
    logger.info("Start probing")

    (_, df), *datasets = self._data
    stay_ids: pd.Index = df.index.get_level_values(self._stay_identifier).unique()
    for _, df in datasets:
        # Index.join returns a new index; reassign to accumulate ids from every dataset
        stay_ids = stay_ids.union(df.index.get_level_values(self._stay_identifier).unique())

    data: pd.DataFrame = self.process_stay(stay_ids.values[0])
    for stay_id in stay_ids.values[1:]:
        data = pd.concat([data, self.process_stay(stay_id)])

    logger.info("Finish probing")
    return data

validate_data(datasets)

Validate the input data for negative values.

Parameters:

Name Type Description Default
datasets list[Dataset]

A list of Dataset objects containing the input data.

required

Raises:

Type Description
ValueError

If any of the datasets contain negative values.
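The check itself is a plain elementwise comparison; datasets whose values cannot be compared with 0 (non-numeric frames) raise TypeError and are skipped. A minimal sketch of the condition, using a hypothetical creatinine frame:

```python
import pandas as pd

# Numeric frames with any negative value fail validation; this is the same
# condition validate_data evaluates per dataset.
numeric = pd.DataFrame({"creatinine": [1.2, -0.4]})
has_negative = bool((numeric < 0).values.any())
print(has_negative)  # True -> validate_data would raise ValueError
```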

Source code in pyaki/kdigo.py
def validate_data(self, datasets: list[Dataset]) -> None:
    """
    Validate the input data for negative values.

    Parameters
    ----------
    datasets : list[Dataset]
        A list of Dataset objects containing the input data.

    Raises
    ------
    ValueError
        If any of the datasets contain negative values.
    """
    for dtype, df in datasets:
        try:
            if (df < 0).values.any():
                raise ValueError(f"Dataset of Type {dtype} contains negative data")
        except TypeError:
            continue