import warnings
from typing import Dict, Optional
import pandas as pd
from diffprivlib.utils import PrivacyLeakWarning
from diffprivlib_logger import deserialise_pipeline
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from lomas_server.admin_database.admin_database import AdminDatabase
from lomas_server.constants import DPLibraries
from lomas_server.data_connector.data_connector import DataConnector
from lomas_server.dp_queries.dp_libraries.utils import (
handle_missing_data,
serialise_model,
)
from lomas_server.dp_queries.dp_querier import DPQuerier
from lomas_server.utils.error_handler import (
ExternalLibraryException,
InternalServerException,
)
from lomas_server.utils.query_models import (
DiffPrivLibQueryModel,
DiffPrivLibRequestModel,
)
[docs]
class DiffPrivLibQuerier(
    DPQuerier[DiffPrivLibRequestModel, DiffPrivLibQueryModel]
):
    """
    DPQuerier implementation backed by the DiffPrivLib library.

    The fitted pipeline and the held-out test split are produced by
    `cost` and stored on the instance so that `query` can reuse them.
    """

    def __init__(
        self,
        data_connector: DataConnector,
        admin_database: AdminDatabase,
    ) -> None:
        super().__init__(data_connector, admin_database)
        # Populated by `cost`; `query` refuses to run while the pipeline
        # is still None.
        self.dpl_pipeline: Optional[Pipeline] = None
        self.x_test: Optional[pd.DataFrame] = None
        self.y_test: Optional[pd.DataFrame] = None

    def fit_model_on_data(
        self, query_json: DiffPrivLibRequestModel
    ) -> tuple[Pipeline, pd.DataFrame, pd.DataFrame]:
        """Load the data, split it and fit the requested pipeline.

        Args:
            query_json (DiffPrivLibRequestModel): The request; its
                `imputer_strategy` and `diffprivlib_json` fields are read
                here, the split settings are read by
                `split_train_test_data`.

        Raises:
            ExternalLibraryException: If fitting emits a
                PrivacyLeakWarning or raises any other exception.

        Returns:
            dpl_pipeline (Pipeline): the pipeline fitted on the train set
            x_test (pd.DataFrame): test data features
            y_test (pd.DataFrame): test data target (None when the
                request has no target columns)
        """
        # Data preparation: load, handle missing values, split.
        imputed = handle_missing_data(
            self.data_connector.get_pandas_df(),
            query_json.imputer_strategy,
        )
        x_train, x_test, y_train, y_test = split_train_test_data(
            imputed, query_json
        )

        # Rebuild the pipeline described in the request.
        pipeline = deserialise_pipeline(query_json.diffprivlib_json)

        # Escalate PrivacyLeakWarning to an exception so that it can be
        # caught below and reported as a blocking error.
        # NOTE(review): simplefilter edits the process-wide warning
        # filters and is not undone afterwards.
        warnings.simplefilter("error", PrivacyLeakWarning)
        try:
            # Flatten the targets to 1-D; stays None without targets.
            targets = None if y_train is None else y_train.values.ravel()
            fitted = pipeline.fit(x_train, targets)
        except PrivacyLeakWarning as leak:
            message = (
                f"PrivacyLeakWarning: {leak}. "
                "Lomas server cannot fit pipeline on data, "
                "PrivacyLeakWarning is a blocker."
            )
            raise ExternalLibraryException(
                DPLibraries.DIFFPRIVLIB, message
            ) from leak
        except Exception as err:
            raise ExternalLibraryException(
                DPLibraries.DIFFPRIVLIB,
                f"Cannot fit pipeline on data because {err}",
            ) from err

        return fitted, x_test, y_test

    def cost(self, query_json: DiffPrivLibRequestModel) -> tuple[float, float]:
        """Fit the pipeline and report the privacy budget it spent.

        The fitted pipeline and the test split are stored on the
        instance as a side effect.

        Args:
            query_json (DiffPrivLibRequestModel): The request model object.

        Raises:
            ExternalLibraryException: Propagated from `fit_model_on_data`.

        Returns:
            tuple[float, float]: (epsilon, delta), each summed over the
                first `spent_budget` entry of every step's accountant.
        """
        fit_result = self.fit_model_on_data(query_json)
        self.dpl_pipeline, self.x_test, self.y_test = fit_result

        # Accumulate the budget step by step.
        spent_epsilon = 0.0
        spent_delta = 0.0
        for _name, estimator in self.dpl_pipeline.steps:
            # Only the first recorded spend of each accountant is counted.
            first_spend = estimator.accountant.spent_budget[0]
            spent_epsilon += first_spend[0]
            spent_delta += first_spend[1]
        return spent_epsilon, spent_delta

    def query(
        self,
        query_json: DiffPrivLibQueryModel,  # pylint: disable=unused-argument
    ) -> Dict:
        """Score the fitted pipeline on the test split and serialise it.

        Args:
            query_json (DiffPrivLibQueryModel): The request model object
                (not used; the state stored by `cost` is used instead).

        Raises:
            InternalServerException: If no fitted pipeline is stored,
                i.e. `cost` has not been called beforehand.

        Returns:
            dict: "score" holds the result of the pipeline's `score` on
                the test split, "model" holds the serialised pipeline.
        """
        pipeline = self.dpl_pipeline
        if pipeline is None:
            raise InternalServerException(
                "DiffPrivLib `query` method called before `cost` method"
            )

        # Evaluate first, then serialise the fitted pipeline.
        test_score = pipeline.score(self.x_test, self.y_test)
        return {
            "score": test_score,
            "model": serialise_model(pipeline),
        }
[docs]
def split_train_test_data(
    df: pd.DataFrame, query_json: DiffPrivLibRequestModel
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split the requested columns of `df` into train and test sets.

    Args:
        df (pd.DataFrame): dataframe with the data
        query_json (DiffPrivLibRequestModel): user request; the fields
            read here are:
            feature_columns: columns of `df` used as features
            target_columns: columns of `df` used as target, or None
            test_size: passed to `train_test_split` as `test_size`
            test_train_split_seed: passed to `train_test_split` as
                `random_state`

    Returns:
        x_train (pd.DataFrame): training data features
        x_test (pd.DataFrame): testing data features
        y_train (pd.DataFrame): training data target
            (None when `target_columns` is None)
        y_test (pd.DataFrame): testing data target
            (None when `target_columns` is None)
    """
    features = df[query_json.feature_columns]
    target_columns = query_json.target_columns

    # Both branches split with the same settings.
    split_options = {
        "test_size": query_json.test_size,
        "random_state": query_json.test_train_split_seed,
    }

    if target_columns is not None:
        # Features and targets are split together.
        targets = df[target_columns]
        x_train, x_test, y_train, y_test = train_test_split(
            features, targets, **split_options
        )
        return x_train, x_test, y_train, y_test

    # No targets requested: only the features are split.
    x_train, x_test = train_test_split(features, **split_options)
    return x_train, x_test, None, None