Source code for lomas_server.dp_queries.dp_libraries.diffprivlib

import pickle
import warnings
from base64 import b64encode
from typing import Dict

import pandas as pd
from diffprivlib.utils import PrivacyLeakWarning
from diffprivlib_logger import deserialise_pipeline
from sklearn.pipeline import Pipeline

from constants import DPLibraries
from dp_queries.dp_libraries.diffprivlib_utils import (
    handle_missing_data,
    split_train_test_data,
)
from dp_queries.dp_querier import DPQuerier
from utils.error_handler import ExternalLibraryException
from utils.input_models import DiffPrivLibInp


class DiffPrivLibQuerier(DPQuerier):
    """
    DPQuerier implementation backed by the DiffPrivLib library.

    Every request fits the DiffPrivLib pipeline described in the query on
    the private dataset: ``cost`` reports the budget recorded by the fitted
    steps, ``query`` returns the fitted model together with its score.
    """

    def fit_model_on_data(
        self, query_json: DiffPrivLibInp
    ) -> tuple[Pipeline, pd.DataFrame, pd.DataFrame]:
        """Fit the requested DiffPrivLib pipeline on the private dataset.

        The private dataset is loaded as a pandas DataFrame, passed through
        ``handle_missing_data`` with the request's imputer strategy and
        split with ``split_train_test_data``. The pipeline is rebuilt from
        ``query_json.diffprivlib_json`` and fitted on the training split.

        Args:
            query_json (DiffPrivLibInp): The request object for the query.

        Raises:
            ExternalLibraryException: If fitting the pipeline raises a
                PrivacyLeakWarning or any other exception. Errors raised
                while preparing the data or deserialising the pipeline are
                not wrapped and propagate unchanged.

        Returns:
            tuple[Pipeline, pd.DataFrame, pd.DataFrame]: The value returned
                by the pipeline's ``fit`` call, followed by the test
                features and the test target.
        """
        # Data preparation: load, handle missing values, then split.
        cleaned_df = handle_missing_data(
            self.private_dataset.get_pandas_df(),
            query_json.imputer_strategy,
        )
        train_features, test_features, train_target, test_target = (
            split_train_test_data(cleaned_df, query_json)
        )

        # Rebuild the DiffPrivLib pipeline from its serialised form.
        pipeline = deserialise_pipeline(query_json.diffprivlib_json)

        # Promote PrivacyLeakWarning to an exception so that it aborts the
        # fit. simplefilter() edits the process-wide warnings filter list,
        # so this setting stays in effect after the method returns.
        warnings.simplefilter("error", PrivacyLeakWarning)
        try:
            fitted_pipeline = pipeline.fit(train_features, train_target)
        except PrivacyLeakWarning as leak:
            message = (
                f"PrivacyLeakWarning: {leak}. "
                "Lomas server cannot fit pipeline on data, "
                "PrivacyLeakWarning is a blocker."
            )
            raise ExternalLibraryException(
                DPLibraries.DIFFPRIVLIB, message
            ) from leak
        except Exception as err:
            raise ExternalLibraryException(
                DPLibraries.DIFFPRIVLIB,
                f"Cannot fit pipeline on data because {err}",
            ) from err

        return fitted_pipeline, test_features, test_target

    def cost(self, query_json: DiffPrivLibInp) -> tuple[float, float]:
        """Estimate the privacy cost of a query.

        The pipeline is actually fitted (through ``fit_model_on_data``) and
        the cost is then read back from the ``accountant`` attribute of
        each pipeline step.

        Args:
            query_json (DiffPrivLibInp): The request object for the query.

        Raises:
            ExternalLibraryException: Propagated from
                ``fit_model_on_data`` when the pipeline cannot be fitted.

        Returns:
            tuple[float, float]: The summed epsilon cost followed by the
                summed delta cost.
        """
        pipeline = self.fit_model_on_data(query_json)[0]

        total_epsilon, total_delta = 0.0, 0.0
        for _step_name, estimator in pipeline.steps:
            # Every step is expected to expose an ``accountant``.
            # NOTE(review): only the first ``spent_budget`` entry of each
            # step is counted; this presumably relies on every step
            # recording a single spend per fit - confirm.
            first_spend = estimator.accountant.spent_budget[0]
            total_epsilon += first_spend[0]
            total_delta += first_spend[1]

        return total_epsilon, total_delta

    def query(self, query_json: DiffPrivLibInp) -> Dict:
        """Fit the pipeline and return its score and serialised form.

        Args:
            query_json (DiffPrivLibInp): The request object for the query.

        Raises:
            ExternalLibraryException: Propagated from
                ``fit_model_on_data`` when the pipeline cannot be fitted.
                Errors raised while scoring or pickling are not wrapped.

        Returns:
            dict: ``"score"`` holds the result of the fitted pipeline's
                ``score`` method on the test split; ``"model"`` holds the
                pickled fitted pipeline, base64-encoded and decoded to a
                UTF-8 string.
        """
        model, x_test, y_test = self.fit_model_on_data(query_json)

        # Evaluate on the held-out split before serialising the model.
        test_score = model.score(x_test, y_test)
        encoded_model = b64encode(pickle.dumps(model)).decode("utf-8")

        return {"score": test_score, "model": encoded_model}