import warnings
import pytest
from diffprivlib import models
from diffprivlib.utils import (
DiffprivlibCompatibilityWarning,
PrivacyLeakWarning,
)
from diffprivlib_logger import serialise_pipeline
from fastapi import status
from fastapi.testclient import TestClient
from sklearn.pipeline import Pipeline
from lomas_core.constants import DPLibraries
from lomas_core.models.exceptions import (
ExternalLibraryExceptionModel,
InvalidQueryExceptionModel,
UnauthorizedAccessExceptionModel,
)
from lomas_core.models.requests_examples import (
example_diffprivlib,
example_dummy_diffprivlib,
)
from lomas_core.models.responses import (
CostResponse,
DiffPrivLibQueryResult,
QueryResponse,
)
from lomas_server.app import app
from lomas_server.tests.test_api_root import TestSetupRootAPIEndpoint
from lomas_server.tests.utils import submit_job_wait, wait_for_job
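
# The request bodies used below come from lomas_core.models.requests_examples.
# As exercised in this module, a diffprivlib query body carries at least the
# following fields (illustrative sketch; the canonical schema lives in
# lomas_core):
#
#     {
#         "dataset_name": "...",
#         "feature_columns": [...],
#         "target_columns": [...],
#         "imputer_strategy": "mean",
#         "diffprivlib_json": serialise_pipeline(pipeline),
#     }
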
def validate_pipeline(client, response) -> QueryResponse:
    """Validate that the pipeline job ran successfully.

    Returns the validated QueryResponse, whose result holds the fitted model
    and its score.
    """
    assert response.status_code == status.HTTP_202_ACCEPTED
    job_uid = response.json()["uid"]
    job = wait_for_job(client, f"/status/{job_uid}")
    r_model = QueryResponse.model_validate(job.result)
    assert isinstance(r_model.result, DiffPrivLibQueryResult)
    return r_model
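
# For context: wait_for_job (imported above) polls the /status endpoint until
# the job settles. A minimal hand-rolled sketch of that pattern follows —
# illustrative only, with field names assumed from how job objects are used in
# this module, not the actual helper from lomas_server.tests.utils:
def _poll_job_sketch(client, job_uid: str, timeout: float = 30.0, interval: float = 0.5):
    """Poll /status/{job_uid} until the job completes or fails (sketch)."""
    import time

    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        body = client.get(f"/status/{job_uid}").json()
        if body.get("status") in ("complete", "failed"):
            return body
        time.sleep(interval)
    raise TimeoutError(f"Job {job_uid} did not finish within {timeout}s")
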
class TestDiffPrivLibEndpoint(TestSetupRootAPIEndpoint):
    """Test DiffPrivLib Endpoint with different models."""
    @pytest.mark.long
    def test_diffprivlib_query(self) -> None:
        """Test diffprivlib query."""
        with TestClient(app, headers=self.headers) as client:
            # Expect to work
            response = client.post(
                "/diffprivlib_query",
                json=example_diffprivlib,
                headers=self.headers,
            )
            r_model = validate_pipeline(client, response)
            assert r_model.requested_by == self.user_name
            assert r_model.result.score >= 0
            assert r_model.epsilon > 0
            assert r_model.delta == 0
            # Should work for different imputation strategies (see #255)
            def test_imputation(diffprivlib_body, imputer_strategy):
                diffprivlib_body = dict(diffprivlib_body)
                diffprivlib_body["imputer_strategy"] = imputer_strategy
                return submit_job_wait(
                    client,
                    "/diffprivlib_query",
                    json=diffprivlib_body,
                    headers=self.headers,
                )

            job = test_imputation(example_diffprivlib, "mean")
            assert job.status == "complete"
            job = test_imputation(example_diffprivlib, "median")
            assert job.status == "complete"
            job = test_imputation(example_diffprivlib, "most_frequent")
            assert job.status == "complete"

            # Should not work: unknown imputation strategy
            job = test_imputation(example_diffprivlib, "i_do_not_exist")
            assert job.status == "failed"
            assert job.status_code == status.HTTP_400_BAD_REQUEST
            assert job.error == InvalidQueryExceptionModel(
                message="Imputation strategy i_do_not_exist not supported."
            )
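
    # The accepted strategies above mirror scikit-learn's SimpleImputer. A
    # sketch of the presumed server-side mapping — an assumption for
    # illustration, not the actual lomas implementation:
    #
    #     from sklearn.impute import SimpleImputer
    #
    #     def make_imputer(strategy: str) -> SimpleImputer:
    #         if strategy not in ("mean", "median", "most_frequent"):
    #             raise ValueError(f"Imputation strategy {strategy} not supported.")
    #         return SimpleImputer(strategy=strategy)
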
    @pytest.mark.long
    def test_diffprivlib_privacy_leak(self) -> None:
        """Test diffprivlib privacy leak error."""
        with TestClient(app, headers=self.headers) as client:
            # Should still work: bounds are automatically added for the first step
            warnings.simplefilter("error", PrivacyLeakWarning)
            diffprivlib_body = dict(example_diffprivlib)
            dpl_pipeline = Pipeline(
                [
                    ("scaler", models.StandardScaler(epsilon=0.5)),
                    ("classifier", models.LogisticRegression(epsilon=1.0, data_norm=7000.0)),
                ]
            )
            dpl_string = serialise_pipeline(dpl_pipeline)
            diffprivlib_body["diffprivlib_json"] = dpl_string
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)

            # Should not work: Privacy Leak Warning on data norm
            warnings.simplefilter("error", PrivacyLeakWarning)
            diffprivlib_body = dict(example_diffprivlib)
            dpl_pipeline = Pipeline(
                [
                    ("scaler", models.StandardScaler(epsilon=0.5)),
                    ("classifier", models.LogisticRegression(epsilon=1.0)),
                ]
            )
            dpl_string = serialise_pipeline(dpl_pipeline)
            diffprivlib_body["diffprivlib_json"] = dpl_string
            job = submit_job_wait(
                client,
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            assert job.status == "failed"
            assert job.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
            assert job.error == ExternalLibraryExceptionModel(
                message="PrivacyLeakWarning: "
                + "Data norm has not been specified and will be calculated on the data provided. "
                + "This will result in additional privacy leakage. "
                + "To ensure differential privacy and no additional privacy leakage, specify "
                + "`data_norm` at initialisation. "
                + "Lomas server cannot fit pipeline on data, "
                + "PrivacyLeakWarning is a blocker.",
                library=DPLibraries.DIFFPRIVLIB,
            )

            # Should not work: Privacy Leak Warning on bounds
            diffprivlib_body = dict(example_diffprivlib)
            dpl_pipeline = Pipeline(
                [
                    ("scaler", models.StandardScaler(epsilon=0.5)),
                    ("classifier", models.GaussianNB(epsilon=1.0)),
                ]
            )
            dpl_string = serialise_pipeline(dpl_pipeline)
            diffprivlib_body["diffprivlib_json"] = dpl_string
            job = submit_job_wait(
                client,
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            assert job.status == "failed"
            assert job.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
            assert job.error == ExternalLibraryExceptionModel(
                message="PrivacyLeakWarning: "
                + "Bounds have not been specified and will be calculated on the data provided. "
                + "This will result in additional privacy leakage. "
                + "To ensure differential privacy and no additional privacy leakage, "
                + "specify bounds for each dimension. "
                + "Lomas server cannot fit pipeline on data, "
                + "PrivacyLeakWarning is a blocker.",
                library=DPLibraries.DIFFPRIVLIB,
            )
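
    # Both failure messages above originate from diffprivlib itself: fitting a
    # model without bounds/data_norm emits PrivacyLeakWarning, which the server
    # treats as a blocking error. A minimal local reproduction (illustrative
    # only, independent of the lomas server):
    #
    #     import numpy as np
    #     import warnings
    #     from diffprivlib import models
    #     from diffprivlib.utils import PrivacyLeakWarning
    #
    #     warnings.simplefilter("error", PrivacyLeakWarning)
    #     X = np.array([[1.0, 2.0], [3.0, 4.0]])
    #     models.StandardScaler(epsilon=0.5).fit(X)  # raises PrivacyLeakWarning
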
    def test_diffprivlib_compatibility_error(self) -> None:
        """Test diffprivlib compatibility error."""
        # Should not work: Compatibility Warning
        warnings.simplefilter("error", DiffprivlibCompatibilityWarning)
        with pytest.raises(DiffprivlibCompatibilityWarning):
            Pipeline(
                [
                    ("scaler", models.StandardScaler(epsilon=0.5)),
                    (
                        "classifier",
                        models.LogisticRegression(epsilon=1.0, svd_solver="full"),
                    ),
                ]
            )
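
        # Note: svd_solver is a scikit-learn PCA argument that diffprivlib's
        # LogisticRegression does not accept; diffprivlib flags unrecognised
        # keyword arguments with DiffprivlibCompatibilityWarning, which the
        # simplefilter above escalates to an error at construction time.
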
    def test_logistic_regression_models(self) -> None:
        """Test diffprivlib query: Logistic Regression."""
        with TestClient(app, headers=self.headers) as client:
            bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0])

            # Test Logistic Regression
            pipeline = Pipeline(
                [
                    (
                        "scaler",
                        models.StandardScaler(epsilon=0.5, bounds=bounds),
                    ),
                    (
                        "classifier",
                        models.LogisticRegression(epsilon=1.0, data_norm=83.69),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)
    def test_linear_regression_models(self) -> None:
        """Test diffprivlib query: Linear Regression."""
        with TestClient(app, headers=self.headers) as client:
            # Test Linear Regression
            pipeline = Pipeline(
                [
                    (
                        "lr",
                        models.LinearRegression(
                            epsilon=2.0,
                            bounds_X=(30.0, 65.0),
                            bounds_y=(150.0, 250.0),
                        ),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            diffprivlib_body["feature_columns"] = ["bill_length_mm"]
            diffprivlib_body["target_columns"] = ["flipper_length_mm"]
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)

            # Test Linear Regression: no bounds should also work
            pipeline = Pipeline(
                [
                    (
                        "lr",
                        models.LinearRegression(
                            epsilon=2.0,
                        ),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            diffprivlib_body["feature_columns"] = ["bill_length_mm"]
            diffprivlib_body["target_columns"] = ["flipper_length_mm"]
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)
    def test_linear_regression_models_same_columns(self) -> None:
        """Test diffprivlib query: Same columns."""
        with TestClient(app, headers=self.headers) as client:
            # Test Linear Regression
            pipeline = Pipeline(
                [
                    (
                        "lr",
                        models.LinearRegression(
                            epsilon=2.0,
                            bounds_X=(30.0, 65.0),
                            bounds_y=(150.0, 250.0),
                        ),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)

            # Should fail (same column in target and feature)
            diffprivlib_body["feature_columns"] = ["bill_length_mm"]
            diffprivlib_body["target_columns"] = ["bill_length_mm"]
            job = submit_job_wait(
                client,
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            assert job.status == "failed"
            assert job.status_code == status.HTTP_400_BAD_REQUEST
            assert job.error == InvalidQueryExceptionModel(
                message="Columns cannot be both feature and target: bill_length_mm"
            )
    def test_naives_bayes_model(self) -> None:
        """Test diffprivlib query: Gaussian Naive Bayes."""
        with TestClient(app, headers=self.headers) as client:
            bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0])

            # Test Gaussian Naive Bayes
            pipeline = Pipeline(
                [
                    (
                        "scaler",
                        models.StandardScaler(epsilon=0.5, bounds=bounds),
                    ),
                    (
                        "gaussian",
                        models.GaussianNB(epsilon=1.0, bounds=bounds, priors=(0.3, 0.3, 0.4)),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)
    def test_trees_models(self) -> None:
        """Test diffprivlib query: Random Forest, Decision Tree."""
        with TestClient(app, headers=self.headers) as client:
            bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0])

            # Test Random Forest
            pipeline = Pipeline(
                [
                    (
                        "rf",
                        models.RandomForestClassifier(
                            n_estimators=10,
                            epsilon=2.0,
                            bounds=bounds,
                            classes=["Adelie", "Chinstrap", "Gentoo"],
                        ),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)

            # Test Decision Tree Classifier
            pipeline = Pipeline(
                [
                    (
                        "dtc",
                        models.DecisionTreeClassifier(
                            epsilon=2.0,
                            bounds=bounds,
                            classes=["Adelie", "Chinstrap", "Gentoo"],
                        ),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)
    def test_clustering_models(self) -> None:
        """Test diffprivlib query: K-Means."""
        with TestClient(app, headers=self.headers) as client:
            bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0])

            # Test K-Means
            pipeline = Pipeline(
                [
                    (
                        "kmeans",
                        models.KMeans(n_clusters=8, epsilon=2.0, bounds=bounds),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)

            # K-Means is unsupervised, so the query should also work without target columns
            diffprivlib_body["target_columns"] = None
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)
    def test_dimension_reduction_models(self) -> None:
        """Test diffprivlib query: PCA."""
        with TestClient(app, headers=self.headers) as client:
            bounds = ([30.0, 13.0, 150.0, 2000.0], [65.0, 23.0, 250.0, 7000.0])

            # Test PCA
            pipeline = Pipeline(
                [
                    (
                        "pca",
                        models.PCA(
                            n_components=8,
                            epsilon=2.0,
                            bounds=bounds,
                            data_norm=100,
                        ),
                    ),
                ]
            )
            diffprivlib_body = dict(example_diffprivlib)
            diffprivlib_body["diffprivlib_json"] = serialise_pipeline(pipeline)
            response = client.post(
                "/diffprivlib_query",
                json=diffprivlib_body,
                headers=self.headers,
            )
            validate_pipeline(client, response)
    def test_dummy_diffprivlib_query(self) -> None:
        """Test dummy diffprivlib query."""
        with TestClient(app, headers=self.headers) as client:
            # Expect to work
            job = submit_job_wait(
                client,
                "/dummy_diffprivlib_query",
                json=example_dummy_diffprivlib,
                headers=self.headers,
            )
            r_model = QueryResponse.model_validate(job.result)
            assert isinstance(r_model.result, DiffPrivLibQueryResult)
            assert r_model.result.score >= 0

            # Expect to fail: user does not have access to dataset
            body = dict(example_dummy_diffprivlib)
            body["dataset_name"] = "IRIS"
            response = client.post(
                "/dummy_diffprivlib_query",
                json=body,
                headers=self.headers,
            )
            assert response.status_code == status.HTTP_403_FORBIDDEN
            assert (
                response.json()
                == UnauthorizedAccessExceptionModel(
                    message=f"{self.user_name} does not have access to IRIS."
                ).model_dump()
            )
    def test_diffprivlib_cost(self) -> None:
        """Test diffprivlib cost estimation."""
        with TestClient(app, headers=self.headers) as client:
            # Expect to work
            job = submit_job_wait(
                client,
                "/estimate_diffprivlib_cost",
                json=example_diffprivlib,
                headers=self.headers,
            )
            r_model = CostResponse.model_validate(job.result)
            assert r_model.epsilon == 1.5
            assert r_model.delta == 0
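            # Expected cost: 1.5 = 0.5 (scaler) + 1.0 (classifier). Epsilons of
            # sequential pipeline steps add up under basic composition —
            # assuming example_diffprivlib uses the same two-step pipeline as
            # the queries above.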
            # Expect to fail: user does not have access to dataset
            body = dict(example_diffprivlib)
            body["dataset_name"] = "IRIS"
            response = client.post(
                "/estimate_diffprivlib_cost",
                json=body,
                headers=self.headers,
            )
            assert response.status_code == status.HTTP_403_FORBIDDEN
            assert (
                response.json()
                == UnauthorizedAccessExceptionModel(
                    message=f"{self.user_name} does not have access to IRIS."
                ).model_dump()
            )