from typing import List
from pymongo import MongoClient
from pymongo.database import Database
from pymongo.errors import WriteConcernError
from pymongo.results import _WriteResult
from admin_database.admin_database import (
AdminDatabase,
dataset_must_exist,
user_must_exist,
user_must_have_access_to_dataset,
)
from utils.error_handler import InvalidQueryException
[docs]
class AdminMongoDatabase(AdminDatabase):
"""
Overall MongoDB database management for server state.
"""
def __init__(self, connection_string: str, database_name: str) -> None:
"""Connect to database.
Args:
connection_string (str): Connection string to the mongodb
database_name (str): Mongodb database name.
"""
self.db: Database = MongoClient(connection_string)[database_name]
[docs]
def does_user_exist(self, user_name: str) -> bool:
"""Checks if user exist in the database
Args:
user_name (str): name of the user to check
Returns:
bool: True if the user exists, False otherwise.
"""
doc_count = self.db.users.count_documents(
{"user_name": f"{user_name}"}
)
return doc_count > 0
[docs]
def does_dataset_exist(self, dataset_name: str) -> bool:
"""Checks if dataset exist in the database
Args:
dataset_name (str): name of the dataset to check
Returns:
bool: True if the dataset exists, False otherwise.
"""
collection_query = self.db.datasets.find({})
for document in collection_query:
if document["dataset_name"] == dataset_name:
return True
return False
[docs]
@user_must_exist
def set_may_user_query(self, user_name: str, may_query: bool) -> None:
"""Sets if a user may query the server.
(Set False before querying and True after updating budget)
Wrapped by :py:func:`user_must_exist`.
Args:
user_name (str): name of the user
may_query (bool): flag give or remove access to user
Raises:
WriteConcernError: If the result is not acknowledged.
"""
res = self.db.users.update_one(
{"user_name": f"{user_name}"},
{"$set": {"may_query": may_query}},
)
check_result_acknowledged(res)
[docs]
@user_must_exist
def get_and_set_may_user_query(
self, user_name: str, may_query: bool
) -> bool:
"""
Atomic operation to check and set if the user may query the server.
(Set False before querying and True after updating budget)
Wrapped by :py:func:`user_must_exist`.
Args:
user_name (str): name of the user
may_query (bool): flag give or remove access to user
Returns:
bool: The may_query status of the user before the update.
"""
res = self.db.users.find_one_and_update(
{"user_name": user_name}, {"$set": {"may_query": may_query}}
)
return res["may_query"] # type: ignore
[docs]
@user_must_exist
def has_user_access_to_dataset(
self, user_name: str, dataset_name: str
) -> bool:
"""Checks if a user may access a particular dataset
Wrapped by :py:func:`user_must_exist`.
Args:
user_name (str): name of the user
dataset_name (str): name of the dataset
Returns:
bool: True if the user has access, False otherwise.
"""
if not self.does_dataset_exist(dataset_name):
raise InvalidQueryException(
f"Dataset {dataset_name} does not exist. "
+ "Please, verify the client object initialisation.",
)
doc_count = self.db.users.count_documents(
{
"user_name": f"{user_name}",
"datasets_list.dataset_name": f"{dataset_name}",
}
)
return doc_count > 0
[docs]
def get_epsilon_or_delta(
self, user_name: str, dataset_name: str, parameter: str
) -> float:
"""Get the total spent epsilon or delta by a specific user
on a specific dataset
Args:
user_name (str): name of the user
dataset_name (str): name of the dataset
parameter (str): total_spent_epsilon or total_spent_delta
Returns:
float: The requested budget value.
"""
return list(
self.db.users.aggregate(
[
{"$unwind": "$datasets_list"},
{
"$match": {
"user_name": f"{user_name}",
"datasets_list.dataset_name": f"{dataset_name}",
}
},
]
)
)[0]["datasets_list"][parameter]
[docs]
def update_epsilon_or_delta(
self,
user_name: str,
dataset_name: str,
parameter: str,
spent_value: float,
) -> None:
"""Update the current budget spent by a specific user
with the last spent budget.
Args:
user_name (str): name of the user
dataset_name (str): name of the dataset
parameter (str): "current_epsilon" or "current_delta"
spent_value (float): spending of epsilon or delta on last query
Raises:
WriteConcernError: If the result is not acknowledged.
"""
res = self.db.users.update_one(
{
"user_name": f"{user_name}",
"datasets_list.dataset_name": f"{dataset_name}",
},
{"$inc": {f"datasets_list.$.{parameter}": spent_value}},
)
check_result_acknowledged(res)
[docs]
@dataset_must_exist
def get_dataset_field(self, dataset_name: str, key: str) -> str:
"""Get dataset field type based on dataset name and key
Wrapped by :py:func:`dataset_must_exist`.
Args:
dataset_name (str): Name of the dataset.
key (str): Key for the value to get in the dataset dict.
Returns:
str: The requested value.
"""
dataset = self.db.datasets.find_one({"dataset_name": dataset_name})
return dataset[key] # type: ignore
[docs]
@user_must_have_access_to_dataset
def get_user_previous_queries(
self,
user_name: str,
dataset_name: str,
) -> List[dict]:
"""Retrieves and return the queries already done by a user
Wrapped by :py:func:`user_must_have_access_to_dataset`.
Args:
user_name (str): name of the user
dataset_name (str): name of the dataset
Returns:
List[dict]: List of previous queries.
"""
queries = self.db.queries_archives.find(
{
"user_name": f"{user_name}",
"dataset_name": f"{dataset_name}",
},
{"_id": 0},
)
return list(queries)
[docs]
def save_query(
self, user_name: str, query_json: dict, response: dict
) -> None:
"""Save queries of user on datasets in a separate collection (table)
named "queries_archives" in the DB
Args:
user_name (str): name of the user
query_json (dict): json received from client
response (dict): response sent to the client
Raises:
WriteConcernError: If the result is not acknowledged.
"""
to_archive = super().prepare_save_query(
user_name, query_json, response
)
res = self.db.queries_archives.insert_one(to_archive)
check_result_acknowledged(res)
[docs]
def check_result_acknowledged(res: _WriteResult) -> None:
"""Raises an exception if the result is not acknowledged.
Args:
res (_WriteResult): The PyMongo WriteResult to check.
Raises:
WriteConcernError: If the result is not acknowledged.
"""
if not res.acknowledged:
raise WriteConcernError(
"Write request not acknowledged by MongoDB database."
+ " Please contact the administrator of the server."
)