Source code for lomas_server.dp_queries.dp_querier

from abc import ABC, abstractmethod
from typing import Any, Generic, List, TypeVar

from lomas_server.admin_database.admin_database import AdminDatabase
from lomas_server.data_connector.data_connector import DataConnector
from lomas_server.utils.error_handler import (
    KNOWN_EXCEPTIONS,
    InternalServerException,
    InvalidQueryException,
    UnauthorizedAccessException,
)
from lomas_server.utils.query_models import (  # pylint: disable=W0611
    QueryModel,
    RequestModel,
)

RequestModelGeneric = TypeVar("RequestModelGeneric", bound="RequestModel")
QueryModelGeneric = TypeVar("QueryModelGeneric", bound="QueryModel")


[docs] class DPQuerier(ABC, Generic[RequestModelGeneric, QueryModelGeneric]): """ Abstract Base Class for Queriers to external DP library. A querier type is specific to a DP library and a querier instance is specific to a DataConnector instance. """ def __init__( self, data_connector: DataConnector, admin_database: AdminDatabase, ) -> None: """Initialise with specific dataset Args: data_connector (DataConnector): The private dataset to query. admin_database (AdminDatabase): An initialized instance of an AdminDatabase. """ self.data_connector = data_connector self.admin_database = admin_database
[docs] @abstractmethod def cost(self, query_json: RequestModelGeneric) -> tuple[float, float]: """ Estimate cost of query. Args: query_json (RequestModelGeneric): The input object of the request. Must be a subclass of RequestModel. Returns: tuple[float, float]: The tuple of costs, the first value is the epsilon cost, the second value is the delta value. """
[docs] @abstractmethod def query( self, query_json: QueryModelGeneric ) -> dict | int | float | List[Any] | Any | str: """ Perform the query and return the response. Args: query_json (QueryModelGeneric): The input object of the query. Must be a subclass of QueryModel. Returns: dict | int | float | List[Any] | Any | str: The query result, to be added to the response dict. """
[docs] def handle_query( self, query_json: QueryModel, user_name: str, ) -> dict: """ Handle DP query. Args: query_json (RequestModel): The input object of the query. Must be a subclass of QueryModel. user_name (str, optional): User name. Raises: UnauthorizedAccessException: A query is already ongoing for this user, the user does not exist or does not have access to the dataset. InvalidQueryException: If the query is not valid. InternalServerException: For any other unforseen exceptions. Returns: dict: A dictionary containing: - requested_by (str): The user name. - query_response (pd.DataFrame): A DataFrame containing the query response. - spent_epsilon (float): The amount of epsilon budget spent for the query. - spent_delta (float): The amount of delta budget spent for the query. """ # Block access to other queries to user if not self.admin_database.get_and_set_may_user_query( user_name, False ): raise UnauthorizedAccessException( f"User {user_name} is trying to query" + " before end of previous query." ) try: # Get cost of the query eps_cost, delta_cost = self.cost(query_json) # type: ignore [arg-type] # Check that enough budget to do the query try: ( eps_remain, delta_remain, ) = self.admin_database.get_remaining_budget( user_name, query_json.dataset_name ) except UnauthorizedAccessException as e: raise e if (eps_remain < eps_cost) or (delta_remain < delta_cost): raise InvalidQueryException( "Not enough budget for this query epsilon remaining " f"{eps_remain}, delta remaining {delta_remain}." ) # Query try: query_response = self.query(query_json) # type: ignore [arg-type] except KNOWN_EXCEPTIONS as e: raise e except Exception as e: raise InternalServerException(str(e)) from e # Deduce budget from user self.admin_database.update_budget( user_name, query_json.dataset_name, eps_cost, delta_cost ) response = { "requested_by": user_name, "query_response": query_response, "spent_epsilon": eps_cost, "spent_delta": delta_cost, } # Add query to db (for archive) self.admin_database.save_query(user_name, query_json, response) except Exception as e: self.admin_database.set_may_user_query(user_name, True) raise e # Re-enable user to query self.admin_database.set_may_user_query(user_name, True) # Return response return response