Source code for lomas_server.dp_queries.dp_querier

from abc import ABC, abstractmethod
from typing import Generic, TypeVar

from lomas_core.error_handler import (
    KNOWN_EXCEPTIONS,
    InternalServerException,
    InvalidQueryException,
    UnauthorizedAccessException,
)
from lomas_core.models.requests import (  # pylint: disable=W0611
    LomasRequestModel,
    QueryModel,
)
from lomas_core.models.responses import (  # pylint: disable=W0611
    QueryResponse,
    QueryResultTypeAlias,
)

from lomas_server.admin_database.admin_database import AdminDatabase
from lomas_server.data_connector.data_connector import DataConnector

RequestModelGeneric = TypeVar("RequestModelGeneric", bound="LomasRequestModel")
QueryModelGeneric = TypeVar("QueryModelGeneric", bound="QueryModel")
QueryResultGeneric = TypeVar("QueryResultGeneric", bound="QueryResultTypeAlias")


[docs] class DPQuerier( ABC, Generic[RequestModelGeneric, QueryModelGeneric, QueryResultGeneric] ): """ Abstract Base Class for Queriers to external DP library. A querier type is specific to a DP library and a querier instance is specific to a DataConnector instance. """ def __init__( self, data_connector: DataConnector, admin_database: AdminDatabase, ) -> None: """Initialise with specific dataset. Args: data_connector (DataConnector): The private dataset to query. admin_database (AdminDatabase): An initialized instance of an AdminDatabase. """ self.data_connector = data_connector self.admin_database = admin_database
[docs] @abstractmethod def cost(self, query_json: RequestModelGeneric) -> tuple[float, float]: """ Estimate cost of query. Args: query_json (RequestModelGeneric): The input object of the request. Must be a subclass of LomasRequestModel. Returns: tuple[float, float]: The tuple of costs, the first value is the epsilon cost, the second value is the delta value. """
[docs] @abstractmethod def query(self, query_json: QueryModelGeneric) -> QueryResultGeneric: """ Perform the query and return the response. Args: query_json (QueryModelGeneric): The input object of the query. Must be a subclass of QueryModel. Returns: dict | int | float | List[Any] | Any | str: The query result, to be added to the response dict. """
[docs] def handle_query( self, query_json: QueryModel, user_name: str, ) -> QueryResponse: """ Handle DP query. Args: query_json (LomasRequestModel): The input object of the query. Must be a subclass of QueryModel. user_name (str, optional): User name. Raises: UnauthorizedAccessException: A query is already ongoing for this user, the user does not exist or does not have access to the dataset. InvalidQueryException: If the query is not valid. InternalServerException: For any other unforseen exceptions. Returns: QueryResponse: The response object. # TODO remove what is next. A dictionary containing: - requested_by (str): The user name. - query_response (pd.DataFrame): A DataFrame containing the query response. - spent_epsilon (float): The amount of epsilon budget spent for the query. - spent_delta (float): The amount of delta budget spent for the query. """ # Block access to other queries to user if not self.admin_database.get_and_set_may_user_query(user_name, False): raise UnauthorizedAccessException( f"User {user_name} is trying to query" + " before end of previous query." ) try: # Get cost of the query eps_cost, delta_cost = self.cost(query_json) # type: ignore [arg-type] # Check that enough budget to do the query try: ( eps_remain, delta_remain, ) = self.admin_database.get_remaining_budget( user_name, query_json.dataset_name ) except UnauthorizedAccessException as e: raise e if (eps_remain < eps_cost) or (delta_remain < delta_cost): raise InvalidQueryException( "Not enough budget for this query epsilon remaining " f"{eps_remain}, delta remaining {delta_remain}." ) # Query try: query_result = self.query(query_json) # type: ignore [arg-type] except KNOWN_EXCEPTIONS as e: raise e except Exception as e: raise InternalServerException(str(e)) from e # Deduce budget from user self.admin_database.update_budget( user_name, query_json.dataset_name, eps_cost, delta_cost ) response = QueryResponse( requested_by=user_name, result=query_result, epsilon=eps_cost, delta=delta_cost, ) # Add query to db (for archive) self.admin_database.save_query( user_name, query_json, response ) # TODO 359 here except Exception as e: self.admin_database.set_may_user_query(user_name, True) raise e # Re-enable user to query self.admin_database.set_may_user_query(user_name, True) # Return response return response