Source code for lomas_server.tests.test_api_diffprivlib

import warnings

import pytest
from diffprivlib import models
from diffprivlib.utils import (
    DiffprivlibCompatibilityWarning,
    PrivacyLeakWarning,
)
from diffprivlib_logger import serialise_pipeline
from fastapi import status
from fastapi.testclient import TestClient
from sklearn.pipeline import Pipeline

from lomas_core.constants import DPLibraries
from lomas_core.models.exceptions import (
    ExternalLibraryExceptionModel,
    InvalidQueryExceptionModel,
    UnauthorizedAccessExceptionModel,
)
from lomas_core.models.requests_examples import (
    example_diffprivlib,
    example_dummy_diffprivlib,
)
from lomas_core.models.responses import (
    CostResponse,
    DiffPrivLibQueryResult,
    QueryResponse,
)
from lomas_server.app import app
from lomas_server.tests.test_api_root import TestSetupRootAPIEndpoint
from lomas_server.tests.utils import submit_job_wait, wait_for_job


[docs] def validate_pipeline(client, response) -> QueryResponse: """Validate that the pipeline ran successfully. Returns a model and a score. """ assert response.status_code == status.HTTP_202_ACCEPTED job_uid = response.json()["uid"] job = wait_for_job(client, f"/status/{job_uid}") r_model = QueryResponse.model_validate(job.result) assert isinstance(r_model.result, DiffPrivLibQueryResult) return r_model
[docs] class TestDiffPrivLibEndpoint(TestSetupRootAPIEndpoint): """Test DiffPrivLib Endpoint with different models."""
[docs] @pytest.mark.long def test_diffprivlib_query(self) -> None: """Test diffprivlib query.""" with TestClient(app, headers=self.headers) as client: # Expect to work response = client.post( "/diffprivlib_query", json=example_diffprivlib, headers=self.headers, ) r_model = validate_pipeline(client, response) assert r_model.requested_by == self.user_name assert r_model.result.score >= 0 assert r_model.epsilon > 0 assert r_model.delta == 0 # # Should work for different imputation strategy (but does not yet #255) def test_imputation(diffprivlib_body, imputer_strategy): diffprivlib_body = dict(diffprivlib_body) diffprivlib_body["imputer_strategy"] = imputer_strategy job = submit_job_wait( client, "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) assert response.status_code == status.HTTP_202_ACCEPTED return job job = test_imputation(example_diffprivlib, "mean") assert job.status == "complete" job = test_imputation(example_diffprivlib, "median") assert job.status == "complete" job = test_imputation(example_diffprivlib, "most_frequent") assert job.status == "complete" # Should not work unknow imputation strategy job = test_imputation(example_diffprivlib, "i_do_not_exist") assert job.status == "failed" assert job.status_code == status.HTTP_400_BAD_REQUEST assert job.error == InvalidQueryExceptionModel( message="Imputation strategy i_do_not_exist not supported." )
[docs] @pytest.mark.long def test_diffprivlib_privacy_leak(self) -> None: """Test diffprivlib privacy leak error.""" with TestClient(app, headers=self.headers) as client: # Should still work: automatically added for first step warnings.simplefilter("error", PrivacyLeakWarning) diffprivlib_body = dict(example_diffprivlib) dpl_pipeline = Pipeline( [ ("scaler", models.StandardScaler(epsilon=0.5)), ("classifier", models.LogisticRegression(epsilon=1.0, data_norm=7000.0)), ] ) dpl_string = serialise_pipeline(dpl_pipeline) diffprivlib_body["diffprivlib_json"] = dpl_string response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response) # Should not work: Privacy Leak Warning on data norm warnings.simplefilter("error", PrivacyLeakWarning) diffprivlib_body = dict(example_diffprivlib) dpl_pipeline = Pipeline( [ ("scaler", models.StandardScaler(epsilon=0.5)), ("classifier", models.LogisticRegression(epsilon=1.0)), ] ) dpl_string = serialise_pipeline(dpl_pipeline) diffprivlib_body["diffprivlib_json"] = dpl_string job = submit_job_wait( client, "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) assert job.status == "failed" assert job.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert job.error == ExternalLibraryExceptionModel( message="PrivacyLeakWarning: " + "Data norm has not been specified and will be calculated on the data provided. " + "This will result in additional privacy leakage. " + "To ensure differential privacy and no additional privacy leakage, specify " + "`data_norm` at initialisation. " + "Lomas server cannot fit pipeline on data, " + "PrivacyLeakWarning is a blocker.", library=DPLibraries.DIFFPRIVLIB, ) # Should not work: Privacy Leak Warning on bounds diffprivlib_body = dict(example_diffprivlib) dpl_pipeline = Pipeline( [ ("scaler", models.StandardScaler(epsilon=0.5)), ("classifier", models.GaussianNB(epsilon=1.0)), ] ) dpl_string = serialise_pipeline(dpl_pipeline) diffprivlib_body["diffprivlib_json"] = dpl_string job = submit_job_wait( client, "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) assert job.status == "failed" assert job.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY assert job.error == ExternalLibraryExceptionModel( message="PrivacyLeakWarning: " + "Bounds have not been specified and will be calculated on the data provided. " + "This will result in additional privacy leakage. " + "To ensure differential privacy and no additional privacy leakage, " + "specify bounds for each dimension. " + "Lomas server cannot fit pipeline on data, " + "PrivacyLeakWarning is a blocker.", library=DPLibraries.DIFFPRIVLIB, )
[docs] def test_diffprivlib_compatibility_error(self) -> None: """Test diffprivlib compatibility error.""" # Should not work: Compatibility Warning warnings.simplefilter("error", DiffprivlibCompatibilityWarning) with pytest.raises(DiffprivlibCompatibilityWarning): Pipeline( [ ("scaler", models.StandardScaler(epsilon=0.5)), ( "classifier", models.LogisticRegression(epsilon=1.0, svd_solver="full"), ), ] )
[docs] def test_logistic_regression_models(self) -> None: """Test diffprivlib query: Logistic Regression.""" with TestClient(app, headers=self.headers) as client: bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0]) # Test Logistic Regression pipeline = Pipeline( [ ( "scaler", models.StandardScaler(epsilon=0.5, bounds=bounds), ), ( "classifier", models.LogisticRegression(epsilon=1.0, data_norm=83.69), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response)
[docs] def test_linear_regression_models(self) -> None: """Test diffprivlib query: Linear Regression.""" with TestClient(app, headers=self.headers) as client: # Test Linear Regression pipeline = Pipeline( [ ( "lr", models.LinearRegression( epsilon=2.0, bounds_X=(30.0, 65.0), bounds_y=(150.0, 250.0), ), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) diffprivlib_body["feature_columns"] = ["bill_length_mm"] diffprivlib_body["target_columns"] = ["flipper_length_mm"] response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response) # Test Linear Regression: no bounds should also work pipeline = Pipeline( [ ( "lr", models.LinearRegression( epsilon=2.0, ), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) diffprivlib_body["feature_columns"] = ["bill_length_mm"] diffprivlib_body["target_columns"] = ["flipper_length_mm"] response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response)
[docs] def test_linear_regression_models_same_columns(self) -> None: """Test diffprivlib query: Same columns.""" with TestClient(app, headers=self.headers) as client: # Test Linear Regression pipeline = Pipeline( [ ( "lr", models.LinearRegression( epsilon=2.0, bounds_X=(30.0, 65.0), bounds_y=(150.0, 250.0), ), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) # Should fail (same column in target and feature) diffprivlib_body["feature_columns"] = ["bill_length_mm"] diffprivlib_body["target_columns"] = ["bill_length_mm"] job = submit_job_wait( client, "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) assert job.status == "failed" assert job.status_code == status.HTTP_400_BAD_REQUEST assert job.error == InvalidQueryExceptionModel( message="Columns cannot be both feature and target: bill_length_mm" )
[docs] def test_naives_bayes_model(self) -> None: """Test diffprivlib query: Gaussian Naives Bayes.""" with TestClient(app, headers=self.headers) as client: bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0]) # Test Gaussian Naives Bayes pipeline = Pipeline( [ ( "scaler", models.StandardScaler(epsilon=0.5, bounds=bounds), ), ( "gaussian", models.GaussianNB(epsilon=1.0, bounds=bounds, priors=(0.3, 0.3, 0.4)), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response)
[docs] def test_trees_models(self) -> None: """Test diffprivlib query: Random Forest, Decision Tree.""" with TestClient(app, headers=self.headers) as client: bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0]) # Test Random Forest pipeline = Pipeline( [ ( "rf", models.RandomForestClassifier( n_estimators=10, epsilon=2.0, bounds=bounds, classes=["Adelie", "Chinstrap", "Gentoo"], ), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response) # Test Decision Tree Classifier pipeline = Pipeline( [ ( "dtc", models.DecisionTreeClassifier( epsilon=2.0, bounds=bounds, classes=["Adelie", "Chinstrap", "Gentoo"], ), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response)
[docs] def test_clustering_models(self) -> None: """Test diffprivlib query: K-Means.""" with TestClient(app, headers=self.headers) as client: bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0]) # Test K-MEANS pipeline = Pipeline( [ ( "kmeans", models.KMeans(n_clusters=8, epsilon=2.0, bounds=bounds), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response) diffprivlib_body["target_columns"] = None response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response)
[docs] def test_dimension_reduction_models(self) -> None: """Test diffprivlib query: PCA.""" with TestClient(app, headers=self.headers) as client: bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0]) # Test PCA pipeline = Pipeline( [ ( "pca", models.PCA( n_components=8, epsilon=2.0, bounds=bounds, data_norm=100, ), ), ] ) diffprivlib_body = dict(example_diffprivlib) diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline) response = client.post( "/diffprivlib_query", json=diffprivlib_body, headers=self.headers, ) validate_pipeline(client, response)
[docs] def test_dummy_diffprivlib_query(self) -> None: """Test_dummy_diffprivlib_query.""" with TestClient(app, headers=self.headers) as client: # Expect to work job = submit_job_wait( client, "/dummy_diffprivlib_query", json=example_dummy_diffprivlib, headers=self.headers, ) r_model = QueryResponse.model_validate(job.result) assert isinstance(r_model.result, DiffPrivLibQueryResult) assert r_model.result.score >= 0 # Expect to fail: user does not have access to dataset body = dict(example_dummy_diffprivlib) body["dataset_name"] = "IRIS" response = client.post( "/dummy_diffprivlib_query", json=body, headers=self.headers, ) assert response.status_code == status.HTTP_403_FORBIDDEN assert ( response.json() == UnauthorizedAccessExceptionModel( message=f"{self.user_name} does not have access to IRIS." ).model_dump() )
[docs] def test_diffprivlib_cost(self) -> None: """Test_diffprivlib_cost.""" with TestClient(app, headers=self.headers) as client: # Expect to work job = submit_job_wait( client, "/estimate_diffprivlib_cost", json=example_diffprivlib, headers=self.headers, ) r_model = CostResponse.model_validate(job.result) assert r_model.epsilon == 1.5 assert r_model.delta == 0 # Expect to fail: user does have access to dataset body = dict(example_diffprivlib) body["dataset_name"] = "IRIS" response = client.post( "/estimate_diffprivlib_cost", json=body, headers=self.headers, ) assert response.status_code == status.HTTP_403_FORBIDDEN assert ( response.json() == UnauthorizedAccessExceptionModel( message=f"{self.user_name} does not have access to IRIS." ).model_dump() )