Source code for lomas_server.routes.utils

import random
import time
from collections.abc import AsyncGenerator
from functools import wraps

from fastapi import Request
from lomas_core.constants import DPLibraries
from lomas_core.error_handler import (
    KNOWN_EXCEPTIONS,
    InternalServerException,
    UnauthorizedAccessException,
)
from lomas_core.models.requests import (
    DummyQueryModel,
    LomasRequestModel,
    QueryModel,
)
from lomas_core.models.responses import CostResponse, QueryResponse

from lomas_server.data_connector.factory import data_connector_factory
from lomas_server.dp_queries.dp_libraries.factory import querier_factory
from lomas_server.dp_queries.dummy_dataset import get_dummy_dataset_for_query
from lomas_server.utils.config import get_config


[docs] def timing_protection(func): """Adds delays to requests response to protect against timing attack.""" @wraps(func) def wrapper(*args, **kwargs): start_time = time.time() response = func(*args, **kwargs) process_time = time.time() - start_time config = get_config() if config.server.time_attack: match config.server.time_attack.method: case "stall": # Slows to a minimum response time defined by magnitude if process_time < config.server.time_attack.magnitude: time.sleep(config.server.time_attack.magnitude - process_time) case "jitter": # Adds some time between 0 and magnitude secs time.sleep( config.server.time_attack.magnitude * random.uniform(0, 1) ) case _: raise InternalServerException("Time attack method not supported.") return response return wrapper
[docs] async def server_live(request: Request) -> AsyncGenerator: """ Checks the server is live and throws an exception otherwise. Args: request (Request): Raw request Raises: InternalServerException: If the server is not live. Returns: AsyncGenerator """ if not request.app.state.server_state["LIVE"]: raise InternalServerException( "Woops, the server did not start correctly." + "Contact the administrator of this service.", ) yield
[docs] @timing_protection def handle_query_on_private_dataset( request: Request, query_json: QueryModel, user_name: str, dp_library: DPLibraries, ) -> QueryResponse: """ Handles queries for the SmartNoiseSQL library. Args: request (Request): Raw request object query_json (BaseModel): A JSON object containing the user request user_name (str): The user name dp_library: Name of the DP library to use for the query Raises: ExternalLibraryException: For exceptions from libraries external to this package. InternalServerException: For any other unforseen exceptions. InvalidQueryException: If there is not enough budget or the dataset does not exist. UnauthorizedAccessException: A query is already ongoing for this user, the user does not exist or does not have access to the dataset. Returns: JSONResponse: A JSON object containing the following: - requested_by (str): The user name. - query_response (pd.DataFrame): A DataFrame containing the query response. - spent_epsilon (float): The amount of epsilon budget spent for the query. - spent_delta (float): The amount of delta budget spent for the query. """ app = request.app data_connector = data_connector_factory( query_json.dataset_name, app.state.admin_database, app.state.private_credentials, ) dp_querier = querier_factory( dp_library, data_connector=data_connector, admin_database=app.state.admin_database, ) try: response = dp_querier.handle_query(query_json, user_name) except KNOWN_EXCEPTIONS as e: raise e except Exception as e: raise InternalServerException(str(e)) from e return response
[docs] def handle_query_on_dummy_dataset( request: Request, query_json: DummyQueryModel, user_name: str, dp_library: DPLibraries, ) -> QueryResponse: """ Handles queries for the SmartNoiseSQL library. Args: request (Request): Raw request object query_json (BaseModel): A JSON object containing the user request user_name (str): The user name dp_library: Name of the DP library to use for the query Raises: ExternalLibraryException: For exceptions from libraries external to this package. InternalServerException: For any other unforseen exceptions. InvalidQueryException: If there is not enough budget or the dataset does not exist. Returns: JSONResponse: A JSON object containing the query response. """ app = request.app dataset_name = query_json.dataset_name if not app.state.admin_database.has_user_access_to_dataset(user_name, dataset_name): raise UnauthorizedAccessException( f"{user_name} does not have access to {dataset_name}.", ) ds_data_connector = get_dummy_dataset_for_query( app.state.admin_database, query_json ) dummy_querier = querier_factory( dp_library, data_connector=ds_data_connector, admin_database=app.state.admin_database, ) try: eps_cost, delta_cost = dummy_querier.cost(query_json) result = dummy_querier.query(query_json) response = QueryResponse( requested_by=user_name, result=result, epsilon=eps_cost, delta=delta_cost ) except KNOWN_EXCEPTIONS as e: raise e except Exception as e: raise InternalServerException(str(e)) from e return response
[docs] @timing_protection def handle_cost_query( request: Request, query_json: LomasRequestModel, user_name: str, dp_library: DPLibraries, ) -> CostResponse: """ Handles cost queries for DP libraries. Args: request (Request): Raw request object query_json (BaseModel): A JSON object containing the user request user_name (str): The user name dp_library: Name of the DP library to use for the query Raises: ExternalLibraryException: For exceptions from libraries external to this package. InternalServerException: For any other unforseen exceptions. InvalidQueryException: The dataset does not exist. Returns: JSONResponse: A JSON object containing: - epsilon_cost (float): The estimated epsilon cost. - delta_cost (float): The estimated delta cost. """ app = request.app dataset_name = query_json.dataset_name if not app.state.admin_database.has_user_access_to_dataset(user_name, dataset_name): raise UnauthorizedAccessException( f"{user_name} does not have access to {dataset_name}.", ) data_connector = data_connector_factory( query_json.dataset_name, app.state.admin_database, app.state.private_credentials, ) dp_querier = querier_factory( dp_library, data_connector=data_connector, admin_database=app.state.admin_database, ) try: eps_cost, delta_cost = dp_querier.cost(query_json) except KNOWN_EXCEPTIONS as e: raise e except Exception as e: raise InternalServerException(str(e)) from e return CostResponse(epsilon=eps_cost, delta=delta_cost)