Skip to content

Server

Classes:

  • AdminDatabase

    Overall database management for server state.

Functions:

AdminDatabase #


              flowchart TD
              lomas_server.admin_database.admin_database.AdminDatabase[AdminDatabase]

              

              click lomas_server.admin_database.admin_database.AdminDatabase href "" "lomas_server.admin_database.admin_database.AdminDatabase"
            

Overall database management for server state.

Methods:

does_user_exist abstractmethod #

does_user_exist(user_name: str) -> bool

Checks if the user exists in the database.

Parameters:

  • user_name #

    (str) –

    name of the user to check

Returns:

  • bool ( bool ) –

    True if the user exists, False otherwise.

Source code in server/lomas_server/admin_database/admin_database.py
125
126
127
128
129
130
131
132
133
134
135
@abstractmethod
def does_user_exist(self, user_name: str) -> bool:
    """
    Check whether a user exists in the database.

    Args:
        user_name (str): name of the user to check

    Returns:
        bool: True if the user exists, False otherwise.
    """

does_dataset_exist abstractmethod #

does_dataset_exist(dataset_name: str) -> bool

Checks if the dataset exists in the database.

Parameters:

  • dataset_name #

    (str) –

    name of the dataset to check

Returns:

  • bool ( bool ) –

    True if the dataset exists, False otherwise.

Source code in server/lomas_server/admin_database/admin_database.py
137
138
139
140
141
142
143
144
145
146
147
@abstractmethod
def does_dataset_exist(self, dataset_name: str) -> bool:
    """
    Check whether a dataset exists in the database.

    Args:
        dataset_name (str): name of the dataset to check

    Returns:
        bool: True if the dataset exists, False otherwise.
    """

get_dataset_metadata abstractmethod #

get_dataset_metadata(dataset_name: str) -> TableMetadata

Returns the metadata dictionary of the dataset.

Wrapped by dataset_must_exist.

Parameters:

  • dataset_name #

    (str) –

    name of the dataset to get the metadata

Returns:

  • TableMetadata ( TableMetadata ) –

    The metadata object.

Source code in server/lomas_server/admin_database/admin_database.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
@abstractmethod
# NOTE(review): this decorator wraps only this abstract stub; an overriding
# method presumably has to re-apply it - confirm against concrete subclasses.
@dataset_must_exist
def get_dataset_metadata(self, dataset_name: str) -> TableMetadata:
    """
    Return the metadata of the dataset.

    Wrapped by [dataset_must_exist][lomas_server.admin_database.admin_database.dataset_must_exist].

    Args:
        dataset_name (str): name of the dataset to get the metadata

    Returns:
        TableMetadata: The metadata object.
    """

is_user_admin abstractmethod #

is_user_admin(user_name: str) -> bool

Returns true if the user is an admin.

Parameters:

  • user_name #

    (str) –

    name of the user

Returns:

  • bool ( bool ) –

    True if the user is a lomas admin.

Source code in server/lomas_server/admin_database/admin_database.py
164
165
166
167
168
169
170
171
172
173
174
175
@abstractmethod
# NOTE(review): this decorator wraps only this abstract stub; an overriding
# method presumably has to re-apply it - confirm against concrete subclasses.
@user_must_exist
def is_user_admin(self, user_name: str) -> bool:
    """
    Return whether the user is an admin.

    Wrapped by [user_must_exist][lomas_server.admin_database.admin_database.user_must_exist].

    Args:
        user_name (str): name of the user

    Returns:
        bool: True if the user is a lomas admin.
    """

set_may_user_query #

set_may_user_query(user_name: str, may_query: bool) -> None

Sets if a user may query the server.

(Set False before querying and True after updating budget)

Wrapped by user_must_exist.

Parameters:

  • user_name #

    (str) –

    name of the user

  • may_query #

    (bool) –

    flag give or remove access to user

Source code in server/lomas_server/admin_database/admin_database.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@user_must_exist
def set_may_user_query(self, user_name: str, may_query: bool) -> None:
    """
    Set whether a user may query the server.

    (Set False before querying and True after updating budget)

    Wrapped by [user_must_exist][lomas_server.admin_database.admin_database.user_must_exist].

    Args:
        user_name (str): name of the user
        may_query (bool): flag give or remove access to user
    """
    # Delegate to the get-and-set operation; its return value (the previous
    # flag) is deliberately discarded here.
    self.get_and_set_may_user_query(user_name, may_query)

get_and_set_may_user_query abstractmethod #

get_and_set_may_user_query(user_name: str, may_query: bool) -> bool

Atomic operation to check and set if the user may query the server.

(Set False before querying and True after updating budget)

Wrapped by user_must_exist.

Parameters:

  • user_name #

    (str) –

    name of the user

  • may_query #

    (bool) –

    flag give or remove access to user

Returns:

  • bool ( bool ) –

    The may_query status of the user before the update.

Source code in server/lomas_server/admin_database/admin_database.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
@abstractmethod
# NOTE(review): this decorator wraps only this abstract stub; an overriding
# method presumably has to re-apply it - confirm against concrete subclasses.
@user_must_exist
def get_and_set_may_user_query(self, user_name: str, may_query: bool) -> bool:
    """
    Atomically read and set whether the user may query the server.

    (Set False before querying and True after updating budget)

    Wrapped by [user_must_exist][lomas_server.admin_database.admin_database.user_must_exist].

    Args:
        user_name (str): name of the user
        may_query (bool): new value of the flag (give or remove access to user)

    Returns:
        bool: The may_query status of the user before the update.
    """

has_user_access_to_dataset abstractmethod #

has_user_access_to_dataset(user_name: str, dataset_name: str) -> bool

Checks if a user may access a particular dataset.

Wrapped by user_must_exist.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • bool ( bool ) –

    True if the user has access, False otherwise.

Source code in server/lomas_server/admin_database/admin_database.py
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@abstractmethod
# NOTE(review): this decorator wraps only this abstract stub; an overriding
# method presumably has to re-apply it - confirm against concrete subclasses.
@user_must_exist
def has_user_access_to_dataset(self, user_name: str, dataset_name: str) -> bool:
    """
    Check whether a user may access a particular dataset.

    Wrapped by [user_must_exist][lomas_server.admin_database.admin_database.user_must_exist].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        bool: True if the user has access, False otherwise.
    """

get_epsilon_or_delta abstractmethod #

get_epsilon_or_delta(user_name: str, dataset_name: str, parameter: BudgetDBKey) -> float

Get the budget value (initial or spent epsilon or delta) selected by parameter for the user on the dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • parameter #

    (str) –

    Member of BudgetDBKey.

Returns:

  • float ( float ) –

    The requested budget value.

Source code in server/lomas_server/admin_database/admin_database.py
226
227
228
229
230
231
232
233
234
235
236
237
238
@abstractmethod
def get_epsilon_or_delta(self, user_name: str, dataset_name: str, parameter: BudgetDBKey) -> float:
    """
    Get one budget value of a user on a dataset.

    Which value is returned (e.g. spent or initial epsilon/delta) is
    selected by ``parameter``.

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        parameter (BudgetDBKey): Member of BudgetDBKey.

    Returns:
        float: The requested budget value.
    """

get_total_spent_budget #

get_total_spent_budget(user_name: str, dataset_name: str) -> list[float]

Get the total epsilon and delta spent by the user on the dataset.

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • list[float]

    List[float]: The first value of the list is the epsilon value, the second value is the delta value.

Source code in server/lomas_server/admin_database/admin_database.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
@user_must_have_access_to_dataset
def get_total_spent_budget(self, user_name: str, dataset_name: str) -> list[float]:
    """
    Get the total epsilon and delta spent by the user on the dataset.

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        List[float]: The first value of the list is the epsilon value,
            the second value is the delta value.
    """
    # Epsilon is fetched before delta, matching the order of the returned list.
    spent_epsilon = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.EPSILON_SPENT)
    spent_delta = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.DELTA_SPENT)
    return [spent_epsilon, spent_delta]

get_initial_budget #

get_initial_budget(user_name: str, dataset_name: str) -> list[float]

Get the initial epsilon and delta budget.

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • list[float]

    List[float]: The first value of the list is the epsilon value, the second value is the delta value.

Source code in server/lomas_server/admin_database/admin_database.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
@user_must_have_access_to_dataset
def get_initial_budget(self, user_name: str, dataset_name: str) -> list[float]:
    """
    Get the initial epsilon and delta budget of the user on the dataset.

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        List[float]: The first value of the list is the epsilon value,
            the second value is the delta value.
    """
    # Epsilon is fetched before delta, matching the order of the returned list.
    initial_epsilon = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.EPSILON_INIT)
    initial_delta = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.DELTA_INIT)
    return [initial_epsilon, initial_delta]

get_remaining_budget #

get_remaining_budget(user_name: str, dataset_name: str) -> list[float]

Get the remaining epsilon and delta budget (initial - total spent).

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • list[float]

    List[float]: The first value of the list is the epsilon value, the second value is the delta value.

Source code in server/lomas_server/admin_database/admin_database.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
@user_must_have_access_to_dataset
def get_remaining_budget(self, user_name: str, dataset_name: str) -> list[float]:
    """
    Get the remaining epsilon and delta budget (initial - total spent).

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        List[float]: The first value of the list is the epsilon value,
            the second value is the delta value.
    """
    # Both helpers return [epsilon, delta]; the initial budget is read first.
    initial_epsilon, initial_delta = self.get_initial_budget(user_name, dataset_name)
    used_epsilon, used_delta = self.get_total_spent_budget(user_name, dataset_name)

    remaining_epsilon = initial_epsilon - used_epsilon
    remaining_delta = initial_delta - used_delta
    return [remaining_epsilon, remaining_delta]

update_epsilon_or_delta abstractmethod #

update_epsilon_or_delta(
    user_name: str, dataset_name: str, parameter: BudgetDBKey, spent_value: float
) -> None

Update current budget spent by user with spent budget.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • parameter #

    (str) –

    One of BudgetDBKey

  • spent_value #

    (float) –

    spending of epsilon or delta on last query

Source code in server/lomas_server/admin_database/admin_database.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
@abstractmethod
def update_epsilon_or_delta(
    self,
    user_name: str,
    dataset_name: str,
    parameter: BudgetDBKey,
    spent_value: float,
) -> None:
    """
    Update the budget spent by the user on the dataset with the newly spent value.

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        parameter (BudgetDBKey): One of BudgetDBKey
        spent_value (float): spending of epsilon or delta on last query
    """

update_epsilon #

update_epsilon(user_name: str, dataset_name: str, spent_epsilon: float) -> None

Update the epsilon spent by the user on the dataset with the epsilon spent on the last query.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • spent_epsilon #

    (float) –

    value of epsilon spent on last query

Source code in server/lomas_server/admin_database/admin_database.py
317
318
319
320
321
322
323
324
325
326
def update_epsilon(self, user_name: str, dataset_name: str, spent_epsilon: float) -> None:
    """
    Update the epsilon spent by the user on the dataset.

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        spent_epsilon (float): value of epsilon spent on last query
    """
    # Thin wrapper: forwards to the generic updater with the spent-epsilon key
    # and hands back whatever that call returns.
    return self.update_epsilon_or_delta(
        user_name,
        dataset_name,
        BudgetDBKey.EPSILON_SPENT,
        spent_epsilon,
    )

update_delta #

update_delta(user_name: str, dataset_name: str, spent_delta: float) -> None

Update the delta spent by the user on the dataset with the delta spent on the last query.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • spent_delta #

    (float) –

    value of delta spent on last query

Source code in server/lomas_server/admin_database/admin_database.py
328
329
330
331
332
333
334
335
336
337
def update_delta(self, user_name: str, dataset_name: str, spent_delta: float) -> None:
    """
    Update the delta spent by the user on the dataset.

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        spent_delta (float): value of delta spent on last query
    """
    # Thin wrapper: forwards to the generic updater with the spent-delta key.
    self.update_epsilon_or_delta(
        user_name,
        dataset_name,
        BudgetDBKey.DELTA_SPENT,
        spent_delta,
    )

update_budget #

Update current epsilon and delta spent by user.

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • spent_epsilon #

    (float) –

    value of epsilon spent on last query

  • spent_delta #

    (float) –

    value of delta spent on last query

Source code in server/lomas_server/admin_database/admin_database.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
@user_must_have_access_to_dataset
def update_budget(
    self,
    user_name: str,
    dataset_name: str,
    spent_epsilon: float,
    spent_delta: float,
) -> None:
    """
    Update the epsilon and delta spent by the user on the dataset.

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        spent_epsilon (float): value of epsilon spent on last query
        spent_delta (float): value of delta spent on last query
    """
    # Two separate updates, epsilon first and then delta.
    self.update_epsilon(user_name, dataset_name, spent_epsilon)
    self.update_delta(user_name, dataset_name, spent_delta)

get_dataset abstractmethod #

get_dataset(dataset_name: str) -> DSInfo

Get dataset access info based on dataset_name.

Wrapped by dataset_must_exist.

Parameters:

  • dataset_name #

    (str) –

    Name of the dataset.

Returns:

  • Dataset ( DSInfo ) –

    The dataset model.

Source code in server/lomas_server/admin_database/admin_database.py
361
362
363
364
365
366
367
368
369
370
371
372
373
374
@abstractmethod
# NOTE(review): this decorator wraps only this abstract stub; an overriding
# method presumably has to re-apply it - confirm against concrete subclasses.
@dataset_must_exist
def get_dataset(self, dataset_name: str) -> DSInfo:
    """
    Get dataset access info based on dataset_name.

    Wrapped by [dataset_must_exist][lomas_server.admin_database.admin_database.dataset_must_exist].

    Args:
        dataset_name (str): Name of the dataset.

    Returns:
        DSInfo: The dataset model.
    """

get_user_previous_queries abstractmethod #

get_user_previous_queries(user_name: str, dataset_name: str) -> list[dict]

Retrieves and return the queries already done by a user.

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • list[dict]

    List[dict]: List of previous queries.

Source code in server/lomas_server/admin_database/admin_database.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
@abstractmethod
# NOTE(review): this decorator wraps only this abstract stub; an overriding
# method presumably has to re-apply it - confirm against concrete subclasses.
@user_must_have_access_to_dataset
def get_user_previous_queries(
    self,
    user_name: str,
    dataset_name: str,
) -> list[dict]:
    """
    Retrieve and return the queries already done by a user on a dataset.

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        List[dict]: List of previous queries.
    """

prepare_save_query #

Prepare the query to save in archives.

Parameters:

Raises:

  • InternalServerException

    If the type of query is unknown.

Returns:

  • dict ( dict ) –

    The query archive dictionary.

Source code in server/lomas_server/admin_database/admin_database.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def prepare_save_query(self, user_name: str, query: LomasRequestModel, response: QueryResponse) -> dict:
    """
    Build the archive record for a query and its response.

    Args:
        user_name (str): name of the user
        query (LomasRequestModel): Request object received from client
        response (QueryResponse): Response object sent to client

    Raises:
        InternalServerException: If the type of query is unknown
            (presumably raised by ``model_input_to_lib``; nothing is raised
            directly in this method).

    Returns:
        dict: The query archive dictionary.
    """
    # TODO 359 use model for that one too.
    return {
        "user_name": user_name,
        "dataset_name": query.dataset_name,
        "dp_library": model_input_to_lib(query),
        "client_input": query.model_dump(),
        "response": response.model_dump(),
        # Wall-clock time at which the record is built.
        "timestamp": time.time(),
    }

save_query abstractmethod #

Save queries of user on datasets in a separate collection (table).

Parameters:

Source code in server/lomas_server/admin_database/admin_database.py
422
423
424
425
426
427
428
429
430
431
@abstractmethod
def save_query(self, user_name: str, query: LomasRequestModel, response: QueryResponse) -> None:
    """
    Save a query of a user on a dataset in a separate collection (table).

    Args:
        user_name (str): name of the user
        query (LomasRequestModel): Request object received from client
        response (QueryResponse): Response object sent to client
    """

wipe abstractmethod #

wipe() -> None

Wipe the entire Database.

Source code in server/lomas_server/admin_database/admin_database.py
433
434
435
@abstractmethod
def wipe(self) -> None:
    """Wipe the entire database."""

user_must_exist #

user_must_exist(
    func: Callable[Concatenate[DB, str, P], T],
) -> Callable[Concatenate[DB, str, P], T]

Decorator function to verify that a user exists.

Parameters:

  • func #

    (Callable) –

    Function to be decorated. Wrapped function arguments must include: - args[0] (str): username

Raises:

Returns:

  • Callable ( Callable[Concatenate[DB, str, P], T] ) –

    Wrapper function that verifies the user exists before calling func.

Source code in server/lomas_server/admin_database/admin_database.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def user_must_exist(
    func: Callable[Concatenate[DB, str, P], T],
) -> Callable[Concatenate[DB, str, P], T]:
    """
    Decorator function to verify that a user exists.

    Args:
        func (Callable): Function to be decorated.
            Wrapped function arguments must include:
            - args[0] (str): username

    Raises:
        UnauthorizedAccessException: If the user does not exist.

    Returns:
        Callable: Wrapper function that verifies the user exists
            before calling func.
    """

    @wraps(func)
    def wrapper_decorator(self: DB, user_name: str, *args: P.args, **kwargs: P.kwargs) -> T:
        if not self.does_user_exist(user_name):
            raise UnauthorizedAccessException(
                f"User {user_name} does not exist. Please, verify the client object initialisation.",
            )
        return func(self, user_name, *args, **kwargs)

    return wrapper_decorator

dataset_must_exist #

dataset_must_exist(
    func: Callable[Concatenate[DB, str, P], T],
) -> Callable[Concatenate[DB, str, P], T]

Decorator function to verify that a dataset exists.

Parameters:

  • func #

    (Callable) –

    Function to be decorated. Wrapped function arguments must include: - args[0] (str): dataset name

Raises:

Returns:

  • Callable ( Callable[Concatenate[DB, str, P], T] ) –

    Wrapper function that checks if the dataset exists before calling the wrapped function.

Source code in server/lomas_server/admin_database/admin_database.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def dataset_must_exist(
    func: Callable[Concatenate[DB, str, P], T],
) -> Callable[Concatenate[DB, str, P], T]:
    """
    Decorator function to verify that a dataset exists.

    Args:
        func (Callable): Function to be decorated.
            Wrapped function arguments must include:
            - args[0] (str): dataset name

    Raises:
        InvalidQueryException: If the dataset does not exist.

    Returns:
        Callable: Wrapper function that checks if the dataset exists
            before calling the wrapped function.
    """

    @wraps(func)
    def wrapper_decorator(self: DB, dataset_name: str, *args: P.args, **kwargs: P.kwargs) -> T:
        if not self.does_dataset_exist(dataset_name):
            raise InvalidQueryException(
                f"Dataset {dataset_name} does not exist. "
                + "Please, verify the client object initialisation.",
            )
        return func(self, dataset_name, *args, **kwargs)

    return wrapper_decorator

user_must_have_access_to_dataset #

user_must_have_access_to_dataset(
    func: Callable[Concatenate[DB, str, str, P], T],
) -> Callable[Concatenate[DB, str, str, P], T]

Decorator function to enforce a user has access to a dataset.

Parameters:

  • func #

    (Callable) –

    Function to be decorated. Wrapped function arguments must include: - args[0] (str): user name - args[1] (str): dataset name

Raises:

Returns:

  • Callable ( Callable[Concatenate[DB, str, str, P], T] ) –

    Wrapper function that checks if the user has access to the dataset before calling the wrapped function.

Source code in server/lomas_server/admin_database/admin_database.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def user_must_have_access_to_dataset(
    func: Callable[Concatenate[DB, str, str, P], T],
) -> Callable[Concatenate[DB, str, str, P], T]:
    """
    Decorator function to enforce a user has access to a dataset.

    Args:
        func (Callable): Function to be decorated.
            Wrapped function arguments must include:
            - args[0] (str): user name
            - args[1] (str): dataset name

    Raises:
        UnauthorizedAccessException: If the user does not have
            access to the dataset.

    Returns:
        Callable: Wrapper function that checks if the user has access
            to the dataset before calling the wrapped function.
    """

    @wraps(func)
    def wrapper_decorator(
        self: DB, user_name: str, dataset_name: str, *args: P.args, **kwargs: P.kwargs
    ) -> T:
        if not self.has_user_access_to_dataset(user_name, dataset_name):
            raise UnauthorizedAccessException(
                f"{user_name} does not have access to {dataset_name}.",
            )
        return func(self, user_name, dataset_name, *args, **kwargs)

    return wrapper_decorator

Classes:

  • TopDBKey

    Key of the top level collections.

  • BudgetDBKey

    Key for selecting budget values in admin db for a given dataset and user.

TopDBKey #


              flowchart TD
              lomas_server.admin_database.constants.TopDBKey[TopDBKey]

              

              click lomas_server.admin_database.constants.TopDBKey href "" "lomas_server.admin_database.constants.TopDBKey"
            

Key of the top level collections.

BudgetDBKey #


              flowchart TD
              lomas_server.admin_database.constants.BudgetDBKey[BudgetDBKey]

              

              click lomas_server.admin_database.constants.BudgetDBKey href "" "lomas_server.admin_database.constants.BudgetDBKey"
            

Key for selecting budget values in admin db for a given dataset and user.

Classes:

LocalAdminDatabase #


              flowchart TD
              lomas_server.admin_database.local_database.LocalAdminDatabase[LocalAdminDatabase]
              lomas_server.admin_database.admin_database.AdminDatabase[AdminDatabase]

                              lomas_server.admin_database.admin_database.AdminDatabase --> lomas_server.admin_database.local_database.LocalAdminDatabase
                


              click lomas_server.admin_database.local_database.LocalAdminDatabase href "" "lomas_server.admin_database.local_database.LocalAdminDatabase"
              click lomas_server.admin_database.admin_database.AdminDatabase href "" "lomas_server.admin_database.admin_database.AdminDatabase"
            

Local Admin database in a single file.

Methods:

Attributes:

  • path (Path) –

    Database accepts existing path or new (creatable) path.

path instance-attribute #

path: Path

Database accepts existing path or new (creatable) path.

add_datasets_via_yaml #

add_datasets_via_yaml(
    yaml_file: Path | BinaryIO | SpooledTemporaryFile,
    clean: bool,
    path_prefix: Path = Path(),
) -> None

Set all database types to datasets in dataset collection based on yaml file.

Parameters:

  • yaml_file #

    (Path | BinaryIO | SpooledTemporaryFile) –

    path to the YAML file location

  • clean #

    (bool) –

    Whether to clean the collection before adding.

  • path_prefix #

    (Path, default: Path() ) –

    Prefix to add to all file paths. Defaults to empty Path.

Raises:

  • ValueError

    If there are errors in the YAML file format.

Returns:

  • None

    None

Source code in server/lomas_server/admin_database/local_database.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
@db_span("db.add_datasets_via_yaml", table="admin-db")
def add_datasets_via_yaml(
    self,
    yaml_file: Path | BinaryIO | SpooledTemporaryFile,
    clean: bool,
    path_prefix: Path = Path(),
) -> None:
    """Set all database types to datasets in dataset collection based on yaml file.

    Args:
        yaml_file (Path|BinaryIO|SpooledTemporaryFile): path to the YAML file
            location, or an already opened stream containing the YAML document.
        clean (bool): Whether to clean the collection before adding.
        path_prefix (Path, optional): Prefix to add to all file paths. Defaults to empty Path.

    Raises:
        ValueError: If the YAML document is not a mapping (e.g. the file is
            empty). Other format errors are presumably reported by
            ``DatasetsCollection`` validation - confirm.

    Returns:
        None
    """
    if clean:
        self.drop_collection("datasets")

    if isinstance(yaml_file, Path):
        # Open the file in a context manager so the handle is always closed.
        with yaml_file.resolve().open(encoding="utf-8") as stream:
            yaml_dict = yaml.safe_load(stream)
    else:
        # Any already opened stream is handed to the parser directly.
        # typing.BinaryIO cannot be used for a runtime isinstance/match check:
        # real binary file objects are not instances of it.
        yaml_dict = yaml.safe_load(yaml_file)

    # An empty or non-mapping document cannot be expanded into keyword arguments.
    if not isinstance(yaml_dict, dict):
        raise ValueError("The YAML file must contain a mapping at its top level.")

    self.load_dataset_collection(DatasetsCollection(**yaml_dict).datasets, path_prefix)

add_dataset #

add_dataset(
    dataset_name: str,
    database_type: str,
    metadata_database_type: str,
    dataset_path: str | None = "",
    metadata_path: str = "",
    bucket: str | None = "",
    key: str | None = "",
    endpoint_url: str | None = "",
    credentials_name: str | None = "",
    metadata_bucket: str | None = "",
    metadata_key: str | None = "",
    metadata_endpoint_url: str | None = "",
    metadata_access_key_id: str | None = "",
    metadata_secret_access_key: str | None = "",
    metadata_credentials_name: str | None = "",
) -> None

Set a database type to a dataset in dataset collection.

Parameters:

  • dataset_name #

    (str) –

    Dataset name

  • database_type #

    (str) –

    Type of the database

  • metadata_database_type #

    (str) –

    Metadata database type

  • dataset_path #

    (str, default: '' ) –

    Path to the dataset (for local db type)

  • metadata_path #

    (str, default: '' ) –

    Path to metadata (for local db type)

  • bucket #

    (str, default: '' ) –

    S3 bucket name

  • key #

    (str, default: '' ) –

    S3 key

  • endpoint_url #

    (str, default: '' ) –

    S3 endpoint URL

  • credentials_name #

    (str, default: '' ) –

    The name of the credentials in the server config to retrieve the dataset from S3 storage.

  • metadata_bucket #

    (str, default: '' ) –

    Metadata S3 bucket name

  • metadata_key #

    (str, default: '' ) –

    Metadata S3 key

  • metadata_endpoint_url #

    (str, default: '' ) –

    Metadata S3 endpoint URL

  • metadata_access_key_id #

    (str, default: '' ) –

    Metadata AWS access key ID

  • metadata_secret_access_key #

    (str, default: '' ) –

    Metadata AWS secret access key

  • metadata_credentials_name #

    (str, default: '' ) –

    The name of the credentials in the server config for retrieving the metadata.

Raises:

  • ValueError

    If the dataset already exists or if the database type is unknown.

Returns:

  • None

    None

Source code in server/lomas_server/admin_database/local_database.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
@db_span("db.add_dataset", table="admin-db")
def add_dataset(
    self,
    dataset_name: str,
    database_type: str,
    metadata_database_type: str,
    dataset_path: str | None = "",
    metadata_path: str = "",
    bucket: str | None = "",
    key: str | None = "",
    endpoint_url: str | None = "",
    credentials_name: str | None = "",
    metadata_bucket: str | None = "",
    metadata_key: str | None = "",
    metadata_endpoint_url: str | None = "",
    metadata_access_key_id: str | None = "",
    metadata_secret_access_key: str | None = "",
    metadata_credentials_name: str | None = "",
) -> None:
    """Register a dataset and its metadata in the local (shelve) admin database.

    The dataset access description and the metadata access description are
    built from the arguments, the metadata document is loaded as JSON (from a
    local file or from S3), both are validated, and the validated records
    are written to the shelve file at ``self.path``.

    Args:
        dataset_name (str): Dataset name
        database_type (str): Type of the database
        metadata_database_type (str): Metadata database type

        dataset_path (str): Path to the dataset (for local db type)
        metadata_path (str): Path to metadata (for local db type)

        bucket (str): S3 bucket name
        key (str): S3 key
        endpoint_url (str): S3 endpoint URL
        credentials_name (str): The name of the credentials in the
            server config to retrieve the dataset from S3 storage.
        metadata_bucket (str): Metadata S3 bucket name
        metadata_key (str): Metadata S3 key
        metadata_endpoint_url (str): Metadata S3 endpoint URL
        metadata_access_key_id (str): Metadata AWS access key ID. Only used
            to build the S3 client; it is not stored in the database.
        metadata_secret_access_key (str): Metadata AWS secret access key.
            Only used to build the S3 client; it is not stored in the database.
        metadata_credentials_name (str): The name of the credentials in the
            server config for retrieving the metadata.

    Raises:
        ValueError: If ``dataset_path`` is None for a path-typed dataset, or
            if ``database_type`` / ``metadata_database_type`` is unknown.

    Returns:
        None

    NOTE(review): this function does not itself check whether the dataset
    already exists; a second call with the same name appends another entry
    to the dataset list and replaces the stored metadata. Confirm whether a
    caller or wrapper enforces uniqueness.
    """
    ADMINDB_INSERT_COUNTER.add(1, {"operation": "add_dataset"})
    # Step 1: Build dataset
    dataset: dict[str, Any] = {"dataset_name": dataset_name}

    dataset_access: dict[str, Any] = {
        "database_type": database_type,
    }

    if database_type == PrivateDatabaseType.PATH:
        if dataset_path is None:
            ADMINDB_ERROR_COUNTER.add(1, {"operation": "datasetpath_error"})
            raise ValueError("Dataset path not set.")
        dataset_access["path"] = dataset_path
    elif database_type == PrivateDatabaseType.S3:
        dataset_access["bucket"] = bucket
        dataset_access["key"] = key
        dataset_access["endpoint_url"] = endpoint_url
        dataset_access["credentials_name"] = credentials_name
    else:
        ADMINDB_ERROR_COUNTER.add(1, {"operation": "database_type_error"})
        raise ValueError(f"Unknown database type {database_type}")

    dataset["dataset_access"] = dataset_access

    # Step 2: Build metadata
    metadata_access: dict[str, Any] = {"database_type": metadata_database_type}
    if metadata_database_type == PrivateDatabaseType.PATH:
        # The metadata document is parsed as JSON from the local file.
        metadata_dict = json.loads(Path(metadata_path).resolve().read_text(encoding="utf-8"))
        metadata_access["path"] = metadata_path

    elif metadata_database_type == PrivateDatabaseType.S3:
        client = boto3.client(
            "s3",
            endpoint_url=metadata_endpoint_url,
            aws_access_key_id=metadata_access_key_id,
            aws_secret_access_key=metadata_secret_access_key,
        )
        response = client.get_object(Bucket=metadata_bucket, Key=metadata_key)
        # The payload is parsed as JSON; parsing errors propagate to the caller
        # unchanged (the previous ``except yaml.YAMLError: raise`` was a no-op).
        metadata_dict = json.loads(response["Body"].read().decode("utf-8"))

        metadata_access["bucket"] = metadata_bucket
        metadata_access["key"] = metadata_key
        metadata_access["endpoint_url"] = metadata_endpoint_url
        metadata_access["credentials_name"] = metadata_credentials_name

    else:
        ADMINDB_ERROR_COUNTER.add(1, {"operation": "metadata_db_type_error"})
        raise ValueError(f"Unknown database type {metadata_database_type}")

    dataset["metadata_access"] = metadata_access

    # Step 3: Validate
    ds_info = DSInfo.model_validate(dataset)
    validated_dataset = ds_info.model_dump()
    validated_metadata = TableMetadata.from_dict(metadata_dict).model_dump()

    # Step 4: Insert into db
    with shelve.open(self.path, writeback=True) as db:
        # Rebind the top-level keys so the shelf persists the new collections.
        db[TK.DATASETS] = [*db.get(TK.DATASETS, []), validated_dataset]
        db[TK.METADATA] = db.get(TK.METADATA, {}) | {dataset_name: validated_metadata}
        db.sync()

add_users_via_yaml #

add_users_via_yaml(
    yaml_file: Path | BinaryIO | SpooledTemporaryFile, clean: bool
) -> None

Add all users from yaml file to the user collection.

Parameters:

  • yaml_file #

    (Path) –

    a path to the YAML file location

  • clean #

    (bool) –

    Boolean flag: True to drop the current user collection before loading, False to keep it.

Returns:

  • None

    None

Source code in server/lomas_server/admin_database/local_database.py
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
@db_span("db.add_users_via_yaml", table="admin-db")
def add_users_via_yaml(self, yaml_file: Path | BinaryIO | SpooledTemporaryFile, clean: bool) -> None:
    """Add all users from a YAML document to the user collection.

    Args:
        yaml_file (Path | BinaryIO | SpooledTemporaryFile): either the path of
            the YAML file, or an already opened stream containing the YAML
            document.
        clean (bool): boolean flag
            True if drop current user collection
            False if keep current user collection

    Returns:
        None
    """
    if clean:
        self.drop_collection("users")

    # Load yaml data and insert it
    if isinstance(yaml_file, Path):
        # Open through a context manager so the file handle is always closed.
        with yaml_file.resolve().open(encoding="utf-8") as stream:
            yaml_dict = yaml.safe_load(stream)
    else:
        # Any non-Path input is treated as a readable stream. Ordinary binary
        # file objects are not instances of ``typing.BinaryIO``, so an
        # isinstance-style match on that type would leave ``yaml_dict`` unset.
        yaml_dict = yaml.safe_load(yaml_file)
    self.load_users_collection(UserCollection(**yaml_dict).users)

add_user #

add_user(
    username: str,
    email: str,
    dataset_name: str | None = None,
    epsilon: float = 0.0,
    delta: float = 0.0,
) -> None

Add new user in users collection with default values for all fields.

Parameters:

  • username #

    (str) –

    username to be added

  • email #

    (str) –

    email to be added

Raises:

  • ValueError

    If the username already exists.

  • WriteConcernError

    If the result is not acknowledged.

Returns:

  • None

    None

Source code in server/lomas_server/admin_database/local_database.py
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
@db_span("db.add_user", table="admin-db")
def add_user(
    self,
    username: str,
    email: str,
    dataset_name: str | None = None,
    epsilon: float = 0.0,
    delta: float = 0.0,
) -> None:
    """Add a new user to the users collection.

    The user is created with ``may_query=True``. When ``dataset_name`` is
    given, the user gets access to that single dataset with the provided
    initial budget; otherwise the user's dataset list is empty.

    Args:
        username (str): username to be added
        email (str): email to be added
        dataset_name (str | None): optional dataset the user gets access to
        epsilon (float): initial epsilon budget on ``dataset_name``
        delta (float): initial delta budget on ``dataset_name``

    Returns:
        None

    NOTE(review): no existence check is performed here; an entry already
    stored under ``username`` is replaced. Confirm whether callers guard
    against duplicates.
    """
    ADMINDB_INSERT_COUNTER.add(1, {"operation": "add_user"})
    validated_user = User(
        id=UserId(name=username, email=email),
        may_query=True,
        datasets_list=(
            []
            if dataset_name is None
            else [DatasetOfUser(dataset_name=dataset_name, initial_epsilon=epsilon, initial_delta=delta)]
        ),
    ).model_dump()

    with shelve.open(self.path, writeback=True) as db:
        # Test for the same key that is read and written below, so an
        # existing users mapping is never reset by a key mismatch.
        if TK.USERS not in db:
            db[TK.USERS] = {}
        # writeback=True keeps the mapping cached, so this in-place mutation
        # is persisted when the shelf is closed.
        db[TK.USERS][username] = validated_user

set_may_user_query #

set_may_user_query(user_name: str, may_query: bool) -> None

Sets whether a user may query the server.

(Set False before querying and True after updating budget)

Wrapped by user_must_exist.

Parameters:

  • user_name #

    (str) –

    name of the user

  • may_query #

    (bool) –

    flag give or remove access to user

Source code in server/lomas_server/admin_database/admin_database.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@user_must_exist
def set_may_user_query(self, user_name: str, may_query: bool) -> None:
    """
    Sets whether a user may query the server.

    (Set False before querying and True after updating budget)

    Wrapped by [user_must_exist][lomas_server.admin_database.admin_database.user_must_exist].

    Args:
        user_name (str): name of the user
        may_query (bool): flag give or remove access to user
    """
    # Delegate to the get-and-set variant; its return value is not needed here.
    self.get_and_set_may_user_query(user_name, may_query)

get_total_spent_budget #

get_total_spent_budget(user_name: str, dataset_name: str) -> list[float]

Get the total epsilon and delta spent by the user on the dataset.

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • list[float]

    List[float]: The first value of the list is the epsilon value, the second value is the delta value.

Source code in server/lomas_server/admin_database/admin_database.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
@user_must_have_access_to_dataset
def get_total_spent_budget(self, user_name: str, dataset_name: str) -> list[float]:
    """
    Get the total epsilon and delta spent by the user on the dataset.

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        List[float]: The first value of the list is the epsilon value,
            the second value is the delta value.
    """
    epsilon_spent = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.EPSILON_SPENT)
    delta_spent = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.DELTA_SPENT)
    return [epsilon_spent, delta_spent]

get_initial_budget #

get_initial_budget(user_name: str, dataset_name: str) -> list[float]

Get the initial epsilon and delta budget.

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • list[float]

    List[float]: The first value of the list is the epsilon value, the second value is the delta value.

Source code in server/lomas_server/admin_database/admin_database.py
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
@user_must_have_access_to_dataset
def get_initial_budget(self, user_name: str, dataset_name: str) -> list[float]:
    """
    Get the initial epsilon and delta budget of the user on the dataset.

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        List[float]: The first value of the list is the epsilon value,
            the second value is the delta value.
    """
    epsilon_init = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.EPSILON_INIT)
    delta_init = self.get_epsilon_or_delta(user_name, dataset_name, BudgetDBKey.DELTA_INIT)
    return [epsilon_init, delta_init]

get_remaining_budget #

get_remaining_budget(user_name: str, dataset_name: str) -> list[float]

Get the remaining epsilon and delta budget (initial - total spent).

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

Returns:

  • list[float]

    List[float]: The first value of the list is the epsilon value, the second value is the delta value.

Source code in server/lomas_server/admin_database/admin_database.py
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
@user_must_have_access_to_dataset
def get_remaining_budget(self, user_name: str, dataset_name: str) -> list[float]:
    """
    Get the remaining epsilon and delta budget (initial - total spent).

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset

    Returns:
        List[float]: The first value of the list is the epsilon value,
            the second value is the delta value.
    """
    epsilon_total, delta_total = self.get_initial_budget(user_name, dataset_name)
    epsilon_used, delta_used = self.get_total_spent_budget(user_name, dataset_name)

    epsilon_left = epsilon_total - epsilon_used
    delta_left = delta_total - delta_used
    return [epsilon_left, delta_left]

update_epsilon #

update_epsilon(user_name: str, dataset_name: str, spent_epsilon: float) -> None

Update spent epsilon by user with total spent epsilon.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • spent_epsilon #

    (float) –

    value of epsilon spent on last query

Source code in server/lomas_server/admin_database/admin_database.py
317
318
319
320
321
322
323
324
325
326
def update_epsilon(self, user_name: str, dataset_name: str, spent_epsilon: float) -> None:
    """
    Record the epsilon spent by a user on a dataset.

    Delegates to ``update_epsilon_or_delta`` with the spent-epsilon budget key.

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        spent_epsilon (float): value of epsilon spent on last query
    """
    budget_key = BudgetDBKey.EPSILON_SPENT
    return self.update_epsilon_or_delta(user_name, dataset_name, budget_key, spent_epsilon)

update_delta #

update_delta(user_name: str, dataset_name: str, spent_delta: float) -> None

Update the delta spent by the user with the delta spent on the last query.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • spent_delta #

    (float) –

    value of delta spent on last query

Source code in server/lomas_server/admin_database/admin_database.py
328
329
330
331
332
333
334
335
336
337
def update_delta(self, user_name: str, dataset_name: str, spent_delta: float) -> None:
    """
    Record the delta spent by a user on a dataset.

    Delegates to ``update_epsilon_or_delta`` with the spent-delta budget key.

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        spent_delta (float): value of delta spent on last query
    """
    budget_key = BudgetDBKey.DELTA_SPENT
    self.update_epsilon_or_delta(user_name, dataset_name, budget_key, spent_delta)

update_budget #

Update current epsilon and delta spent by user.

Wrapped by user_must_have_access_to_dataset.

Parameters:

  • user_name #

    (str) –

    name of the user

  • dataset_name #

    (str) –

    name of the dataset

  • spent_epsilon #

    (float) –

    value of epsilon spent on last query

  • spent_delta #

    (float) –

    value of delta spent on last query

Source code in server/lomas_server/admin_database/admin_database.py
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
@user_must_have_access_to_dataset
def update_budget(
    self,
    user_name: str,
    dataset_name: str,
    spent_epsilon: float,
    spent_delta: float,
) -> None:
    """
    Update the epsilon and delta spent by a user on a dataset.

    Wrapped by [user_must_have_access_to_dataset][lomas_server.admin_database.admin_database.user_must_have_access_to_dataset].

    Args:
        user_name (str): name of the user
        dataset_name (str): name of the dataset
        spent_epsilon (float): value of epsilon spent on last query
        spent_delta (float): value of delta spent on last query
    """
    # Epsilon is recorded first, then delta.
    self.update_epsilon(user_name, dataset_name, spent_epsilon)
    self.update_delta(user_name, dataset_name, spent_delta)

prepare_save_query #

Prepare the query to save in archives.

Parameters:

Raises:

  • InternalServerException

    If the type of query is unknown.

Returns:

  • dict ( dict ) –

    The query archive dictionary.

Source code in server/lomas_server/admin_database/admin_database.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
def prepare_save_query(self, user_name: str, query: LomasRequestModel, response: QueryResponse) -> dict:
    """
    Build the archive record for a query and its response.

    Args:
        user_name (str): name of the user
        query (LomasRequestModel): Request object received from client
        response (QueryResponse): Response object sent to client

    Raises:
        InternalServerException: If the type of query is unknown.
            (NOTE(review): not raised directly here; presumably raised by
            ``model_input_to_lib`` - confirm.)

    Returns:
        dict: The query archive dictionary.
    """
    # TODO 359 use model for that one too.
    return {
        "user_name": user_name,
        "dataset_name": query.dataset_name,
        "dp_library": model_input_to_lib(query),
        "client_input": query.model_dump(),
        "response": response.model_dump(),
        # Archive time, taken when the record is built.
        "timestamp": time.time(),
    }

Functions:

  • main

    Main function for the streamlit lomas dashboard.

  • about

    About page.

main #

main() -> None

Main function for the streamlit lomas dashboard.

Source code in server/lomas_server/administration/dashboard/about.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def main() -> None:
    """Entry point of the streamlit lomas dashboard.

    Declares the navigation (home page and database administration page),
    renders the login/logout controls in the sidebar, then runs the page
    selected by the navigation.
    """
    pages = [
        st.Page(about, title="Home"),
        st.Page(
            "database_administration.py",
            title="Database",
        ),
    ]
    page = st.navigation(pages)

    # Sidebar common to all pages
    with st.sidebar:
        if st.user.get("is_logged_in"):
            st.write(f"**{st.user.name}**")
            if st.button("Log out", type="primary"):
                st.logout()
        elif st.button("Log in"):
            st.login()

    page.run()

about #

about() -> None

About page.

Source code in server/lomas_server/administration/dashboard/about.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def about() -> None:
    """About page.

    Renders the static description, feature list and resource links of the
    dashboard, then displays the server state obtained from the ``/state``
    endpoint together with the configured server URL, and finally a warning
    when the Dex step at the end of the function succeeds.
    """
    st.set_page_config(page_title="Lomas Dashboard")

    st.title("Welcome!")

    st.header("Lomas Administration Dashboard")
    description = """
        The Lomas Administration Dashboard provides a centralized interface for managing various aspects of your server and database.
        Whether you need to monitor server status, manage user accounts, or administer datasets, this dashboard offers a convenient way to do so.
    """
    st.write(description)

    st.header("Key Features")

    features = """
        - **Server Overview**: Quickly check the status of your server, including live status and configuration details.
        - **Admin Database Management**: Effortlessly manage users and datasets through intuitive interfaces.
        - **User Management**: Add, modify, or delete user accounts, set budget parameters, and control user permissions.
        - **Dataset Management**: Add, remove, or modify datasets and associated metadata with ease.
        - **View Database Content**: Dive deep into the database to view detailed information about users, datasets, metadata, and archives.
        - **Delete Content (DANGEROUS)**: Safely delete users, datasets, metadata, or entire collections when necessary.
        """
    st.write(features)

    # Additional resources
    st.header("Resources")

    doc = (
        "**Documentation**: [server documentation]"
        "(https://dscc-admin-ch.github.io/lomas-docs/lomas_server.admin_database.html)"
    )
    st.write(doc)
    support = (
        "**Support**: If you encounter any issues or have questions, reach out on [Github issues]"
        "(https://github.com/dscc-admin-ch/lomas/issues)"
    )

    st.write(support)

    # Server Status
    st.header("Server Status")

    # Turn the outcome of the /state request into a coloured badge string:
    # green for a payload with a "state" key, orange for any other successful
    # payload, red when the request failed.
    match query_lomas_auth("/state", httpx.get):
        case IOSuccess(Success({"state": state})):
            status = f":green-badge[{state}]"
        case IOSuccess(Success(unexpected)):
            status = f":orange-badge[unexpected state: {unexpected}]"
        case IOFailure(Failure(e)):
            status = f":red-badge[unavailable]: {e}"

    # Show the badge next to the configured server URL, or an error when the
    # configuration could not be read.
    match get_config().map(lambda config: config.server_url):
        case IOSuccess(Success(server_url)):
            st.write(f"{status} at {server_url}")
        case IOFailure(Failure(e)):
            st.error(f"Configuration Error: {e}")

    # Write the Dex warning only when the whole pipeline succeeds.
    # NOTE(review): presumably `maybe_to_result` turns an absent dex_config
    # into a failure so that the final `.map` is skipped - confirm.
    flow(
        get_config(),
        map_(lambda config: Maybe.from_optional(config.dex_config)),
        bind_result(maybe_to_result),
    ).map(
        lambda _: st.write(
            ":red-badge[Dex is enabled.] Dex is only supported for demo purposes and is not safe for a production environment!"
        )
    )

Functions:

  • call_if_dex

    Gets the Dex config and if it exists, passes it to the provided task.

  • get_datasets

    List all datasets available on the server.

  • get_users

    List all users available on the server.

  • get_user_df

    Get all users into a displayable pandas dataframe.

  • list_users

    List all usernames.

call_if_dex #

call_if_dex(task: Callable[[DexAdminConfig], IOResultE]) -> IOResultE[Maybe[IOResultE]]

Gets the Dex config and if it exists, passes it to the provided task.

Parameters:

Returns:

  • IOResultE[Maybe[IOResultE]]

    IOResultE[Maybe[IOResultE]]: An IOFailure if the task returns a failure or the config cannot be read.

Source code in server/lomas_server/administration/dashboard/utils.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def call_if_dex(task: Callable[[DexAdminConfig], IOResultE]) -> IOResultE[Maybe[IOResultE]]:
    """Gets the Dex config and if it exists, passes it to the provided task.

    Args:
        task (Callable[[DexAdminConfig], IOResultE]): The task to run.

    Returns:
        IOResultE[Maybe[IOResultE]]: An IOFailure if the task returns a failure or the config cannot be read.
    """

    def unwrap_Failure(res: IOResultE[Maybe[IOResultE]]) -> IOResultE[Maybe[IOResultE]]:
        match res:
            case IOSuccess(Success(Some(IOFailure(Failure(e))))):
                return IOFailure(e)
            case _:
                return res

    dex_config_res = flow(
        get_config(),
        map_(lambda config: Maybe.from_optional(config.dex_config)),
        map_(
            map_(task),
        ),
        unwrap_Failure,
    )

    return dex_config_res

get_datasets #

get_datasets() -> IOResultE[list[str]]

List all datasets available on the server.

Source code in server/lomas_server/administration/dashboard/utils.py
100
101
102
def get_datasets() -> IOResultE[list[str]]:
    """Fetch the datasets available on the server.

    Sends a GET request to the ``/datasets`` endpoint through ``query_lomas_auth``.
    """
    datasets_result = query_lomas_auth("/datasets", httpx.get)
    return datasets_result

get_users #

get_users() -> IOResultE[list[User]]

List all users available on the server.

Source code in server/lomas_server/administration/dashboard/utils.py
105
106
107
108
109
def get_users() -> IOResultE[list[User]]:
    """Fetch the users available on the server.

    Sends a GET request to the ``/users`` endpoint and validates every
    returned entry as a ``User`` model.
    """
    raw_users = query_lomas_auth("/users", httpx.get)
    return raw_users.map(lambda user_list: [User.model_validate(entry) for entry in user_list])

get_user_df #

get_user_df() -> IOResultE[DataFrame]

Get all users into a displayable pandas dataframe.

Source code in server/lomas_server/administration/dashboard/utils.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def get_user_df() -> IOResultE[pd.DataFrame]:
    """Collect all users into a displayable pandas dataframe.

    Each row holds the user's name, email, the list of dataset names the
    user has, and a nested dataframe built from the user's dataset entries.
    """

    def _users_to_frame(users):
        rows = [
            [
                user.id.name,
                user.id.email,
                [entry.dataset_name for entry in user.datasets_list],
                pd.DataFrame([entry.model_dump() for entry in user.datasets_list]),
            ]
            for user in users
        ]
        # Columns are given explicitly so that a later `.Name` access stays
        # safe (and yields nothing) even when there are no users.
        return pd.DataFrame(data=rows, columns=["Name", "Email", "datasets", "dsofuser"])

    return get_users().map(_users_to_frame)

list_users #

list_users() -> IO[list[str]]

List all usernames.

Source code in server/lomas_server/administration/dashboard/utils.py
136
137
138
def list_users() -> IO[list[str]]:
    """List all usernames, falling back to an empty list on failure."""

    def _names(user_frame):
        return user_frame.Name.tolist()

    return get_user_df().map(_names).value_or([])

Classes:

  • InvalidDexOperation

    Groups all exceptions for trying to perform invalid Dex rpcs (e.g. adding a user that already exists).

  • DexRPCError

    Groups all Dex rpc errors.

Functions:

InvalidDexOperation #

InvalidDexOperation(error_message: str)

              flowchart TD
              lomas_server.administration.dex.dex_admin.InvalidDexOperation[InvalidDexOperation]

              

              click lomas_server.administration.dex.dex_admin.InvalidDexOperation href "" "lomas_server.administration.dex.dex_admin.InvalidDexOperation"
            

Groups all exceptions for trying to perform invalid Dex rpcs (e.g. adding a user that already exists).

Parameters: error_message (str): initial error message

Source code in server/lomas_server/administration/dex/dex_admin.py
28
29
30
31
32
33
34
def __init__(self, error_message: str) -> None:
    """Initialize the instance with its error message.

    Args:
        error_message (str): initial error message
    """
    self.error_message = error_message

DexRPCError #

DexRPCError(error_message: str)

              flowchart TD
              lomas_server.administration.dex.dex_admin.DexRPCError[DexRPCError]

              

              click lomas_server.administration.dex.dex_admin.DexRPCError href "" "lomas_server.administration.dex.dex_admin.DexRPCError"
            

Groups all Dex rpc errors.

Parameters: error_message (str): initial error message

Source code in server/lomas_server/administration/dex/dex_admin.py
40
41
42
43
44
45
46
def __init__(self, error_message: str) -> None:
    """Initialize the instance with its error message.

    Args:
        error_message (str): initial error message
    """
    self.error_message = error_message

get_grpc_channel #

get_grpc_channel(dex_config: DexAdminConfig) -> Channel

Returns a valid grpc channel to use as context.

Note: does not support mTLS yet.

Parameters:

Returns:

  • Channel

    grpc.Channel: A valid grpc channel.

Source code in server/lomas_server/administration/dex/dex_admin.py
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_grpc_channel(dex_config: DexAdminConfig) -> grpc.Channel:
    """Returns a valid grpc channel to use as context.

    Note: does not support mTLS yet.

    Args:
        dex_config (DexAdminConfig): The Dex config

    Raises:
        AssertionError: If ``dex_config.use_mtls`` is anything other than False.

    Returns:
        grpc.Channel: A valid grpc channel.
    """
    # Explicit check instead of an ``assert`` statement: asserts are removed
    # when Python runs with -O, which would silently open an insecure channel
    # for a configuration that requested mTLS. The exception type is kept.
    if dex_config.use_mtls is not False:
        raise AssertionError("mTLS is not supported for the Dex gRPC channel yet.")
    return grpc.insecure_channel(f"{dex_config.url.host}:{dex_config.url.port}")

hash_pwd #

hash_pwd(password: str) -> bytes

Hashes the password string.

Parameters:

  • password #

    (str) –

    The password string to hash.

Returns:

  • bytes ( bytes ) –

    The password hash.

Source code in server/lomas_server/administration/dex/dex_admin.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
def hash_pwd(password: str) -> bytes:
    """Hashes the password string with bcrypt and a freshly generated salt.

    Args:
        password (str): The password string to hash.

    Returns:
        bytes: The password hash.
    """
    # Local names deliberately avoid shadowing the ``bytes`` and ``hash`` builtins.
    encoded_password = password.encode("utf-8")
    return bcrypt.hashpw(encoded_password, bcrypt.gensalt())

to_log #

Util function to sanitize user provided strings before logging.

Parameters:

  • user_provided_str #

    (str) –

    User provided string to sanitize

Returns:

  • str ( str ) –

    The sanitized string.

Source code in server/lomas_server/administration/dex/dex_admin.py
80
81
82
83
84
85
86
87
88
89
def to_log(user_provided_str: str) -> str:
    """Util function to sanitize user provided strings before logging.

    Removes every carriage return and line feed so that a user-provided
    value cannot start a new (forged) line in the log output.

    Args:
        user_provided_str (str): User provided string to sanitize

    Returns:
        str: The sanitized string.
    """
    # Strip "\r" as well as "\n": a lone carriage return is also treated as a
    # line break by many log viewers and terminals.
    return user_provided_str.replace("\r", "").replace("\n", "")

add_dex_user #

Adds a new user to dex.

Parameters:

  • dex_config #

    (DexAdminConfig) –

    The DexAdminConfig

  • user_name #

    (str) –

    The user name

  • user_email #

    (str) –

    The user email

  • user_password #

    (str) –

    The user password

Raises:

Source code in server/lomas_server/administration/dex/dex_admin.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
@impure_safe
def add_dex_user(dex_config: DexAdminConfig, user_name: str, user_email: str, user_password: str) -> str:
    """Creates a password entry for a new user in Dex.

    Args:
        dex_config (DexAdminConfig): The DexAdminConfig
        user_name (str): The user name
        user_email (str): The user email
        user_password (str): The user password

    Returns:
        str: The name of the user that was added.

    Raises:
        InvalidDexOperation: If the user already exists.
        e: grpc.RpcError
    """
    with get_grpc_channel(dex_config) as channel:
        dex_stub = DexStub(channel)

        # The entry stores the password hash and a freshly generated user id.
        password_entry = Password(
            email=user_email,
            hash=hash_pwd(user_password),
            username=user_name,
            user_id=str(uuid.uuid4()),
        )
        response = dex_stub.CreatePassword(CreatePasswordReq(password=password_entry))

        if not response.already_exists:
            logger.debug(f"Added {to_log(user_name)} user.")
            return user_name

        msg = f"Failed to add user {to_log(user_name)} with email {to_log(user_email)}. User or email already exists."
        logger.error(msg)
        raise InvalidDexOperation(msg)

del_dex_user #

Removes the dex user.

Parameters:

  • dex_config #

    (DexAdminConfig) –

    The DexAdminConfig

  • user_name #

    (str) –

    The name of the user to remove.

Raises:

Source code in server/lomas_server/administration/dex/dex_admin.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
@impure_safe
def del_dex_user(dex_config: DexAdminConfig, user_name: str) -> bool:
    """Removes the dex user.

    Lists the Dex password entries, and deletes (by email) the first entry
    whose username equals ``user_name``.

    Args:
        dex_config (DexAdminConfig): The DexAdminConfig
        user_name (str): The name of the user to remove.

    Returns:
        bool: True unless Dex reports the entry as not found on deletion.

    Raises:
        InvalidDexOperation: If the user does not exist.
        e: grpc.RpcError
    """
    with get_grpc_channel(dex_config) as channel:
        stub = DexStub(channel)

        dex_users = stub.ListPasswords(ListPasswordReq()).passwords

        for user in dex_users:
            if user.username == user_name:
                res = stub.DeletePassword(DeletePasswordReq(email=user.email))
                # Sanitize the user-provided name before logging, as done
                # when adding a user.
                logger.debug(f"Deleted dex user {to_log(user_name)}")
                return not res.not_found

        logger.error(f"User {to_log(user_name)} does not exist.")
        raise InvalidDexOperation(f"Cannot delete. User does not exist {user_name}")

del_all_dex_users #

del_all_dex_users(dex_config: DexAdminConfig) -> None

Removes all dex users.

Parameters:

Raises:

  • RpcError

    If any of the calls to dex fails

Source code in server/lomas_server/administration/dex/dex_admin.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
@impure_safe
def del_all_dex_users(dex_config: DexAdminConfig) -> None:
    """Deletes every password entry currently listed by Dex.

    Args:
        dex_config (DexAdminConfig): The DexAdminConfig

    Raises:
        grpc.RpcError: If any of the calls to dex fails
    """
    with get_grpc_channel(dex_config) as grpc_channel:
        dex_stub = DexStub(grpc_channel)
        listing = dex_stub.ListPasswords(ListPasswordReq())

        # Entries are deleted by email, one request per entry.
        for entry in listing.passwords:
            delete_request = DeletePasswordReq(email=entry.email)
            dex_stub.DeletePassword(delete_request)
            logger.debug(f"Deleted dex user {entry.username}")

    logger.debug("Removed all dex users.")

add_dex_users #

Adds new lomas users to Dex.

Iterates over user_list and creates password entries in Dex for each user. If clean is True all existing dex users are removed first. If overwrite is True (and not clean) any existing dex users with the same username are removed before creating the new entry.

Parameters:

  • dex_config #

    (DexAdminConfig) –

    A DexAdminConfig

  • user_list #

    (UserCollection) –

    Collection to load the users from

  • clean #

    (bool) –

    Whether to remove existing users and start with a clean state.

  • overwrite #

    (bool) –

    Whether to overwrite existing users.

Raises:

  • InvalidDexOperation

    If a user does not have a password or the user already exists and should not be overwritten.

  • RpcError

    If any of the calls to dex fails

Source code in server/lomas_server/administration/dex/dex_admin.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
@impure_safe
def add_dex_users(
    dex_config: DexAdminConfig,
    user_list: UserCollection,
    clean: bool,
    overwrite: bool,
) -> list[str]:
    """Adds new lomas users to Dex.

    Iterates over `user_list` and creates password entries in Dex for each user.
    If `clean` is True all existing dex users are removed first. If `overwrite`
    is True (and not `clean`) any existing dex users with the same username are
    removed before creating the new entry.

    Args:
        dex_config (DexAdminConfig): A DexAdminConfig
        user_list (UserCollection): Collection to load the users from
        clean (bool): Whether to remove existing users and start with a clean state.
        overwrite(bool): Whether to overwrite existing users.

    Returns:
        The value of `Fold.collect` applied to the per-user results of
        `add_dex_user`.
        NOTE(review): the `list[str]` annotation does not obviously match a
        `Fold.collect` result - confirm against the `returns` library.

    Raises:
        InvalidDexOperation: If a user does not have a password or the user already exists and should not be overwritten.
        grpc.RpcError: If any of the calls to dex fails
    """
    # Remove all existing users if requested
    # NOTE(review): the return value of del_all_dex_users is discarded here. It is
    # decorated with @impure_safe, so a failure may be reported through that return
    # value rather than raised - confirm that ignoring it is intended.
    if clean:
        del_all_dex_users(dex_config)

    with get_grpc_channel(dex_config) as channel:
        stub = DexStub(channel)
        # Snapshot of the dex entries, taken once before the loop and not refreshed.
        dex_users = stub.ListPasswords(ListPasswordReq()).passwords

        # One result per user, in the order of user_list.users.
        added_user_result: list[IOResultE] = []
        for user in user_list.users:
            # Remove user with same name if requested (but not already cleaned)
            if overwrite and not clean:
                for dex_user in dex_users:
                    if dex_user.username == user.id.name:
                        stub.DeletePassword(DeletePasswordReq(email=dex_user.email))
                        logger.debug(f"Removed existing dex user {user.id.name} due to overwrite flag")

            # Create the dex user
            # TODO replace client_secret with password
            # The client_secret is passed to add_dex_user as the user's password below.
            if user.id.client_secret is None:
                logger.error(f"Cannot add Dex user {user.id.name} without password")
                raise InvalidDexOperation(f"Cannot add Dex user {user.id.name} without password")

            added_user = add_dex_user(dex_config, user.id.name, user.id.email, user.id.client_secret)
            added_user_result.append(added_user)
        logger.debug("Added dex users from user collection.")
        return Fold.collect(added_user_result, IOSuccess(()))  # list[IOResultE] -> IOResultE[list]

add_dex_users_via_yaml #

add_dex_users_via_yaml(
    dex_config: DexAdminConfig, yaml_file: Path, clean: bool, overwrite: bool
) -> IOResultE[list[str]]

Adds new lomas users to Dex from a YAML file.

Parameters:

  • dex_config #

    (DexAdminConfig) –

    A DexAdminConfig

  • yaml_file #

    (Path | BytesIO) –

    File name to load the users from

  • clean #

    (bool) –

    Whether to remove existing users and start with a clean state.

  • overwrite #

    (bool) –

    Whether to overwrite existing users.

Source code in server/lomas_server/administration/dex/dex_admin.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def add_dex_users_via_yaml(
    dex_config: DexAdminConfig,
    yaml_file: Path,
    clean: bool,
    overwrite: bool,
) -> IOResultE[list[str]]:
    """Adds new lomas users to Dex from a YAML file.

    The file is parsed with `yaml.safe_load`, its content is used to build a
    `UserCollection`, and the collection is handed to `add_dex_users`.

    Args:
        dex_config (DexAdminConfig): A DexAdminConfig
        yaml_file (Path): Path of the YAML file to load the users from
        clean (bool): Whether to remove existing users and start with a clean state.
        overwrite(bool): Whether to overwrite existing users.

    Returns:
        IOResultE[list[str]]: Whatever `add_dex_users` returns for the loaded users.
    """
    # Open through a context manager so the file handle is closed once parsed.
    with yaml_file.resolve().open(encoding="utf-8") as stream:
        user_data = yaml.safe_load(stream)
    user_list = UserCollection(**user_data)
    return add_dex_users(dex_config, user_list, clean, overwrite)

set_dex_user_password #

set_dex_user_password(
    dex_config: DexAdminConfig, user_name: str, new_password: str
) -> bool

Sets the new user password to the Dex user.

Parameters:

  • dex_config #

    (DexAdminConfig) –

    The dex admin config.

  • user_name #

    (str) –

    The user name for which to change the password.

  • new_password #

    (str) –

    The new password to set.

Raises:

  • InvalidDexOperation

    If the user does not exist.

  • RpcError

    If any of the calls to dex fails

Source code in server/lomas_server/administration/dex/dex_admin.py
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
@impure_safe
def set_dex_user_password(dex_config: DexAdminConfig, user_name: str, new_password: str) -> bool:
    """Replaces the password hash of an existing Dex user.

    Args:
        dex_config (DexAdminConfig): The dex admin config.
        user_name (str): The user name for which to change the password.
        new_password (str): The new password to set.

    Returns:
        bool: False if Dex reports the entry as not found during the update, True otherwise.

    Raises:
        InvalidDexOperation: If the user does not exist.
        grpc.RpcError: If any of the calls to dex fails
    """
    with get_grpc_channel(dex_config) as grpc_channel:
        dex_stub = DexStub(grpc_channel)
        known_entries = dex_stub.ListPasswords(ListPasswordReq()).passwords

        # The update request is keyed by email, so find the first entry with this username.
        target = next((entry for entry in known_entries if entry.username == user_name), None)
        if target is None:
            raise InvalidDexOperation(
                f"Failed to update user password for {to_log(user_name)}. User does not exist."
            )

        update_request = UpdatePasswordReq(
            email=target.email, new_hash=hash_pwd(new_password), new_username=user_name
        )
        response = dex_stub.UpdatePassword(update_request)
        logger.debug(f"Updated password for user {to_log(user_name)}.")
        return not response.not_found

Classes:

Functions:

DemoAdminConfig #


              flowchart TD
              lomas_server.administration.scripts.lomas_demo_setup.DemoAdminConfig[DemoAdminConfig]
              lomas_server.models.config.AdminConfig[AdminConfig]

                              lomas_server.models.config.AdminConfig --> lomas_server.administration.scripts.lomas_demo_setup.DemoAdminConfig
                


              click lomas_server.administration.scripts.lomas_demo_setup.DemoAdminConfig href "" "lomas_server.administration.scripts.lomas_demo_setup.DemoAdminConfig"
              click lomas_server.models.config.AdminConfig href "" "lomas_server.models.config.AdminConfig"
            

Extension of Admin config for demo setup.

add_lomas_demo_data #

add_lomas_demo_data(config: DemoAdminConfig) -> IOResultE

Adds the demo data to the admin database and, if a Dex config is provided, the demo users to Dex.

Meant to be used in the develop mode of the service or for testing

Parameters:

Source code in server/lomas_server/administration/scripts/lomas_demo_setup.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def add_lomas_demo_data(config: DemoAdminConfig) -> IOResultE:
    """
    Adds the demo data to the admin database and, if a Dex config is set, the demo users to Dex.

    Meant to be used in the develop mode of the service or for testing.

    Four steps are run in order: upload the users file, add the same users to
    Dex (skipped when `config.dex_config` is None), upload the datasets file,
    and delete the archives collection. Their results are combined with
    `Fold.collect`.

    Args:
        config (DemoAdminConfig): The administration config.

    Returns:
        IOResultE: The combined result of the four steps.
    """
    pprint("Creating user collection from Config")
    # NOTE(review): this prints the whole config, which may include `config.bootstrap`
    # (used as a bearer token below) - confirm that is acceptable for demo setups.
    pprint(config)

    # Context managers close the upload handles once the request has been issued.
    # NOTE(review): assumes query_lomas performs the request before returning - confirm.
    with config.user_yaml.open(mode="rb") as user_file:
        add_users: IOResultE = query_lomas(
            "/usersfile",
            httpx.post,
            json={"clean": True},
            files={"file": user_file},
            headers={"Authorization": f"Bearer {config.bootstrap}"},
        )

    add_dex_users: IOResultE = flow(
        config.dex_config,  # DexAdminConfig | None
        Maybe.from_optional,  # Maybe[DexAdminConfig]
        map_(  # Maybe[IOResultE]
            partial(  # DexAdminConfig -> IOResultE
                add_dex_users_via_yaml, yaml_file=config.user_yaml, clean=False, overwrite=True
            )
        ),
    ).value_or(IOSuccess("No Dex config"))

    pprint("Creating datasets and metadata collection")
    with config.dataset_yaml.open(mode="rb") as dataset_file:
        add_datasets: IOResultE = query_lomas(
            "/dataset/bulk",
            httpx.post,
            json={"clean": True},
            files={"file": dataset_file},
            headers={"Authorization": f"Bearer {config.bootstrap}"},
        )

    pprint("Empty archives")
    delete_archives: IOResultE = query_lomas(
        f"/collections/{TK.ARCHIVE}",
        httpx.delete,
        headers={"Authorization": f"Bearer {config.bootstrap}"},
    )

    return Fold.collect([add_users, add_dex_users, add_datasets, delete_archives], IOSuccess(()))

lomas_demo_setup #

lomas_demo_setup() -> int

Script for setting up demo users and dataset.

Returns:

  • int ( int ) –

    the return code used by sys.exit (0 for success, 1 or other for failure)

Source code in server/lomas_server/administration/scripts/lomas_demo_setup.py
87
88
89
90
91
92
93
94
95
96
97
98
99
def lomas_demo_setup() -> int:
    """Script for setting up demo users and dataset.

    Returns:
        int: the return code used by sys.exit (0 for success 1 or other for failure)
    """
    demo_config = DemoAdminConfig()
    match add_lomas_demo_data(demo_config):
        case IOSuccess(_):
            return Status.EX_OK
        case IOFailure(Failure(e)):
            return e
    return Status.EX_IOERR

Functions:

  • lifespan

    Lifespan function for the server.

lifespan async #

lifespan(lomas_app: FastAPI) -> AsyncGenerator[None]

Lifespan function for the server.

This function is executed once on server startup, yields and finishes running at server shutdown.

Server initialization is performed (config loading, etc.) and the server state is updated accordingly. This can have potential side effects on the return values of the "depends" functions, which check the server state.

Source code in server/lomas_server/app.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@asynccontextmanager
async def lifespan(lomas_app: FastAPI) -> AsyncGenerator[None]:
    """
    Lifespan function for the server.

    This function is executed once on server startup, yields and
    finishes running at server shutdown.

    Server initialization is performed (config loading, etc.) and
    the server state is updated accordingly. This can have potential
    side effects on the return values of the "depends"
    functions, which check the server state.

    Args:
        lomas_app (FastAPI): The application whose `state` is populated.
    """
    # Load Config
    config = Config()

    # Set some app state
    lomas_app.state.jobs = {}

    # Load admin database
    # The authenticator, bootstrap value and private db credentials are copied
    # from the config onto the app state in the same guarded block.
    try:
        logger.info("Loading admin database")
        lomas_app.state.admin_database = LocalAdminDatabase(path=config.admin_database_url)
        logger.info("Loading authenticator")
        lomas_app.state.authenticator = config.authenticator
        lomas_app.state.bootstrap = config.bootstrap
        lomas_app.state.private_db_credentials = config.private_db_credentials
    except InternalServerException as e:
        # NOTE(review): the exception is logged but not re-raised, so startup
        # continues with only the state attributes set before the failure -
        # confirm this is intended.
        logger.exception(f"Failed at startup: {e!s}")

    # Set DP Libraries config
    set_opendp_features_config(config.opendp_features)

    # The application serves requests while inside the rabbitmq context.
    async with rabbitmq_ctx(lomas_app):
        yield  # lomas_app is handling requests

Classes:

  • FreePassAuthenticator

    Authenticator that bypasses authentication.

  • OIDCAuthenticator

    Authenticator that identifies users by either validating the provided JWT token or querying the userinfo endpoint.

Functions:

  • get_user_id

    Extracts user id from bearer token.

  • authorize_user

    Raises an UnauthorizedAccessException if the user does not have the permission for the given scopes.

FreePassAuthenticator #


              flowchart TD
              lomas_server.auth.auth.FreePassAuthenticator[FreePassAuthenticator]

              

              click lomas_server.auth.auth.FreePassAuthenticator href "" "lomas_server.auth.auth.FreePassAuthenticator"
            

Authenticator that bypasses authentication.

OIDCAuthenticator #


              flowchart TD
              lomas_server.auth.auth.OIDCAuthenticator[OIDCAuthenticator]

              

              click lomas_server.auth.auth.OIDCAuthenticator href "" "lomas_server.auth.auth.OIDCAuthenticator"
            

Authenticator that identifies users by either validating the provided JWT token or querying the userinfo endpoint.

Attributes:

authentication_type instance-attribute #

authentication_type: Literal[OIDC]

The OpenId connect provider's discovery url.

oidc_discovery_url instance-attribute #

oidc_discovery_url: HttpUrl

Whether to use the access token to query userinfo endpoint.

If false, access token is parsed as jwt.

oidc_config cached property #

oidc_config: OIDCConfig

Returns the oidc provider config.

jwk_client cached property #

jwk_client: PyJWKClient

Initializes instance PyJWKClient with caching.

get_user_id #

get_user_id(
    authenticator: AuthenticatorT, security_scopes: SecurityScopes, credentials: str
) -> UserId

Extracts user id from bearer token.

Fails if user does not have scope.

Parameters:

  • authenticator #

    (AuthenticatorT) –

    A valid authenticator (FreePassAuthenticator or OIDC Authenticator)

  • security_scopes #

    (SecurityScopes) –

    The required scopes for the endpoint.

  • credentials #

    (str) –

    Authorization credentials.

Returns:

  • UserId ( UserId ) –

    The UserId object containing user infos.

Source code in server/lomas_server/auth/auth.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def get_user_id(
    authenticator: AuthenticatorT,
    security_scopes: SecurityScopes,
    credentials: str,
) -> UserId:
    """Extracts user id from bearer token.

    Fails if user does not have scope.

    Args:
        authenticator (AuthenticatorT): A valid authenticator (FreePassAuthenticator or OIDC Authenticator)
        security_scopes (SecurityScopes): The required scopes for the endpoint.
        credentials (str): Authorization credentials.

    Returns:
        UserId: The UserId object containing user infos.
    """
    match authenticator:
        case FreePassAuthenticator():
            try:
                user = UserId(name=credentials, email="free@pass.com")
            except Exception as e:
                raise UnauthorizedAccessException("Failed bearer token verification.") from e

        case OIDCAuthenticator():
            try:
                # Get userfinfo from userinfo endpoint or jwt token
                if authenticator.query_userinfo:
                    response = requests.get(
                        url=str(authenticator.oidc_config.userinfo_endpoint),
                        headers={"Authorization": f"Bearer {credentials}"},
                    )
                    response.raise_for_status()
                    userinfo = response.json()

                else:
                    # Extracts kid from JWT and fetches corresponding key from keycloak (or cache).
                    key = authenticator.jwk_client.get_signing_key_from_jwt(credentials)
                    # Decodes and validates JWT
                    # Note: audience is set to lomas client because it receives the token from IdP. Not all IdP support multi-audience.
                    userinfo = jwt.decode(credentials, key=key, audience=OIDC_LOMAS_CLIENT__CLIENT_ID)

                user = UserId(
                    name=userinfo[
                        OIDCClaims.USER_NAME
                    ],  # TODO make pydantic model or parametrize claim name?
                    email=userinfo[OIDCClaims.USER_EMAIL],
                )

            except UnauthorizedAccessException as e:
                raise e
            except Exception as e:
                # TODO problematic to add e into error message to client?
                raise UnauthorizedAccessException("Failed bearer token verification.") from e

    return user

authorize_user #

authorize_user(
    user: UserId, admin_database: AdminDatabase, security_scopes: SecurityScopes
) -> None

Raises an UnauthorizedAccessException if the user does not have the permission for the given scopes.

Also raises an exception if an unknown scope is required.

Parameters:

  • user #

    (UserId) –

    The user id object

  • admin_database #

    (AdminDatabase) –

    The admin database to get user permissions from.

  • security_scopes #

    (SecurityScopes) –

    The required scopes.

Source code in server/lomas_server/auth/auth.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def authorize_user(user: UserId, admin_database: AdminDatabase, security_scopes: SecurityScopes) -> None:
    """Raises an UnauthorizedAccessException if the user does not have the permission for the given scopes.

    Scopes are checked in order. Only the admin scope is known; any other
    scope raises an InternalServerException.

    Args:
        user (UserId): The user id object
        admin_database (AdminDatabase): The admin database to get user permissions from.
        security_scopes (SecurityScopes): The required scopes.
    """
    for required_scope in security_scopes.scopes:
        if required_scope == Scopes.ADMIN:
            is_admin = admin_database.is_user_admin(user.name)
            if not is_admin:
                raise UnauthorizedAccessException("Only admin users can query this endpoint.")
        else:
            # Raise server exception if scope is unknown
            raise InternalServerException(f"Unknown security scope {required_scope}, cannot authorize query.")

Classes:

OIDCClaims #


              flowchart TD
              lomas_server.constants.OIDCClaims[OIDCClaims]

              

              click lomas_server.constants.OIDCClaims href "" "lomas_server.constants.OIDCClaims"
            

OIDC claim names, also used as claim names in JWT token.

SSynthTableTransStyle #


              flowchart TD
              lomas_server.constants.SSynthTableTransStyle[SSynthTableTransStyle]

              

              click lomas_server.constants.SSynthTableTransStyle href "" "lomas_server.constants.SSynthTableTransStyle"
            

Transformer style for smartnoise synth.

SSynthColumnType #


              flowchart TD
              lomas_server.constants.SSynthColumnType[SSynthColumnType]

              

              click lomas_server.constants.SSynthColumnType href "" "lomas_server.constants.SSynthColumnType"
            

Type of columns for SmartnoiseSynth transformer pre-processing.

OpenDPMeasurement #


              flowchart TD
              lomas_server.constants.OpenDPMeasurement[OpenDPMeasurement]

              

              click lomas_server.constants.OpenDPMeasurement href "" "lomas_server.constants.OpenDPMeasurement"
            

Type of divergence for opendp measurement.

see https://docs.opendp.org/en/stable/api/python/opendp.measurements.html

Classes:

DataConnector #


              flowchart TD
              lomas_server.data_connector.data_connector.DataConnector[DataConnector]

              

              click lomas_server.data_connector.data_connector.DataConnector href "" "lomas_server.data_connector.data_connector.DataConnector"
            

Overall access to sensitive data.

Methods:

get_pandas_df abstractmethod #

get_pandas_df() -> DataFrame

Get the data in pandas dataframe format.

Returns:

  • DataFrame

    pd.DataFrame: The pandas dataframe for this dataset.

Source code in server/lomas_server/data_connector/data_connector.py
45
46
47
48
49
50
51
@abstractmethod
def get_pandas_df(self) -> pd.DataFrame:
    """Get the data in pandas dataframe format.

    Declared with ``@abstractmethod`` and has no implementation here.

    Returns:
        pd.DataFrame: The pandas dataframe for this dataset.
    """

get_polars_lf #

get_polars_lf() -> LazyFrame

Get the data in polars lazyframe format.

Returns:

  • LazyFrame

    pl.LazyFrame: The polars lazyframe for this dataset.

Source code in server/lomas_server/data_connector/data_connector.py
53
54
55
56
57
58
59
def get_polars_lf(self) -> pl.LazyFrame:
    """Return the dataset as a polars LazyFrame.

    The data is taken from `get_pandas_df`, converted with `pl.from_pandas`
    and then made lazy.

    Returns:
        pl.LazyFrame: The polars lazyframe for this dataset.
    """
    pandas_frame = self.get_pandas_df()
    eager_frame = pl.from_pandas(pandas_frame)
    return eager_frame.lazy()

Classes:

  • InMemoryConnector

    DataConnector for a dataset created from an in-memory pandas DataFrame.

InMemoryConnector #


              flowchart TD
              lomas_server.data_connector.in_memory_connector.InMemoryConnector[InMemoryConnector]
              lomas_server.data_connector.data_connector.DataConnector[DataConnector]

                              lomas_server.data_connector.data_connector.DataConnector --> lomas_server.data_connector.in_memory_connector.InMemoryConnector
                


              click lomas_server.data_connector.in_memory_connector.InMemoryConnector href "" "lomas_server.data_connector.in_memory_connector.InMemoryConnector"
              click lomas_server.data_connector.data_connector.DataConnector href "" "lomas_server.data_connector.data_connector.DataConnector"
            

DataConnector for a dataset created from an in-memory pandas DataFrame.

Methods:

get_pandas_df #

get_pandas_df() -> DataFrame

Get the data in pandas dataframe format.

Returns:

  • DataFrame

    pd.DataFrame: pandas dataframe of dataset (a copy)

Source code in server/lomas_server/data_connector/in_memory_connector.py
13
14
15
16
17
18
19
20
21
def get_pandas_df(self) -> pd.DataFrame:
    """Return a copy of the in-memory dataframe.

    Returns:
        pd.DataFrame: pandas dataframe of dataset (a copy)
    """
    stored_frame = self.df
    assert stored_frame is not None
    # A copy is handed out so the stored dataframe is not shared with the caller.
    duplicate = stored_frame.copy()  # pylint: disable=no-member
    return duplicate

get_polars_lf #

get_polars_lf() -> LazyFrame

Get the data in polars lazyframe format.

Returns:

  • LazyFrame

    pl.LazyFrame: The polars lazyframe for this dataset.

Source code in server/lomas_server/data_connector/data_connector.py
53
54
55
56
57
58
59
def get_polars_lf(self) -> pl.LazyFrame:
    """Return the dataset as a polars LazyFrame.

    The data is taken from `get_pandas_df`, converted with `pl.from_pandas`
    and then made lazy.

    Returns:
        pl.LazyFrame: The polars lazyframe for this dataset.
    """
    pandas_frame = self.get_pandas_df()
    eager_frame = pl.from_pandas(pandas_frame)
    return eager_frame.lazy()

Classes:

  • PathConnector

    DataConnector for dataset located at constant path.

PathConnector #


              flowchart TD
              lomas_server.data_connector.path_connector.PathConnector[PathConnector]
              lomas_server.data_connector.data_connector.DataConnector[DataConnector]

                              lomas_server.data_connector.data_connector.DataConnector --> lomas_server.data_connector.path_connector.PathConnector
                


              click lomas_server.data_connector.path_connector.PathConnector href "" "lomas_server.data_connector.path_connector.PathConnector"
              click lomas_server.data_connector.data_connector.DataConnector href "" "lomas_server.data_connector.data_connector.DataConnector"
            

DataConnector for dataset located at constant path.

Path can be local or remote (http).

Methods:

get_pandas_df #

get_pandas_df() -> DataFrame

Get the data in pandas dataframe format.

Raises:

Returns:

  • DataFrame

    pd.DataFrame: pandas dataframe of dataset

Source code in server/lomas_server/data_connector/path_connector.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def get_pandas_df(self) -> pd.DataFrame:
    """Get the data in pandas dataframe format.

    Raises:
        InternalServerException: If the file format is not supported.

    Returns:
        pd.DataFrame: pandas dataframe of dataset
    """
    supported_filetypes = [".csv"]

    if self.df is not None:
        return self.df

    match self.dataset_path:
        case Path():
            path = self.dataset_path
        case HttpUrl():
            path = Path(self.dataset_path.path)

    if path.suffix not in supported_filetypes:
        raise InvalidQueryException(
            f"File type other than {supported_filetypes} not supported for loading into pandas DataFrame."
        )
    try:
        self.df = pd.read_csv(
            str(self.dataset_path),
            dtype=self.dtypes,
            parse_dates=self.datetime_columns,
        )
        return self.df
    except Exception as err:
        raise InternalServerException(
            f"Error reading csv at http path:{self.dataset_path}: {err}",
        ) from err

get_polars_lf #

get_polars_lf() -> LazyFrame

Get the data in polars lazyframe format.

Returns:

  • LazyFrame

    pl.LazyFrame: The polars lazyframe for this dataset.

Source code in server/lomas_server/data_connector/data_connector.py
53
54
55
56
57
58
59
def get_polars_lf(self) -> pl.LazyFrame:
    """Return the dataset as a polars LazyFrame.

    The data is taken from `get_pandas_df`, converted with `pl.from_pandas`
    and then made lazy.

    Returns:
        pl.LazyFrame: The polars lazyframe for this dataset.
    """
    pandas_frame = self.get_pandas_df()
    eager_frame = pl.from_pandas(pandas_frame)
    return eager_frame.lazy()

Classes:

  • S3Connector

    DataConnector for dataset in S3 storage.

S3Connector #


              flowchart TD
              lomas_server.data_connector.s3_connector.S3Connector[S3Connector]
              lomas_server.data_connector.data_connector.DataConnector[DataConnector]

                              lomas_server.data_connector.data_connector.DataConnector --> lomas_server.data_connector.s3_connector.S3Connector
                


              click lomas_server.data_connector.s3_connector.S3Connector href "" "lomas_server.data_connector.s3_connector.S3Connector"
              click lomas_server.data_connector.data_connector.DataConnector href "" "lomas_server.data_connector.data_connector.DataConnector"
            

DataConnector for dataset in S3 storage.

Methods:

get_pandas_df #

get_pandas_df() -> DataFrame

Get the data in pandas dataframe format.

Raises:

Returns:

  • DataFrame

    pd.DataFrame: pandas dataframe of dataset

Source code in server/lomas_server/data_connector/s3_connector.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def get_pandas_df(self) -> pd.DataFrame:
    """Get the data in pandas dataframe format.

    The object is fetched and parsed once and stored on `self.df`; later calls
    return the stored dataframe.

    Raises:
        InternalServerException: If the dataset cannot be fetched or read.

    Returns:
        pd.DataFrame: pandas dataframe of dataset
    """
    if self.df is not None:
        return self.df

    try:
        # The fetch is inside the try so that S3 access errors are reported as
        # InternalServerException too, as the docstring promises.
        obj = self._client.get_object(Bucket=self.bucket, Key=self.key)
        self.df = pd.read_csv(obj["Body"], dtype=self.dtypes, parse_dates=self.datetime_columns)
    except Exception as err:
        raise InternalServerException(
            f"Error reading csv at s3 path:{self.bucket}/{self.key}: {err}"
        ) from err
    return self.df

get_polars_lf #

get_polars_lf() -> LazyFrame

Get the data in polars lazyframe format.

Returns:

  • LazyFrame

    pl.LazyFrame: The polars lazyframe for this dataset.

Source code in server/lomas_server/data_connector/data_connector.py
53
54
55
56
57
58
59
def get_polars_lf(self) -> pl.LazyFrame:
    """Return the dataset as a polars LazyFrame.

    The data is taken from `get_pandas_df`, converted with `pl.from_pandas`
    and then made lazy.

    Returns:
        pl.LazyFrame: The polars lazyframe for this dataset.
    """
    pandas_frame = self.get_pandas_df()
    eager_frame = pl.from_pandas(pandas_frame)
    return eager_frame.lazy()

Classes:

  • DiffPrivLibQuerier

    Concrete implementation of the DPQuerier ABC for the DiffPrivLib library.

Functions:

DiffPrivLibQuerier #

DiffPrivLibQuerier(data_connector: DataConnector, admin_database: Proxy)

              flowchart TD
              lomas_server.dp_queries.dp_libraries.diffprivlib.DiffPrivLibQuerier[DiffPrivLibQuerier]
              lomas_server.dp_queries.dp_querier.DPQuerier[DPQuerier]

                              lomas_server.dp_queries.dp_querier.DPQuerier --> lomas_server.dp_queries.dp_libraries.diffprivlib.DiffPrivLibQuerier
                


              click lomas_server.dp_queries.dp_libraries.diffprivlib.DiffPrivLibQuerier href "" "lomas_server.dp_queries.dp_libraries.diffprivlib.DiffPrivLibQuerier"
              click lomas_server.dp_queries.dp_querier.DPQuerier href "" "lomas_server.dp_queries.dp_querier.DPQuerier"
            

Concrete implementation of the DPQuerier ABC for the DiffPrivLib library.

Methods:

  • complete_pipeline

    Finalize the DiffPrivLib pipeline by injecting accountant and privacy constraints.

  • fit_model_on_data

    Fit the DiffPrivLib pipeline on the dataset provided by the data connector.

  • cost

    Estimate the privacy budget cost of running a DiffPrivLib query.

  • query

    Run the query on the fitted DiffPrivLib pipeline and return the results.

  • handle_query

    Handle DP query.

Source code in server/lomas_server/dp_queries/dp_libraries/diffprivlib.py
34
35
36
37
38
39
40
41
42
43
def __init__(
    self,
    data_connector: DataConnector,
    admin_database: Proxy,
) -> None:
    """Initialise the querier with a new budget accountant and no pipeline.

    Args:
        data_connector (DataConnector): Passed on to the parent initialiser.
        admin_database (Proxy): Passed on to the parent initialiser.
    """
    super().__init__(data_connector, admin_database)
    # Budget accountant created for this querier instance.
    self.accountant = BudgetAccountant()
    # Pipeline and test data start out unset.
    self.dpl_pipeline: Pipeline | None = None
    self.x_test: pd.DataFrame | None = None
    self.y_test: pd.DataFrame | None = None

complete_pipeline #

complete_pipeline(feature_columns: list[str], target_columns: list[str] | None) -> None

Finalize the DiffPrivLib pipeline by injecting accountant and privacy constraints.

Steps
  1. Attach the shared budget accountant to all compatible steps.
  2. Add metadata-driven privacy constraints (data_norm, bounds, bounds_X, bounds_y) to the first pipeline step when supported.

Parameters:

  • feature_columns #

    (list[str]) –

    List of feature columns used for training.

  • target_columns #

    (list[str] | None) –

    Optional list of target columns (required if bounds_y is needed).

Raises:

Source code in server/lomas_server/dp_queries/dp_libraries/diffprivlib.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def complete_pipeline(self, feature_columns: list[str], target_columns: list[str] | None) -> None:
    """
    Finalize the DiffPrivLib pipeline by injecting accountant and privacy constraints.

    Steps:
        1. Attach the shared budget accountant to all compatible steps.
        2. Add metadata-driven privacy constraints (`data_norm`, `bounds`, `bounds_X`, `bounds_y`) to the first pipeline step when supported.

    Args:
        feature_columns: List of feature columns used for training.
        target_columns: Optional list of target columns (required if `bounds_y` is needed).

    Raises:
        InternalServerException: If pipeline is not initialized.
        InvalidQueryException: If target bounds are required but not provided.
    """
    if self.dpl_pipeline is None:
        raise InternalServerException("Pipeline must be initialized before calling complete_pipeline")

    # 1. Add budget accountant
    for _, step in self.dpl_pipeline.steps:
        if hasattr(step, "accountant"):
            step.accountant = self.accountant

    # 2. Get metadata for features
    metadata = self.data_connector.metadata
    feature_metadata = [col for col in metadata.columns if col.name in feature_columns]

    first_step = self.dpl_pipeline.steps[0][1]

    # --- Handle feature constraints ---
    feature_bounds: tuple[list[float], list[float]] | None = None
    if hasattr(first_step, "data_norm"):

        def contribution(meta: ColumnMetadata) -> int:
            return 1 if meta.datatype == "boolean" else meta.maximum**2

        first_step.data_norm = np.sqrt(sum(contribution(meta) for meta in feature_metadata))

    if hasattr(first_step, "bounds") or hasattr(first_step, "bounds_X"):
        feature_bounds = get_dpl_bounds(feature_metadata)

    if hasattr(first_step, "bounds"):
        first_step.bounds = feature_bounds
    if hasattr(first_step, "bounds_X"):
        first_step.bounds_X = feature_bounds

    # --- Handle target constraints ---
    if hasattr(first_step, "bounds_y"):
        if not target_columns:
            raise InvalidQueryException("target_columns must be provided when bounds_y is required")
        target_metadata = [col for col in metadata.columns if col.name in target_columns]
        first_step.bounds_y = get_dpl_bounds(target_metadata)

fit_model_on_data #

fit_model_on_data(query_json: DiffPrivLibRequestModel) -> None

Fit the DiffPrivLib pipeline on the dataset provided by the data connector.

Steps
  1. Validate inputs (no overlap between feature and target columns).
  2. Select and preprocess relevant columns (handle missing data).
  3. Split data into training and test sets.
  4. Deserialize the pipeline and inject server parameters.
  5. Fit the pipeline while treating PrivacyLeakWarning as an error.

Parameters:

  • query_json #

    (DiffPrivLibRequestModel) –

    Request object describing feature/target columns, pipeline definition, and preprocessing options.

Raises:

Source code in server/lomas_server/dp_queries/dp_libraries/diffprivlib.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def fit_model_on_data(self, query_json: DiffPrivLibRequestModel) -> None:
    """
    Train the DiffPrivLib pipeline described by the request on the connector's data.

    Steps:
        1. Reject requests where a column is both a feature and a target.
        2. Keep only the requested columns and handle missing data.
        3. Split into train/test sets (the test sets are stored on the instance).
        4. Deserialize the pipeline and complete it with server-side parameters.
        5. Fit, with PrivacyLeakWarning turned into an error.

    Args:
        query_json: Request object describing feature/target columns,
                    pipeline definition, and preprocessing options.

    Raises:
        InvalidQueryException: If feature/target columns overlap.
        ExternalLibraryException: If DiffPrivLib fitting fails.
    """
    # 1. A column may not play both roles
    features = query_json.feature_columns.copy()
    targets = query_json.target_columns or []

    shared = set(features) & set(targets)
    if shared:
        raise InvalidQueryException(f"Columns cannot be both feature and target: {', '.join(shared)}")

    # 2. Restrict to the requested columns, then handle missing data
    full_df = self.data_connector.get_pandas_df()
    selected = handle_missing_data(full_df[features + targets], query_json.imputer_strategy)

    # 3. Train/test split; test parts are kept for the later `query` call
    x_train, self.x_test, y_train, self.y_test = split_train_test_data(selected, query_json)

    # 4. Rebuild the pipeline and inject accountant / bounds
    self.dpl_pipeline = deserialise_pipeline(query_json.diffprivlib_json)
    self.complete_pipeline(features, query_json.target_columns)

    # 5. PrivacyLeakWarning must abort the fit instead of only being reported
    warnings.simplefilter("error", PrivacyLeakWarning)
    try:
        if y_train is not None:
            y_train = y_train.to_numpy().ravel()
        self.dpl_pipeline = self.dpl_pipeline.fit(x_train, y_train)
    except PrivacyLeakWarning as leak:
        message = (
            f"PrivacyLeakWarning: {leak} "
            "Lomas server cannot fit pipeline on data, PrivacyLeakWarning is a blocker."
        )
        raise ExternalLibraryException(DPLibraries.DIFFPRIVLIB, message) from leak
    except Exception as err:
        raise ExternalLibraryException(
            DPLibraries.DIFFPRIVLIB,
            f"Cannot fit pipeline on data because {err}",
        ) from err

cost #

Estimate the privacy budget cost of running a DiffPrivLib query.

Steps
  1. Fit the model on the dataset (including accountant injection).
  2. Retrieve the total budget consumed from the accountant.

Parameters:

  • query_json #

    (DiffPrivLibRequestModel) –

    The request object describing the query (features, targets, pipeline JSON).

Raises:

Returns:

Source code in server/lomas_server/dp_queries/dp_libraries/diffprivlib.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
def cost(self, query_json: DiffPrivLibRequestModel) -> tuple[float, float]:
    """
    Compute the privacy budget spent by a DiffPrivLib query.

    The model is fitted first (which attaches the accountant to the pipeline),
    then the accountant's running total is read back.

    Args:
        query_json: The request object describing the query (features, targets, pipeline JSON).

    Raises:
        ExternalLibraryException: If the pipeline fitting fails.

    Returns:
        A tuple of (epsilon, delta) costs.
    """
    # Fitting is what makes the accountant register any spend.
    self.fit_model_on_data(query_json)

    spent_epsilon, spent_delta = self.accountant.total()
    return spent_epsilon, spent_delta

query #

Run the query on the fitted DiffPrivLib pipeline and return the results.

Parameters:

Raises:

Returns:

  • DiffPrivLibQueryResult

    A DiffPrivLibQueryResult containing `score` (the model accuracy on the test set) and `model` (the trained DiffPrivLib pipeline).

Source code in server/lomas_server/dp_queries/dp_libraries/diffprivlib.py
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
def query(
    self,
    query_json: DiffPrivLibQueryModel,
) -> DiffPrivLibQueryResult:
    """
    Score the fitted DiffPrivLib pipeline on the held-out test set and return it.

    Args:
        query_json: The request object describing the query parameters
            (not read here; the pipeline was already fitted from it).

    Raises:
        InternalServerException: If `query` is called before `cost` (pipeline not initialized).

    Returns:
        DiffPrivLibQueryResult containing:
            - score: Value returned by the pipeline's `score` on the test set.
            - model: The trained DiffPrivLib pipeline.
    """
    fitted = self.dpl_pipeline
    if fitted is None:
        raise InternalServerException("DiffPrivLib `query` method called before `cost` method")

    # Exceptions raised by `score` are not caught here.
    test_score = fitted.score(self.x_test, self.y_test)

    return DiffPrivLibQueryResult(score=test_score, model=fitted)

handle_query async #

Handle DP query.

Parameters:

  • query_json #

    (QueryModel) –

    The input object of the query.

  • user_name #

    (str) –

    User name.

Raises:

Returns:

  • QueryResponse ( QueryResponse ) –

    The response object, with the following fields: requested_by (str): the user name; result: the query result; epsilon (float): the amount of epsilon budget spent for the query; delta (float): the amount of delta budget spent for the query.

Source code in server/lomas_server/dp_queries/dp_querier.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def handle_query(
    self,
    query_json: QueryModel,
    user_name: str,
) -> QueryResponse:
    """
    Handle a DP query for one user, from cost estimation to archiving.

    The user is flagged as "not allowed to query" for the duration of the call
    so that they cannot run two queries at once. The flag is restored in a
    `finally` clause, so it is reset on success, on any exception and also when
    the coroutine is cancelled (`asyncio.CancelledError` is a `BaseException`
    and would bypass an `except Exception` handler, leaving the user locked out).

    Steps:
        1. Compute the query cost with `self.cost`.
        2. Compare it with the user's remaining budget on the dataset.
        3. Run the query with `self.query`.
        4. Deduct the budget, build the response and archive the query.

    Args:
        query_json (QueryModel): The input object of the query.
        user_name (str): User name.

    Raises:
        UnauthorizedAccessException: A query is already ongoing for this user
            (may also be propagated from the admin database calls).
        InvalidQueryException: If the remaining budget is lower than the cost.
        InternalServerException: If `self.query` raises an exception that is
            not one of `KNOWN_EXCEPTIONS`.

    Returns:
        QueryResponse: The response object, with fields
            - requested_by (str): The user name.
            - result: The query result returned by `self.query`.
            - epsilon (float): The amount of epsilon budget spent for the query.
            - delta (float): The amount of delta budget spent for the query.
    """
    # Block access to other queries to user
    if not await self.admin_database.get_and_set_may_user_query(user_name=user_name, may_query=False):
        raise UnauthorizedAccessException(
            f"User {user_name} is trying to query before end of previous query."
        )

    try:
        # Get cost of the query
        eps_cost, delta_cost = self.cost(query_json)

        # Check that enough budget to do the query
        eps_remain, delta_remain = await self.admin_database.get_remaining_budget(
            user_name=user_name, dataset_name=query_json.dataset_name
        )

        if (eps_remain < eps_cost) or (delta_remain < delta_cost):
            raise InvalidQueryException(
                "Not enough budget for this query epsilon remaining "
                f"{eps_remain}, delta remaining {delta_remain}."
            )

        # Query: known exceptions pass through untouched, anything else is wrapped
        try:
            query_result = self.query(query_json)
        except KNOWN_EXCEPTIONS:
            raise
        except Exception as e:
            raise InternalServerException(str(e)) from e

        # Deduce budget from user
        await self.admin_database.update_budget(
            user_name=user_name,
            dataset_name=query_json.dataset_name,
            spent_epsilon=eps_cost,
            spent_delta=delta_cost,
        )

        response = QueryResponse(
            requested_by=user_name,
            result=query_result,
            epsilon=eps_cost,
            delta=delta_cost,
        )

        # Add query to db (for archive)
        await self.admin_database.save_query(
            user_name=user_name, query=query_json, response=response
        )  # TODO 359 here

        return response
    finally:
        # Re-enable user to query, whatever happened above
        await self.admin_database.set_may_user_query(user_name=user_name, may_query=True)

split_train_test_data #

split_train_test_data(
    df: DataFrame, query_json: DiffPrivLibRequestModel
) -> tuple[DataFrame, DataFrame, DataFrame, DataFrame]

Split the data between train and test set.

Parameters:

  • df #

    (DataFrame) –

    dataframe with the data

  • query_json #

    (DiffPrivLibRequestModel) –

    User input query, providing: feature_columns (list[str]): columns from data to use as features; target_columns (list[str]): columns from data to use as target (to predict); test_size (float): proportion of data in the test set; test_train_split_seed (int): seed for the random train-test split.

Returns:

  • x_train ( DataFrame ) –

    training data features

  • x_test ( DataFrame ) –

    testing data features

  • y_train ( DataFrame ) –

    training data target

  • y_test ( DataFrame ) –

    testing data target

Source code in server/lomas_server/dp_queries/dp_libraries/diffprivlib.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def split_train_test_data(
    df: pd.DataFrame, query_json: DiffPrivLibRequestModel
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Randomly partition the data into a training part and a testing part.

    Args:
        df (pd.DataFrame): dataframe with the data
        query_json (DiffPrivLibRequestModel): user input query, read for
            feature_columns (list[str]): columns from data to use as features
            target_columns (list[str]): columns from data to use as target (to predict)
            test_size (float): proportion of data in the test set
            test_train_split_seed (int): seed for the random train-test split

    Returns:
        x_train (pd.DataFrame): training data features
        x_test (pd.DataFrame): testing data features
        y_train (pd.DataFrame): training data target (None when target_columns is None)
        y_test (pd.DataFrame): testing data target (None when target_columns is None)
    """
    features = df[query_json.feature_columns]
    targets = query_json.target_columns

    if targets is not None:
        # Supervised case: features and labels are split together.
        x_train, x_test, y_train, y_test = train_test_split(
            features,
            df[targets],
            test_size=query_json.test_size,
            random_state=query_json.test_train_split_seed,
        )
        return x_train, x_test, y_train, y_test

    # No target columns: only the features are split.
    x_train, x_test = train_test_split(
        features,
        test_size=query_json.test_size,
        random_state=query_json.test_train_split_seed,
    )
    return x_train, x_test, None, None

get_dpl_bounds #

get_dpl_bounds(feature_columns: list[ColumnMetadata]) -> tuple[list[float], list[float]]

Format metadata bounds of feature columns in format expected by DiffPrivLib.

Parameters:

  • feature_columns #

    (list[ColumnMetadata]) –

    list of feature columns

Return

tuple of lower and upper bounds as expected by DiffPrivLib

Source code in server/lomas_server/dp_queries/dp_libraries/diffprivlib.py
247
248
249
250
251
252
253
254
255
256
257
258
259
def get_dpl_bounds(feature_columns: list[ColumnMetadata]) -> tuple[list[float], list[float]]:
    """
    Format metadata bounds of feature columns in format expected by DiffPrivLib.

    Boolean columns are bounded by (0, 1); every other column uses its
    metadata `minimum` as lower bound and `maximum` as upper bound.

    Args:
        feature_columns (list[ColumnMetadata]): list of feature columns

    Returns:
        tuple of lower and upper bounds as expected by DiffPrivLib, each list
        holding one value per column, in the order of `feature_columns`
    """
    lower = [0 if col.datatype == "boolean" else col.minimum for col in feature_columns]
    # The upper bound must come from `maximum`; using `minimum` here would
    # collapse every non-boolean range to a single point.
    upper = [1 if col.datatype == "boolean" else col.maximum for col in feature_columns]
    return (lower, upper)

Classes:

  • OpenDPQuerier

    Concrete implementation of the DPQuerier ABC for the OpenDP library.

Functions:

OpenDPQuerier #

OpenDPQuerier(data_connector: DataConnector, admin_database: Proxy)

              flowchart TD
              lomas_server.dp_queries.dp_libraries.opendp.OpenDPQuerier[OpenDPQuerier]
              lomas_server.dp_queries.dp_querier.DPQuerier[DPQuerier]

                              lomas_server.dp_queries.dp_querier.DPQuerier --> lomas_server.dp_queries.dp_libraries.opendp.OpenDPQuerier
                


              click lomas_server.dp_queries.dp_libraries.opendp.OpenDPQuerier href "" "lomas_server.dp_queries.dp_libraries.opendp.OpenDPQuerier"
              click lomas_server.dp_queries.dp_querier.DPQuerier href "" "lomas_server.dp_queries.dp_querier.DPQuerier"
            

Concrete implementation of the DPQuerier ABC for the OpenDP library.

Parameters:

  • data_connector #

    (DataConnector) –

    DataConnector for the dataset to query.

Methods:

  • cost

    Estimate cost of query.

  • query

    Perform the query and return the response.

  • handle_query

    Handle DP query.

Source code in server/lomas_server/dp_queries/dp_libraries/opendp.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
def __init__(
    self,
    data_connector: DataConnector,
    admin_database: Proxy,
) -> None:
    """Build an OpenDP querier bound to one dataset.

    Args:
        data_connector (DataConnector): DataConnector for the dataset to query.
        admin_database (Proxy): Admin database handle, passed on to the
            parent initializer.
    """
    super().__init__(data_connector, admin_database)

    # Keep a reference to the dataset metadata; `cost` and `query` read it.
    self.metadata = self.data_connector.metadata

cost #

Estimate cost of query.

Parameters:

Raises:

Returns:

  • tuple[float, float]

    tuple[float, float]: The tuple of costs, the first value is the epsilon cost, the second value is the delta value.

Source code in server/lomas_server/dp_queries/dp_libraries/opendp.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def cost(self, query_json: OpenDPRequestModel) -> tuple[float, float]:
    """
    Compute the (epsilon, delta) cost of an OpenDP query.

    An OpenDP context is built from the dataset metadata and the budget
    parameters of the request; the cost is then derived from the context's
    accountant according to its output measure.

    Args:
        query_json (OpenDPRequestModel): The request model object.

    Raises:
        InvalidQueryException: The measure is zero-concentrated divergence
            and the request carries no delta.
        InternalServerException: The output measure is not one of the
            handled measurement types.

    Returns:
        tuple[float, float]: The tuple of costs, the first value
            is the epsilon cost, the second value is the delta value.
    """
    lazy_frame = self.data_connector.get_polars_lf()
    context = csvw_to_opendp_context(
        self.metadata.to_dict(),
        lazy_frame,
        epsilon=query_json.epsilon,
        delta=query_json.delta,
        rho=query_json.rho,
        split_evenly_over=1,
    )

    accountant = context.accountant
    measure_name = str(accountant.output_measure)
    max_contrib = self.metadata.max_contributions

    if measure_name == OpenDPMeasurement.ZERO_CONCENTRATED_DIVERGENCE:
        # zCDP is converted to approximate DP; the delta must come from the request.
        profile = dp.combinators.make_zCDP_to_approxDP(accountant).map(d_in=int(max_contrib))

        fixed_delta = query_json.delta
        if fixed_delta is None:
            raise InvalidQueryException("Provide a fixed delta for this query.")
        return profile.epsilon(fixed_delta), fixed_delta

    if measure_name == OpenDPMeasurement.APPROX_ZERO_CONCENTRATED_DIVERGENCE:
        # Here the mapped value is indexable: item 0 gives epsilon for the delta in item 1.
        mapped = dp.combinators.make_zCDP_to_approxDP(accountant).map(d_in=int(max_contrib))
        return mapped[0].epsilon(mapped[1]), mapped[1]

    if measure_name == OpenDPMeasurement.MAX_DIVERGENCE:
        # Pure DP: the map yields epsilon only.
        return accountant.map(d_in=int(max_contrib)), 0

    if measure_name == OpenDPMeasurement.APPROX_MAX_DIVERGENCE:
        epsilon, delta = accountant.map(d_in=int(max_contrib))
        return epsilon, delta

    raise InternalServerException(f"Invalid measurement type: {measure_name}")

query #

Perform the query and return the response.

Parameters:

Raises:

Returns:

Source code in server/lomas_server/dp_queries/dp_libraries/opendp.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def query(self, query_json: OpenDPQueryModel) -> OpenDPQueryResult | OpenDPPolarsQueryResult:
    """Run the serialized OpenDP polars plan and wrap what it releases.

    Args:
        query_json (OpenDPQueryModel): The input model for the query.

    Raises:
        ExternalLibraryException: If releasing the plan raises.

    Returns:
        OpenDPPolarsQueryResult holding the collected frame when the release
        is an OpenDP polars `OnceFrame`, otherwise OpenDPQueryResult holding
        the released value as is.
    """
    lazy_frame = self.data_connector.get_polars_lf()
    context = csvw_to_opendp_context(
        self.metadata.to_dict(),
        lazy_frame,
        epsilon=query_json.epsilon,
        delta=query_json.delta,
        rho=query_json.rho,
        split_evenly_over=1,
    )

    # The plan travels as base64 text inside the request.
    plan_bytes = b64decode(query_json.opendp_json.encode("utf-8"))
    plan = context.deserialize_polars_plan(plan_bytes)

    try:
        released = plan.release()
    except Exception as err:
        logger.exception(err)
        raise ExternalLibraryException(
            DPLibraries.OPENDP,
            "Error executing query:" + str(err),
        ) from err

    if not isinstance(released, dp.extras.polars.OnceFrame):
        return OpenDPQueryResult(value=released)
    return OpenDPPolarsQueryResult(value=released.collect())

handle_query async #

Handle DP query.

Parameters:

  • query_json #

    (QueryModel) –

    The input object of the query.

  • user_name #

    (str) –

    User name.

Raises:

Returns:

  • QueryResponse ( QueryResponse ) –

    The response object, with the following fields: requested_by (str): the user name; result: the query result; epsilon (float): the amount of epsilon budget spent for the query; delta (float): the amount of delta budget spent for the query.

Source code in server/lomas_server/dp_queries/dp_querier.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def handle_query(
    self,
    query_json: QueryModel,
    user_name: str,
) -> QueryResponse:
    """
    Handle a DP query for one user, from cost estimation to archiving.

    The user is flagged as "not allowed to query" for the duration of the call
    so that they cannot run two queries at once. The flag is restored in a
    `finally` clause, so it is reset on success, on any exception and also when
    the coroutine is cancelled (`asyncio.CancelledError` is a `BaseException`
    and would bypass an `except Exception` handler, leaving the user locked out).

    Steps:
        1. Compute the query cost with `self.cost`.
        2. Compare it with the user's remaining budget on the dataset.
        3. Run the query with `self.query`.
        4. Deduct the budget, build the response and archive the query.

    Args:
        query_json (QueryModel): The input object of the query.
        user_name (str): User name.

    Raises:
        UnauthorizedAccessException: A query is already ongoing for this user
            (may also be propagated from the admin database calls).
        InvalidQueryException: If the remaining budget is lower than the cost.
        InternalServerException: If `self.query` raises an exception that is
            not one of `KNOWN_EXCEPTIONS`.

    Returns:
        QueryResponse: The response object, with fields
            - requested_by (str): The user name.
            - result: The query result returned by `self.query`.
            - epsilon (float): The amount of epsilon budget spent for the query.
            - delta (float): The amount of delta budget spent for the query.
    """
    # Block access to other queries to user
    if not await self.admin_database.get_and_set_may_user_query(user_name=user_name, may_query=False):
        raise UnauthorizedAccessException(
            f"User {user_name} is trying to query before end of previous query."
        )

    try:
        # Get cost of the query
        eps_cost, delta_cost = self.cost(query_json)

        # Check that enough budget to do the query
        eps_remain, delta_remain = await self.admin_database.get_remaining_budget(
            user_name=user_name, dataset_name=query_json.dataset_name
        )

        if (eps_remain < eps_cost) or (delta_remain < delta_cost):
            raise InvalidQueryException(
                "Not enough budget for this query epsilon remaining "
                f"{eps_remain}, delta remaining {delta_remain}."
            )

        # Query: known exceptions pass through untouched, anything else is wrapped
        try:
            query_result = self.query(query_json)
        except KNOWN_EXCEPTIONS:
            raise
        except Exception as e:
            raise InternalServerException(str(e)) from e

        # Deduce budget from user
        await self.admin_database.update_budget(
            user_name=user_name,
            dataset_name=query_json.dataset_name,
            spent_epsilon=eps_cost,
            spent_delta=delta_cost,
        )

        response = QueryResponse(
            requested_by=user_name,
            result=query_result,
            epsilon=eps_cost,
            delta=delta_cost,
        )

        # Add query to db (for archive)
        await self.admin_database.save_query(
            user_name=user_name, query=query_json, response=response
        )  # TODO 359 here

        return response
    finally:
        # Re-enable user to query, whatever happened above
        await self.admin_database.set_may_user_query(user_name=user_name, may_query=True)

set_opendp_features_config #

set_opendp_features_config(features: OpenDPFeatures) -> None

Enable opendp features based on config.

See https://github.com/opendp/opendp/discussions/304

Also sets the "OPENDP_POLARS_LIB_PATH" environment variable for correctly creating private lazyframes from deserialized polars plans.

Source code in server/lomas_server/dp_queries/dp_libraries/opendp.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
def set_opendp_features_config(features: OpenDPFeatures) -> None:
    """Enable the configured OpenDP features and export the library path.

    See https://github.com/opendp/opendp/discussions/304

    Every entry of `features` is passed to `enable_features`. Afterwards the
    "OPENDP_LIB_PATH" environment variable is set to `lib_path`; this is
    presumably what lets private lazyframes be created from deserialized
    polars plans.
    """
    for feature in features:
        logger.debug(f"OpenDP: enabling feature: {feature}")
        enable_features(feature)

    # Set DP Libraries config
    # NOTE(review): earlier documentation of this function named the variable
    # "OPENDP_POLARS_LIB_PATH" while the code sets "OPENDP_LIB_PATH" - confirm
    # which name the installed OpenDP version actually reads.
    os.environ["OPENDP_LIB_PATH"] = str(lib_path)

Classes:

  • SmartnoiseSQLQuerier

    Concrete implementation of the DPQuerier ABC for the SmartNoiseSQL library.

Functions:

SmartnoiseSQLQuerier #

SmartnoiseSQLQuerier(data_connector: DataConnector, admin_database: Proxy)

              flowchart TD
              lomas_server.dp_queries.dp_libraries.smartnoise_sql.SmartnoiseSQLQuerier[SmartnoiseSQLQuerier]
              lomas_server.dp_queries.dp_querier.DPQuerier[DPQuerier]

                              lomas_server.dp_queries.dp_querier.DPQuerier --> lomas_server.dp_queries.dp_libraries.smartnoise_sql.SmartnoiseSQLQuerier
                


              click lomas_server.dp_queries.dp_libraries.smartnoise_sql.SmartnoiseSQLQuerier href "" "lomas_server.dp_queries.dp_libraries.smartnoise_sql.SmartnoiseSQLQuerier"
              click lomas_server.dp_queries.dp_querier.DPQuerier href "" "lomas_server.dp_queries.dp_querier.DPQuerier"
            

Concrete implementation of the DPQuerier ABC for the SmartNoiseSQL library.

Methods:

  • cost

    Estimate cost of query.

  • query

    Performs the query and returns the response.

  • query_with_iter

    Perform the query and return the response.

  • handle_query

    Handle DP query.

Source code in server/lomas_server/dp_queries/dp_libraries/smartnoise_sql.py
25
26
27
28
29
30
31
32
def __init__(
    self,
    data_connector: DataConnector,
    admin_database: Proxy,
) -> None:
    """
    Set up a SmartNoiseSQL querier with no reader prepared yet.

    Args:
        data_connector: Connector giving access to the dataset to query.
        admin_database: Admin database handle, forwarded to the parent initializer.
    """
    super().__init__(data_connector, admin_database)

    # Both attributes are populated by `cost` and reused when the query is run.
    self.query_columns: list[str] = []
    self.reader: Reader | None = None

cost #

Estimate cost of query.

Parameters:

  • query_json #

    (SmartnoiseSQLModelCost) –

    JSON request object for the query.

Raises:

Returns:

  • tuple[float, float]

    tuple[float, float]: The tuple of costs, the first value is the epsilon cost, the second value is the delta value.

Source code in server/lomas_server/dp_queries/dp_libraries/smartnoise_sql.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
def cost(self, query_json: SmartnoiseSQLRequestModel) -> tuple[float, float]:
    """Estimate cost of query.

    As a side effect this prepares `self.reader` and `self.query_columns`,
    which the query step reuses.

    Args:
        query_json (SmartnoiseSQLRequestModel): JSON request object for the query.

    Raises:
        InvalidQueryException: If the query references columns that are not
            in the dataset.
        ExternalLibraryException: For exceptions from libraries
            external to this package.

    Returns:
        tuple[float, float]: The tuple of costs, the first value
            is the epsilon cost, the second value is the delta value.
    """
    import copy

    privacy = Privacy(epsilon=query_json.epsilon, delta=query_json.delta)
    privacy = set_mechanisms(privacy, query_json.mechanisms)

    df = self.data_connector.get_pandas_df()

    # Extract query columns, fallback to the first column if none are found
    self.query_columns = get_query_columns(query_json.query_str) or [df.columns[0]]
    missing = [col for col in self.query_columns if col not in df.columns]
    if missing:
        raise InvalidQueryException(f"Query requested columns not found in DataFrame: {missing}")

    # Subset DataFrame to only the relevant columns
    df = df[self.query_columns]

    # Prepare metadata in smartnoise-sql format, keeping only self.query_columns.
    # Work on a shallow copy: reassigning `columns` on the connector's own
    # metadata object would drop the other columns for every later query.
    metadata = copy.copy(self.data_connector.metadata)
    metadata.columns = [col for col in metadata.columns if col.name in self.query_columns]

    smartnoise_metadata = csvw_to_smartnoise_sql(metadata.to_dict())
    self.reader = from_connection(
        df,
        privacy=privacy,
        metadata=smartnoise_metadata,
    )
    try:
        epsilon, delta = self.reader.get_privacy_cost(query_json.query_str)

    except Exception as e:
        raise ExternalLibraryException(DPLibraries.SMARTNOISE_SQL, f"Error obtaining cost: {e}") from e

    return epsilon, delta

query #

Performs the query and returns the response.

Parameters:

Returns: dict: The dictionary encoding of the result pd.DataFrame.

Source code in server/lomas_server/dp_queries/dp_libraries/smartnoise_sql.py
81
82
83
84
85
86
87
88
89
def query(self, query_json: SmartnoiseSQLQueryModel) -> SmartnoiseSQLQueryResult:
    """Run the query by delegating to `query_with_iter` with its default iteration count.

    Args:
        query_json (SmartnoiseSQLQueryModel): The request model object.
    Returns:
        SmartnoiseSQLQueryResult: Whatever `query_with_iter` returns for this request.
    """
    outcome = self.query_with_iter(query_json)
    return outcome

query_with_iter #

Perform the query and return the response.

Parameters:

  • query_json #

    (SmartnoiseSQLQueryModel) –

    Request object for the query.

  • nb_iter #

    (int, default: 0 ) –

    Number of trials if output is Nan. Defaults to 0.

Raises:

Returns:

Source code in server/lomas_server/dp_queries/dp_libraries/smartnoise_sql.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def query_with_iter(
    self, query_json: SmartnoiseSQLQueryModel, nb_iter: int = 0
) -> SmartnoiseSQLQueryResult:
    """Perform the query and return the response.

    When the output contains NaN in a column that is not one of
    `self.query_columns`, the query is executed again, up to
    `SSQL_MAX_ITERATION` additional times.

    Args:
        query_json (SmartnoiseSQLQueryModel): Request object for the query.
        nb_iter (int, optional): Number of retries already performed
            because of NaN output. Defaults to 0.

    Raises:
        InternalServerException: If `self.reader` is not set, i.e. this
            method is called before the `cost` method.
        ExternalLibraryException: If the reader raises while executing
            the query, or if it produces no rows at all.
        InvalidQueryException: If the output still contains NaN once the
            allowed number of retries is exhausted.

    Returns:
        SmartnoiseSQLQueryResult:
            Result object wrapping the resulting pd.DataFrame.
    """
    epsilon, delta = query_json.epsilon, query_json.delta
    if self.reader is None:
        raise InternalServerException("Smartnoise SQL `query` method called before `cost` method")

    try:
        result = self.reader.execute(query_json.query_str, postprocess=query_json.postprocess)
    except Exception as e:
        raise ExternalLibraryException(
            DPLibraries.SMARTNOISE_SQL,
            "Error executing query:" + str(e),
        ) from e

    if not query_json.postprocess:
        # Raw (non post-processed) output: only the first row is used and its
        # columns get positional names. The `None` default turns an empty
        # output into the explicit error below instead of a bare StopIteration.
        first_row = next(iter(result), None)
        if first_row is None:
            cols, result = [], []
        else:
            cols = [f"res_{i}" for i in range(len(first_row))]
            result = [first_row]
    else:
        # Post-processed output: the first entry holds the column names.
        cols = result.pop(0)

    if result == []:
        raise ExternalLibraryException(
            DPLibraries.SMARTNOISE_SQL,
            f"SQL Reader generated empty results. "
            f"Epsilon: {epsilon} and Delta: {delta} are too small"
            " to generate output.",
        )

    df_res = pd.DataFrame(result, columns=cols)

    # Check for NaNs in any of the new columns
    new_columns = [col for col in df_res.columns if col not in self.query_columns]
    if df_res[new_columns].isna().any().any():
        if nb_iter < SSQL_MAX_ITERATION:
            # Noise is random: run the same query again and count the retry.
            nb_iter += 1
            return self.query_with_iter(query_json, nb_iter)

        raise InvalidQueryException(
            f"SQL Reader generated NaN results. "
            f"Epsilon: {epsilon}, Delta: {delta} — too small to generate valid output."
        )
    return SmartnoiseSQLQueryResult(df=df_res)

handle_query async #

Handle DP query.

Parameters:

  • query_json #

    (QueryModel) –

    The input object of the query.

  • user_name #

    (str) –

    User name.

Raises:

Returns:

  • QueryResponse ( QueryResponse ) –

    The response object. It carries the user name (requested_by), the query result (result) and the budget spent by the query (epsilon and delta).

Source code in server/lomas_server/dp_queries/dp_querier.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def handle_query(
    self,
    query_json: QueryModel,
    user_name: str,
) -> QueryResponse:
    """
    Handle DP query.

    The user is marked as "may not query" for the whole duration of the
    call and is always re-enabled at the end, whatever the outcome.
    Steps: compute the cost, compare it with the remaining budget, run
    the query, deduct the budget, archive the query and its response.

    Args:
        query_json (QueryModel): The input object of the query.
        user_name (str): User name.

    Raises:
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.
        InvalidQueryException: If the query is not valid or the remaining
            budget is lower than the cost of the query.
        InternalServerException: For any other unforeseen exception raised
            while running the query.

    Returns:
        QueryResponse: The response object, built with
            - requested_by: the user name.
            - result: the value returned by `self.query`.
            - epsilon: the epsilon cost of the query.
            - delta: the delta cost of the query.
    """
    # Block access to other queries to user
    if not await self.admin_database.get_and_set_may_user_query(user_name=user_name, may_query=False):
        raise UnauthorizedAccessException(
            f"User {user_name} is trying to query before end of previous query."
        )

    try:
        # Get cost of the query
        eps_cost, delta_cost = self.cost(query_json)

        # Check that enough budget to do the query
        eps_remain, delta_remain = await self.admin_database.get_remaining_budget(
            user_name=user_name, dataset_name=query_json.dataset_name
        )

        if (eps_remain < eps_cost) or (delta_remain < delta_cost):
            raise InvalidQueryException(
                "Not enough budget for this query epsilon remaining "
                f"{eps_remain}, delta remaining {delta_remain}."
            )

        # Query
        try:
            query_result = self.query(query_json)
        except KNOWN_EXCEPTIONS:
            # Already one of the server's own exception types: pass it through.
            raise
        except Exception as e:
            raise InternalServerException(str(e)) from e

        # Deduce budget from user
        await self.admin_database.update_budget(
            user_name=user_name,
            dataset_name=query_json.dataset_name,
            spent_epsilon=eps_cost,
            spent_delta=delta_cost,
        )

        response = QueryResponse(
            requested_by=user_name,
            result=query_result,
            epsilon=eps_cost,
            delta=delta_cost,
        )

        # Add query to db (for archive)
        await self.admin_database.save_query(
            user_name=user_name, query=query_json, response=response
        )  # TODO 359 here

        # Return response
        return response
    finally:
        # Re-enable user to query. `finally` (rather than `except Exception`)
        # also covers task cancellation, which is a BaseException and would
        # otherwise leave the user blocked.
        await self.admin_database.set_may_user_query(user_name=user_name, may_query=True)

set_mechanisms #

set_mechanisms(privacy: Privacy, mechanisms: dict[str, str]) -> Privacy

Set privacy mechanisms on the Privacy object.

For more information see: https://docs.smartnoise.org/sql/advanced.html#overriding-mechanisms

Parameters:

  • privacy #

    (Privacy) –

    Privacy object.

  • mechanisms #

    (dict[str, str]) –

    Mechanisms to set.

Returns:

  • Privacy ( Privacy ) –

    The updated Privacy object.

Source code in server/lomas_server/dp_queries/dp_libraries/smartnoise_sql.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def set_mechanisms(privacy: Privacy, mechanisms: dict[str, str]) -> Privacy:
    """Override the mechanisms of a Privacy object.

    Only the statistics listed in `SSQL_STATS` are looked up in
    `mechanisms`; any other key is ignored.

    For more information see:
    https://docs.smartnoise.org/sql/advanced.html#overriding-mechanisms

    Args:
        privacy (Privacy): Privacy object, modified in place.
        mechanisms (dict[str, str]): Mechanism name to use, keyed by
            statistic name.

    Returns:
        Privacy: The same Privacy object, with its mechanism map updated.
    """
    overridden = [stat_name for stat_name in SSQL_STATS if stat_name in mechanisms]
    for stat_name in overridden:
        mechanism_name = mechanisms[stat_name]
        privacy.mechanisms.map[Stat[stat_name]] = Mechanism[mechanism_name]
    return privacy

get_query_columns #

get_query_columns(query: str) -> list[str]

Extract all column names used in a SQL query.

Traverses the query AST (Abstract Syntax Tree) to find every column reference across SELECT, WHERE, GROUP BY, ORDER BY, etc. Assumes only one table is present in the query.

Parameters:

  • query #

    (str) –

    SQL query string.

Returns:

  • list[str]

    list[str]: List of unique column names used in the query.

Source code in server/lomas_server/dp_queries/dp_libraries/smartnoise_sql.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
def get_query_columns(query: str) -> list[str]:
    """
    Extract all column names used in a SQL query.

    Traverses the query AST (Abstract Syntax Tree) to find every
    column reference across SELECT, WHERE, GROUP BY, ORDER BY, etc.
    Assumes only one table is present in the query.

    Args:
        query (str): SQL query string.

    Returns:
        list[str]: List of unique column names used in the query, in
            order of first appearance in the AST traversal.
    """
    # Parse SQL into an expression tree
    expression = parse_one(query)

    # Extract all column references from anywhere in the query
    column_names = (col.name for col in expression.find_all(exp.Column))

    # dict.fromkeys de-duplicates while keeping insertion order, so the
    # result no longer depends on string hash randomisation (as a set would).
    return list(dict.fromkeys(column_names))

Functions:

handle_missing_data #

handle_missing_data(df: DataFrame, imputer_strategy: str) -> DataFrame

Impute missing data based on given imputation strategy for NaNs.

Parameters:

  • df #

    (DataFrame) –

    dataframe with the data

  • imputer_strategy #

    (str) –

    string to indicate imputation for NaNs "drop": will drop all rows with missing values "mean": will replace values by the mean of the column values "median": will replace values by the median of the column values "most_frequent": will replace values by the most frequent values

Raises:

Returns:

  • df ( DataFrame ) –

    dataframe with the imputed data

Source code in server/lomas_server/dp_queries/dp_libraries/utils.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
def handle_missing_data(df: pd.DataFrame, imputer_strategy: str) -> pd.DataFrame:
    """Impute missing data based on given imputation strategy for NaNs.

    The input DataFrame is never modified; a new DataFrame is returned.

    Args:
        df (pd.DataFrame): dataframe with the data
        imputer_strategy (str): string to indicate imputation for NaNs
            "drop": will drop all rows with missing values
            "mean": will replace values by the mean of the column values
            "median": will replace values by the median of the column values
            "most_frequent": will replace values by the most frequent values
            For "mean" and "median", columns that are not numerical are
            imputed with their most frequent value.

    Raises:
        InvalidQueryException: If the "imputer_strategy" does not exist,
            or if the resulting dataframe has no rows.

    Returns:
        df (pd.DataFrame): dataframe with the imputed data, cast back to
            the dtypes of the input.
    """
    # Remember the input dtypes: imputers return plain arrays.
    dtypes = df.dtypes

    if imputer_strategy == "drop":
        df = df.dropna()
    elif imputer_strategy in {"mean", "median"}:
        # Work on a copy: the column assignments below would otherwise
        # change the caller's DataFrame.
        df = df.copy()
        numerical_cols = df.select_dtypes(include=NUMERICAL_DTYPES).columns.tolist()
        categorical_cols = [col for col in df.columns if col not in numerical_cols]

        # Impute numerical features using given strategy
        # (skipped when there is no numerical column to fit on)
        imp_mean = SimpleImputer(strategy=imputer_strategy)
        df_num_imputed = imp_mean.fit_transform(df[numerical_cols]) if numerical_cols else []

        # Impute categorical features with most frequent value
        imp_most_frequent = SimpleImputer(strategy="most_frequent")
        df.loc[:, categorical_cols] = df[categorical_cols].astype("object").replace({pd.NA: np.nan})
        df[df.select_dtypes(bool).columns] = df.select_dtypes(bool).astype("boolean")
        df_cat_imputed = imp_most_frequent.fit_transform(df[categorical_cols]) if categorical_cols else []

        # Combine imputed dataframes
        df = pd.concat(
            [
                pd.DataFrame(df_num_imputed, columns=numerical_cols),
                pd.DataFrame(df_cat_imputed, columns=categorical_cols),
            ],
            axis=1,
        )
    elif imputer_strategy == "most_frequent":
        # Work on a copy so the caller's DataFrame is left untouched.
        df = df.copy()
        # Impute all features with most frequent value
        imp_most_frequent = SimpleImputer(strategy=imputer_strategy)
        df.loc[:, df.columns] = df[df.columns].astype("object").replace({pd.NA: np.nan})
        df = pd.DataFrame(imp_most_frequent.fit_transform(df), columns=df.columns)
    else:
        raise InvalidQueryException(f"Imputation strategy {imputer_strategy} not supported.")

    # Force int and bool type on int and bool columns
    for col in df.columns:
        if pd.api.types.is_integer_dtype(dtypes[col]) or pd.api.types.is_bool_dtype(dtypes[col]):
            # Round first so imputed values are cast to the nearest integer.
            df.loc[:, col] = df[col].round().astype(dtypes[col])

    df = df.astype(dtype=dtypes)

    if df.shape[0] == 0:
        raise InvalidQueryException("Empty dataframe, please try another imputation strategy.")
    return df

Classes:

  • DPQuerier

    Abstract Base Class for Queriers to external DP library.

DPQuerier #


              flowchart TD
              lomas_server.dp_queries.dp_querier.DPQuerier[DPQuerier]

              

              click lomas_server.dp_queries.dp_querier.DPQuerier href "" "lomas_server.dp_queries.dp_querier.DPQuerier"
            

Abstract Base Class for Queriers to external DP library.

A querier type is specific to a DP library and a querier instance is specific to a DataConnector instance.

Parameters:

  • data_connector #

    (DataConnector) –

    The private dataset to query.

  • admin_database #

    (Proxy) –

    A Proxy for an initialized instance of an AdminDatabase.

Methods:

  • cost

    Estimate cost of query.

  • query

    Perform the query and return the response.

  • handle_query

    Handle DP query.

Source code in server/lomas_server/dp_queries/dp_querier.py
35
36
37
38
39
40
41
42
43
44
45
46
47
def __init__(
    self,
    data_connector: DataConnector,
    admin_database: Proxy,
) -> None:
    """Bind the querier to one dataset and to the admin database.

    Args:
        data_connector (DataConnector): The private dataset to query.
        admin_database (Proxy): A Proxy for an initialized instance of an AdminDatabase.
    """
    # Both collaborators are simply stored on the instance.
    self.admin_database, self.data_connector = admin_database, data_connector

cost abstractmethod #

cost(query_json: RequestModelGeneric) -> tuple[float, float]

Estimate cost of query.

Parameters:

  • query_json #

    (RequestModelGeneric) –

    The input object of the request. Must be a subclass of LomasRequestModel.

Returns: tuple[float, float]: The tuple of costs, the first value is the epsilon cost, the second value is the delta value.

Source code in server/lomas_server/dp_queries/dp_querier.py
49
50
51
52
53
54
55
56
57
58
59
60
@abstractmethod
def cost(self, query_json: RequestModelGeneric) -> tuple[float, float]:
    """
    Estimate cost of query.

    Abstract: this declaration has no body, the implementation is left
    to the concrete querier.

    Args:
        query_json (RequestModelGeneric): The input object of the request.
            Must be a subclass of LomasRequestModel.

    Returns:
        tuple[float, float]: The tuple of costs, the first value is
            the epsilon cost, the second value is the delta cost.
    """

query abstractmethod #

query(query_json: QueryModelGeneric) -> QueryResultGeneric

Perform the query and return the response.

Parameters:

  • query_json #

    (QueryModelGeneric) –

    The input object of the query. Must be a subclass of QueryModel.

Returns:

  • QueryResultGeneric

    dict | int | float | List[Any] | Any | str: The query result, to be added to the response dict.

Source code in server/lomas_server/dp_queries/dp_querier.py
62
63
64
65
66
67
68
69
70
71
72
73
74
@abstractmethod
def query(self, query_json: QueryModelGeneric) -> QueryResultGeneric:
    """
    Perform the query and return the response.

    Abstract: this declaration has no body, the implementation is left
    to the concrete querier.

    Args:
        query_json (QueryModelGeneric): The input object of the query.
            Must be a subclass of QueryModel.

    Returns:
        QueryResultGeneric:
            The query result, to be added to the response.
    """

handle_query async #

Handle DP query.

Parameters:

  • query_json #

    (QueryModel) –

    The input object of the query.

  • user_name #

    (str) –

    User name.

Raises:

Returns:

  • QueryResponse ( QueryResponse ) –

    The response object. It carries the user name (requested_by), the query result (result) and the budget spent by the query (epsilon and delta).

Source code in server/lomas_server/dp_queries/dp_querier.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def handle_query(
    self,
    query_json: QueryModel,
    user_name: str,
) -> QueryResponse:
    """
    Handle DP query.

    The user is marked as "may not query" for the whole duration of the
    call and is always re-enabled at the end, whatever the outcome.
    Steps: compute the cost, compare it with the remaining budget, run
    the query, deduct the budget, archive the query and its response.

    Args:
        query_json (QueryModel): The input object of the query.
        user_name (str): User name.

    Raises:
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.
        InvalidQueryException: If the query is not valid or the remaining
            budget is lower than the cost of the query.
        InternalServerException: For any other unforeseen exception raised
            while running the query.

    Returns:
        QueryResponse: The response object, built with
            - requested_by: the user name.
            - result: the value returned by `self.query`.
            - epsilon: the epsilon cost of the query.
            - delta: the delta cost of the query.
    """
    # Block access to other queries to user
    if not await self.admin_database.get_and_set_may_user_query(user_name=user_name, may_query=False):
        raise UnauthorizedAccessException(
            f"User {user_name} is trying to query before end of previous query."
        )

    try:
        # Get cost of the query
        eps_cost, delta_cost = self.cost(query_json)

        # Check that enough budget to do the query
        eps_remain, delta_remain = await self.admin_database.get_remaining_budget(
            user_name=user_name, dataset_name=query_json.dataset_name
        )

        if (eps_remain < eps_cost) or (delta_remain < delta_cost):
            raise InvalidQueryException(
                "Not enough budget for this query epsilon remaining "
                f"{eps_remain}, delta remaining {delta_remain}."
            )

        # Query
        try:
            query_result = self.query(query_json)
        except KNOWN_EXCEPTIONS:
            # Already one of the server's own exception types: pass it through.
            raise
        except Exception as e:
            raise InternalServerException(str(e)) from e

        # Deduce budget from user
        await self.admin_database.update_budget(
            user_name=user_name,
            dataset_name=query_json.dataset_name,
            spent_epsilon=eps_cost,
            spent_delta=delta_cost,
        )

        response = QueryResponse(
            requested_by=user_name,
            result=query_result,
            epsilon=eps_cost,
            delta=delta_cost,
        )

        # Add query to db (for archive)
        await self.admin_database.save_query(
            user_name=user_name, query=query_json, response=response
        )  # TODO 359 here

        # Return response
        return response
    finally:
        # Re-enable user to query. `finally` (rather than `except Exception`)
        # also covers task cancellation, which is a BaseException and would
        # otherwise leave the user blocked.
        await self.admin_database.set_may_user_query(user_name=user_name, may_query=True)

Functions:

get_dummy_dataset_for_query async #

get_dummy_dataset_for_query(
    admin_database: Proxy, query_json: DummyQueryModel
) -> InMemoryConnector

Get a dummy dataset for a given query.

Parameters:

  • admin_database #

    (Proxy) –

    A Proxy for an initialized instance of an AdminDatabase.

  • query_json #

    (RequestModel) –

    The request object for the query.

Returns:

Source code in server/lomas_server/dp_queries/dummy_dataset.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def get_dummy_dataset_for_query(
    admin_database: Proxy, query_json: DummyQueryModel
) -> InMemoryConnector:
    """Build an in-memory dummy dataset matching the dataset of a query.

    The dataset metadata is fetched from the admin database and the dummy
    rows are generated from it with the row count and seed of the request.

    Args:
        admin_database (Proxy): A Proxy for an initialized instance of an AdminDatabase.
        query_json (DummyQueryModel): The request object for the query.

    Returns:
        InMemoryConnector: An in memory dummy dataset instance.
    """
    dataset_metadata = await admin_database.get_dataset_metadata(dataset_name=query_json.dataset_name)

    # Dummy data is derived from the metadata, the requested size and the seed
    metadata_as_dict = dataset_metadata.to_dict()
    dummy_df = make_dummy_from_metadata(metadata_as_dict, query_json.dummy_nb_rows, query_json.dummy_seed)

    return InMemoryConnector(metadata=dataset_metadata, df=dummy_df)

Classes:

PrivateDBCredentials #


              flowchart TD
              lomas_server.models.config.PrivateDBCredentials[PrivateDBCredentials]

              

              click lomas_server.models.config.PrivateDBCredentials href "" "lomas_server.models.config.PrivateDBCredentials"
            

BaseModel for private database credentials.

S3CredentialsConfig #


              flowchart TD
              lomas_server.models.config.S3CredentialsConfig[S3CredentialsConfig]
              lomas_server.models.config.PrivateDBCredentials[PrivateDBCredentials]

                              lomas_server.models.config.PrivateDBCredentials --> lomas_server.models.config.S3CredentialsConfig
                


              click lomas_server.models.config.S3CredentialsConfig href "" "lomas_server.models.config.S3CredentialsConfig"
              click lomas_server.models.config.PrivateDBCredentials href "" "lomas_server.models.config.PrivateDBCredentials"
            

BaseModel for S3 database credentials.

AmqpConfig #


              flowchart TD
              lomas_server.models.config.AmqpConfig[AmqpConfig]

              

              click lomas_server.models.config.AmqpConfig href "" "lomas_server.models.config.AmqpConfig"
            

BaseSettings for Advanced Message Queuing Protocol (AMQP).

Methods:

  • dsn

    Construct full DSN including credentials.

  • base_url

    Queue base URL.

dsn #

dsn() -> str

Construct full DSN including credentials.

Source code in server/lomas_server/models/config.py
44
45
46
47
48
49
50
51
52
53
54
55
@computed_field
def dsn(self) -> str:
    """Build the full DSN string, credentials and heartbeat query included."""
    target = self.url
    heartbeat_query = f"heartbeat={self.heartbeat}"
    return str(
        Url.build(
            scheme=target.scheme,
            username=self.username,
            password=self.password,
            host=target.host,
            port=target.port,
            query=heartbeat_query,
        )
    )

base_url #

base_url() -> str

Queue base URL.

Source code in server/lomas_server/models/config.py
57
58
59
60
61
62
63
64
65
@computed_field
def base_url(self) -> str:
    """Build the queue base URL: scheme, host and port only, no credentials."""
    target = self.url
    return str(Url.build(scheme=target.scheme, host=target.host, port=target.port))

DexAdminConfig #


              flowchart TD
              lomas_server.models.config.DexAdminConfig[DexAdminConfig]

              

              click lomas_server.models.config.DexAdminConfig href "" "lomas_server.models.config.DexAdminConfig"
            

Methods:

use_mtls #

use_mtls() -> bool

Using mTLS ?

Source code in server/lomas_server/models/config.py
71
72
73
74
@computed_field
def use_mtls(self) -> bool:
    """Tell whether mTLS is used, i.e. whether the configured URL scheme is "https"."""
    scheme = self.url.scheme
    return scheme == "https"

Server #


              flowchart TD
              lomas_server.models.config.Server[Server]

              

              click lomas_server.models.config.Server href "" "lomas_server.models.config.Server"
            

BaseModel for uvicorn server configs.

Attributes:

submit_limit instance-attribute #

submit_limit: float

A limit on the rate at which users can submit answers.

Config #


              flowchart TD
              lomas_server.models.config.Config[Config]

              

              click lomas_server.models.config.Config href "" "lomas_server.models.config.Config"
            

Server runtime config.

AdminConfig #


              flowchart TD
              lomas_server.models.config.AdminConfig[AdminConfig]

              

              click lomas_server.models.config.AdminConfig href "" "lomas_server.models.config.AdminConfig"
            

Base model for settings for administrative tasks.

Classes:

ConfigResponse #


              flowchart TD
              lomas_server.models.responses.ConfigResponse[ConfigResponse]

              

              click lomas_server.models.responses.ConfigResponse href "" "lomas_server.models.responses.ConfigResponse"
            

Model for response to server config queries.

Attributes:

config class-attribute instance-attribute #

config: Config = Field(default_factory=Config)

The server config.

Classes:

LoggingAndTracingMiddleware #


              flowchart TD
              lomas_server.routes.middlewares.LoggingAndTracingMiddleware[LoggingAndTracingMiddleware]

              

              click lomas_server.routes.middlewares.LoggingAndTracingMiddleware href "" "lomas_server.routes.middlewares.LoggingAndTracingMiddleware"
            

Middleware for logging and tracing incoming HTTP requests.

This middleware logs the incoming requests, including the user name, the route being accessed, and any query parameters. Additionally, it creates a trace span to trace the user's request and adds attributes to the span related to the user name and query parameters.

Methods:

  • dispatch

    Handles the request and performs logging and tracing.

dispatch async #

dispatch(request: Request, call_next: RequestResponseEndpoint) -> Response

Handles the request and performs logging and tracing.

Logs the user name, the route and the query parameters. Creates a trace span to monitor the request and adds relevant attributes.

Parameters:

  • request #

    (Request) –

    The incoming request object.

  • call_next #

    (Callable) –

    A function that, when called, passes the request to the next middleware or request handler.

Returns:

  • Response ( Response ) –

    The HTTP response generated by calling call_next(request).

Source code in server/lomas_server/routes/middlewares.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
    """
    Handles the request and performs logging and tracing.

    Logs the route and the query parameters, then the outcome of the request
    (with the user name when the route stored one on `request.state`).
    Creates a trace span to monitor the request and adds relevant attributes.

    The query parameters are the members of the JSON request body when that
    body is a JSON object; any other body (missing, invalid JSON, or JSON
    that is not an object) is treated as having no parameters.

    Args:
        request (Request): The incoming request object.
        call_next (RequestResponseEndpoint): A function that, when called,
            passes the request to the next middleware or request handler.

    Returns:
        Response: The HTTP response generated by calling `call_next(request)`.
    """
    route = request.url.path

    try:
        query_params = await request.json()
    except (json.JSONDecodeError, ValueError):
        query_params = {}
    # Valid JSON is not necessarily an object (e.g. a list or a number);
    # only objects have named parameters to iterate over.
    if not isinstance(query_params, dict):
        query_params = {}
    for param, value in query_params.items():
        # Replace values so they can be used as span attributes:
        # None becomes an empty string, nested objects become JSON text.
        if value is None:
            query_params[param] = ""
        if isinstance(value, dict):
            query_params[param] = json.dumps(value)

    tracer = get_tracer(__name__)
    with tracer.start_as_current_span("user_request_span") as span:
        # Formatted once and reused in every log line below.
        trace_id = format_trace_id(span.get_span_context().trace_id)

        for param, value in query_params.items():
            span.set_attribute(f"query_param.{param}", value)

        logger.log(
            TRACE_LOG_LEVEL,
            f"User is making a request to route '{route}' "
            f"with query params: {query_params}. "
            f"trace_id={trace_id}",
        )

        response = await call_next(request)

        if response.status_code < 400:  # Run only for successful requests.
            if hasattr(request.state, "user_name"):  # Not all routes extract the user name.
                user_name = request.state.user_name
                logger.log(
                    TRACE_LOG_LEVEL,
                    f"Request with trace_id={trace_id}"
                    f" for user '{user_name}' completed.",
                )
                span.set_attribute("user_name", request.state.user_name)

            logger.log(
                TRACE_LOG_LEVEL,
                f"Request with trace_id={trace_id}"
                " completed successfully",
            )

        else:
            logger.log(
                TRACE_LOG_LEVEL,
                f"Failed request with trace_id={trace_id}."
                f"Status code: {response.status_code}.",
            )

    return response

FastAPIMetricMiddleware #

FastAPIMetricMiddleware(app: ASGIApp, app_name: str)

              flowchart TD
              lomas_server.routes.middlewares.FastAPIMetricMiddleware[FastAPIMetricMiddleware]

              

              click lomas_server.routes.middlewares.FastAPIMetricMiddleware href "" "lomas_server.routes.middlewares.FastAPIMetricMiddleware"
            

Middleware to collect and expose Prometheus metrics for a FastAPI application.

This middleware tracks various metrics related to HTTP requests, including: - Total requests (fastapi_requests_total) - Total responses (fastapi_responses_total) - Exceptions raised (fastapi_exceptions_total) - Request processing duration (fastapi_requests_duration_seconds) - Current requests in progress (fastapi_requests_in_progress)

It also supports integration with an OpenTelemetry exporter for exporting metrics to a metrics collector (e.g., Prometheus or any other OTLP-compatible collector).

Parameters:

  • app #

    (ASGIApp) –

    The FastAPI application instance.

  • app_name #

    (str) –

    The name of the application used for metric labeling.

Methods:

  • dispatch

    Processes HTTP request, records metrics and returns the HTTP response.

  • get_path

    Attempts to match the request's route to a defined route.

Source code in server/lomas_server/routes/middlewares.py
117
118
119
120
121
122
123
124
125
126
def __init__(self, app: ASGIApp, app_name: str) -> None:
    """
    Initializes the metric middleware.

    Args:
        app (ASGIApp): The FastAPI application instance.
        app_name (str): The name of the application used for metric labeling.
    """
    # Hand the wrapped ASGI application to the parent class initializer.
    super().__init__(app)
    # Stored on the instance; `dispatch` passes it as the `app_name` metric attribute.
    self.app_name = app_name

dispatch async #

dispatch(request: Request, call_next: RequestResponseEndpoint) -> Response

Processes HTTP request, records metrics and returns the HTTP response.

This method performs the following steps: 1. Tracks the current request in progress using fastapi_requests_in_progress gauge. 2. Records the request count with fastapi_requests_total counter. 3. Records the time taken to process the request using fastapi_requests_duration_seconds histogram. 4. Handles exceptions, if raised, and records the exception details using fastapi_exceptions_total counter. 5. Records the response status code with fastapi_responses_total counter. 6. Decrements the in-progress request gauge after processing.

Parameters:

  • request #

    (Request) –

    The incoming HTTP request to be processed.

  • call_next #

    (RequestResponseEndpoint) –

    Endpoint that processes the request and returns a response.

Returns:

  • Response ( Response ) –

    The HTTP response after processing the request.

Raises:

  • BaseException

    If an exception occurs during request processing, it is raised after logging it.

Source code in server/lomas_server/routes/middlewares.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
    """
    Processes HTTP request, records metrics and returns the HTTP response.

    Requests whose path is not fully matched by a registered route are
    forwarded to ``call_next`` untouched and no metric is recorded for them.

    For handled requests, this method performs the following steps:
    1. Tracks the current request in progress using `fastapi_requests_in_progress` gauge.
    2. Records the request count with `fastapi_requests_total` counter.
    3. Records the time taken to process the request using
    `fastapi_requests_duration_seconds` histogram (successful calls only).
    4. Handles exceptions, if raised, and records the exception details using
    `fastapi_exceptions_total` counter.
    5. Records the response status code with `fastapi_responses_total` counter
    (the status code is ``None`` when the call raised).
    6. Decrements the in-progress request gauge after processing.

    Args:
        request (Request): The incoming HTTP request to be processed.
        call_next (RequestResponseEndpoint): Endpoint that processes the request and returns a response.

    Returns:
        Response: The HTTP response after processing the request.

    Raises:
        Exception: Any exception raised while processing the request is
            counted and then re-raised unchanged (traceback and exception
            chaining are preserved).
    """
    method = request.method
    path, is_handled_path = self.get_path(request)

    if not is_handled_path:
        return await call_next(request)

    # Labels shared by every metric recorded for this request.
    labels = {"method": method, "path": path, "app_name": self.app_name}

    # Track requests being processed
    FAST_API_REQUESTS_IN_PROGRESS_GAUGE.add(1, labels)
    FAST_API_REQUESTS_COUNTER.add(1, labels)

    before_time = time.perf_counter()

    # Stays None if the downstream call raises before producing a response.
    status_code = None

    try:
        response = await call_next(request)
    except Exception as e:
        FAST_API_EXCEPTION_COUNTER.add(1, {**labels, "exception_type": type(e).__name__})
        # Bare `raise` re-raises the active exception as-is. `raise e from None`
        # would reset its __cause__ and suppress its context, hiding chained
        # errors set by inner handlers.
        raise
    else:
        status_code = response.status_code
        after_time = time.perf_counter()

        # Record request processing time
        FAST_API_REQUESTS_PROCESSING_HISTOGRAM.record(after_time - before_time, labels)
    finally:
        # Runs on both success and failure so the in-progress gauge always
        # returns to its previous value.
        FAST_API_RESPONSES_COUNTER.add(1, {**labels, "status_code": status_code})
        FAST_API_REQUESTS_IN_PROGRESS_GAUGE.add(-1, labels)

    return response

get_path staticmethod #

get_path(request: Request) -> tuple[str, bool]

Attempts to match the request's route to a defined route.

Parameters:

  • request #

    (Request) –

    The HTTP request to check for a matching path.

Returns:

  • tuple[str, bool]

    Tuple[str, bool]: A tuple containing: - The matched path (str) from the request URL. - Boolean (True if the path was handled by one of the routes).

Source code in server/lomas_server/routes/middlewares.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
@staticmethod
def get_path(request: Request) -> tuple[str, bool]:
    """
    Resolves the request to the path template of a registered route.

    The application's routes are tested in order against the request scope;
    the first route reporting a full match wins.

    Args:
        request (Request): The HTTP request to check for a matching path.

    Returns:
        Tuple[str, bool]: A tuple containing:
            - The path of the matching route, or the raw request URL path
              when no route fully matches.
            - Boolean (True if the path was handled by one of the routes).
    """
    scope = request.scope
    for candidate in request.app.routes:
        outcome, _child_scope = candidate.matches(scope)
        if outcome == Match.FULL:
            return candidate.path, True

    # No registered route fully matches: fall back to the literal URL path.
    return request.url.path, False

Functions:

root async #

root() -> RedirectResponse

Redirect root endpoint to the state endpoint.

Returns:

  • RedirectResponse ( RedirectResponse ) –

    A redirect to the /state endpoint.

Source code in server/lomas_server/routes/routes_admin.py
41
42
43
44
45
46
47
48
@router.get("/")
async def root() -> RedirectResponse:
    """Redirect the root endpoint to the state endpoint.

    Returns:
        RedirectResponse: A redirect response targeting ``/state``.
    """
    state_redirect = RedirectResponse(url="/state")
    return state_redirect

health_handler async #

health_handler() -> JSONResponse

HealthCheck endpoint: server alive.

Returns:

  • JSONResponse ( JSONResponse ) –

    A JSON body of the form {"status": "alive"}.

Source code in server/lomas_server/routes/routes_admin.py
51
52
53
54
55
56
57
58
@router.get("/live")
async def health_handler() -> JSONResponse:
    """HealthCheck endpoint: reports that the server is alive.

    Returns:
        JSONResponse: A JSON body of the form ``{"status": "alive"}``.
    """
    liveness_payload = {"status": "alive"}
    return JSONResponse(content=liveness_payload)

status_handler async #

status_handler(user_id: UserId, request: Request, uid: UUID, response: Response) -> Job

Job status endpoint.

Parameters:

  • user_id #

    (UserId) –

    The user id.

  • request #

    (Request) –

    The raw request.

  • uid #

    (UUID) –

    The job's unique id.

  • response #

    (Response) –

    The job status response.

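Raises:

  • UnauthorizedAccessException

    If the user does not have access to this job.

  • HTTPException

    If the job does not exist.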

Returns:

  • Job ( Job ) –

    The Job model for this uid.

Source code in server/lomas_server/routes/routes_admin.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@router.get("/status/{uid}", responses=SERVER_QUERY_ERROR_RESPONSES)
async def status_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    uid: UUID,
    response: Response,
) -> Job:
    """Job status endpoint.

    Looks the job up in the application state under the string form of its
    uid and returns it to the user who requested it.

    Args:
        user_id (UserId): The user id.
        request (Request): The raw request.
        uid (UUID): The job's unique id.
        response (Response): The outgoing response; its status code is set
            to the job's status code when the job has failed.

    Raises:
        UnauthorizedAccessException: If the job was requested by another user.
        HTTPException: With status 404 if no job is stored under this uid.

    Returns:
        Job: The Job model for this uid.
    """
    job_store = request.app.state.jobs
    job_key = str(uid)
    job = job_store.get(job_key)

    if job is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="This job does not exist.")

    if job.requested_by != user_id.name:
        raise UnauthorizedAccessException(f"{user_id.name} does not have access to job with uid {uid}.")

    if job.status == "failed":
        # Surface the job's own status code on the HTTP response.
        response.status_code = job.status_code

    if job.status == "complete":
        # A completed job is handed out once, then dropped from the state.
        del job_store[job_key]

    return job

get_state async #

get_state(_: UserId) -> JSONResponse

Returns the current state dict of this server instance.

Parameters:

  • _ #

    (UserId) –

    A UserId object identifying the user.

Returns:

  • JSONResponse ( JSONResponse ) –

    The state of the server instance.

Source code in server/lomas_server/routes/routes_admin.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@router.get("/state", tags=["ADMIN_USER"])
async def get_state(
    _: Annotated[UserId, Security(get_user_id_from_authenticator, scopes=[Scopes.ADMIN])],
) -> JSONResponse:
    """Returns the current state dict of this server instance.

    Args:
        _ (UserId): A UserId object identifying the user. It is unused in
            the body; the dependency requests the admin scope.

    Returns:
        JSONResponse: A JSON body of the form ``{"state": "live"}``.
    """
    state_payload = {"state": "live"}
    return JSONResponse(content=state_payload)

get_server_config async #

get_server_config(_: UserId) -> ConfigResponse

Returns the config of this server instance.

Parameters:

  • _ #

    (UserId) –

    A UserId object identifying the user.

Returns:

  • ConfigResponse ( ConfigResponse ) –

    The server config.

Source code in server/lomas_server/routes/routes_admin.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
@router.get(
    "/config",
    tags=["ADMIN_USER"],
)
async def get_server_config(
    _: Annotated[UserId, Security(get_user_id_from_authenticator, scopes=[Scopes.ADMIN])],
) -> ConfigResponse:
    """Returns the config of this server instance.

    Args:
        _ (UserId): A UserId object identifying the user. It is unused in
            the body; the dependency requests the admin scope.

    Returns:
        ConfigResponse: The server config, as a ``ConfigResponse`` built
            without arguments.
    """
    server_config = ConfigResponse()
    return server_config

get_dataset_metadata #

get_dataset_metadata(
    request: Request,
    user_id: UserId,
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> TableMetadata

Retrieves metadata for a given dataset.

Parameters:

  • request #

    (Request) –

    Raw request object

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • query_json #

    (LomasRequestModel, default: example_get_admin_db_data_body ) –

    A JSON object containing the dataset_name key for indicating the dataset. Defaults to example_get_admin_db_data_body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

Returns:

  • TableMetadata ( TableMetadata ) –

    The metadata object for the specified dataset_name.

Source code in server/lomas_server/routes/routes_admin.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@router.post(
    "/get_dataset_metadata",
    tags=["USER_METADATA"],
)
def get_dataset_metadata(
    request: Request,
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> TableMetadata:
    """
    Retrieves metadata for a given dataset.

    The user's access to the dataset is checked first; the metadata is then
    read from the admin database.

    Args:
        request (Request): Raw request object
        user_id (UserId): A UserId object identifying the user.
        query_json (LomasRequestModel, optional): A JSON object containing
            the dataset_name key for indicating the dataset.
            Defaults to example_get_admin_db_data_body.

    Raises:
        UnauthorizedAccessException: The user does not have access to
            the dataset.
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions
            (anything that is not one of KNOWN_EXCEPTIONS is wrapped).

    Returns:
        TableMetadata: The metadata object for the specified dataset_name.
    """
    server_app = request.app
    requested_dataset = query_json.dataset_name

    user_can_access = server_app.state.admin_database.has_user_access_to_dataset(
        user_id.name, requested_dataset
    )
    if not user_can_access:
        raise UnauthorizedAccessException(
            f"{user_id.name} does not have access to {requested_dataset}.",
        )

    try:
        metadata = server_app.state.admin_database.get_dataset_metadata(requested_dataset)
    except KNOWN_EXCEPTIONS as known_error:
        # Known exception types pass through untouched.
        raise known_error
    except Exception as unexpected_error:
        raise InternalServerException(str(unexpected_error)) from unexpected_error

    return metadata

get_dummy_dataset #

get_dummy_dataset(
    request: Request,
    user_id: UserId,
    query_json: GetDummyDataset = example_get_dummy_dataset_body,
) -> DummyDsResponse

Generates and returns a dummy dataset.

Parameters:

  • request #

    (Request) –

    Raw request object

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • query_json #

    (GetDummyDataset, default: example_get_dummy_dataset_body ) –

    A JSON object containing the following: - dummy_nb_rows (int, optional): The number of rows in the dummy dataset (default: 100). - dummy_seed (int, optional): The random seed for generating the dummy dataset (default: 42).

    Defaults to example_get_dummy_dataset_body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

Returns:

  • DummyDsResponse ( DummyDsResponse ) –

    A response holding the column types (dtypes) and the generated dummy dataframe (dummy_df).

Source code in server/lomas_server/routes/routes_admin.py
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
@router.post(
    "/get_dummy_dataset",
    tags=["USER_DUMMY"],
)
def get_dummy_dataset(
    request: Request,
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    query_json: GetDummyDataset = example_get_dummy_dataset_body,
) -> DummyDsResponse:
    """
    Generates and returns a dummy dataset.

    After checking the user's access, the dataset metadata is read from the
    admin database and used both to derive the pandas dtype of each column
    and to build the dummy dataframe.

    Args:
        request (Request): Raw request object
        user_id (UserId): A UserId object identifying the user.
        query_json (GetDummyDataset, optional):
            A JSON object from which the following are read:
                - dataset_name: The name of the dataset.
                - dummy_nb_rows: The number of rows in the dummy dataset.
                - dummy_seed: The random seed for generating
                  the dummy dataset.

            Defaults to example_get_dummy_dataset_body.

    Raises:
        UnauthorizedAccessException: The user does not have access to
            the dataset.
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions
            (anything that is not one of KNOWN_EXCEPTIONS is wrapped).

    Returns:
        DummyDsResponse: A response holding the column dtypes and the
            generated dummy dataframe.
    """
    server_app = request.app
    requested_dataset = query_json.dataset_name

    user_can_access = server_app.state.admin_database.has_user_access_to_dataset(
        user_id.name, requested_dataset
    )
    if not user_can_access:
        raise UnauthorizedAccessException(
            f"{user_id.name} does not have access to {requested_dataset}.",
        )

    try:
        metadata = server_app.state.admin_database.get_dataset_metadata(requested_dataset)

        # Map every column name to its pandas dtype.
        dtypes = {}
        for column in metadata.columns:
            column_name = column.name
            dtypes[column_name] = to_pandas_dtype(column.datatype)

        dummy_df = make_dummy_from_metadata(
            metadata.to_dict(),
            query_json.dummy_nb_rows,
            query_json.dummy_seed,
        )
    except KNOWN_EXCEPTIONS as known_error:
        # Known exception types pass through untouched.
        raise known_error
    except Exception as unexpected_error:
        raise InternalServerException(str(unexpected_error)) from unexpected_error

    return DummyDsResponse(dtypes=dtypes, dummy_df=dummy_df)

get_initial_budget #

get_initial_budget(
    request: Request,
    user_id: UserId,
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> InitialBudgetResponse

Returns the initial budget for a user and dataset.

Parameters:

  • request #

    (Request) –

    Raw request object.

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • query_json #

    (LomasRequestModel, default: example_get_admin_db_data_body ) –

    A JSON object containing: - dataset_name (str): The name of the dataset.

    Defaults to example_get_admin_db_data_body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The dataset does not exist.

  • UnauthorizedAccessException

    The user does not exist or the user does not have access to the dataset.

Returns:

  • InitialBudgetResponse ( InitialBudgetResponse ) –

    A response with: - initial_epsilon (float): initial epsilon budget. - initial_delta (float): initial delta budget.

Source code in server/lomas_server/routes/routes_admin.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
@router.post(
    "/get_initial_budget",
    tags=["USER_BUDGET"],
)
def get_initial_budget(
    request: Request,
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> InitialBudgetResponse:
    """
    Returns the initial budget for a user and dataset.

    The (epsilon, delta) pair is read from the admin database for the
    authenticated user's name and the requested dataset.

    Args:
        request (Request): Raw request object.
        user_id (UserId): A UserId object identifying the user.
        query_json (LomasRequestModel, optional): A JSON object containing:
            - dataset_name (str): The name of the dataset.

            Defaults to example_get_admin_db_data_body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions
            (anything that is not one of KNOWN_EXCEPTIONS is wrapped).
        InvalidQueryException: The dataset does not exist.
        UnauthorizedAccessException: The user does not exist or
            the user does not have access to the dataset.

    Returns:
        InitialBudgetResponse: a response with:
            - initial_epsilon: initial epsilon budget.
            - initial_delta: initial delta budget.
    """
    server_app = request.app

    try:
        budget = server_app.state.admin_database.get_initial_budget(user_id.name, query_json.dataset_name)
        initial_epsilon, initial_delta = budget
    except KNOWN_EXCEPTIONS as known_error:
        # Known exception types pass through untouched.
        raise known_error
    except Exception as unexpected_error:
        raise InternalServerException(str(unexpected_error)) from unexpected_error

    return InitialBudgetResponse(initial_epsilon=initial_epsilon, initial_delta=initial_delta)

get_total_spent_budget #

get_total_spent_budget(
    request: Request,
    user_id: UserId,
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> SpentBudgetResponse

Returns the spent budget for a user and dataset.

Parameters:

  • request #

    (Request) –

    Raw request object.

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • query_json #

    (LomasRequestModel, default: example_get_admin_db_data_body ) –

    A JSON object containing: - dataset_name (str): The name of the dataset.

    Defaults to example_get_admin_db_data_body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The dataset does not exist.

  • UnauthorizedAccessException

    The user does not exist or the user does not have access to the dataset.

Returns:

  • SpentBudgetResponse ( SpentBudgetResponse ) –

    A response with: - total_spent_epsilon (float): total spent epsilon budget. - total_spent_delta (float): total spent delta budget.

Source code in server/lomas_server/routes/routes_admin.py
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
@router.post(
    "/get_total_spent_budget",
    tags=["USER_BUDGET"],
)
def get_total_spent_budget(
    request: Request,
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> SpentBudgetResponse:
    """
    Returns the spent budget for a user and dataset.

    The (epsilon, delta) pair is read from the admin database for the
    authenticated user's name and the requested dataset.

    Args:
        request (Request): Raw request object.
        user_id (UserId): A UserId object identifying the user.
        query_json (LomasRequestModel, optional): A JSON object containing:
            - dataset_name (str): The name of the dataset.

            Defaults to example_get_admin_db_data_body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions
            (anything that is not one of KNOWN_EXCEPTIONS is wrapped).
        InvalidQueryException: The dataset does not exist.
        UnauthorizedAccessException: The user does not exist or
            the user does not have access to the dataset.

    Returns:
        SpentBudgetResponse: a response with:
            - total_spent_epsilon: total spent epsilon budget.
            - total_spent_delta: total spent delta budget.
    """
    server_app = request.app

    try:
        spent = server_app.state.admin_database.get_total_spent_budget(user_id.name, query_json.dataset_name)
        total_spent_epsilon, total_spent_delta = spent
    except KNOWN_EXCEPTIONS as known_error:
        # Known exception types pass through untouched.
        raise known_error
    except Exception as unexpected_error:
        raise InternalServerException(str(unexpected_error)) from unexpected_error

    return SpentBudgetResponse(total_spent_epsilon=total_spent_epsilon, total_spent_delta=total_spent_delta)

get_remaining_budget #

get_remaining_budget(
    request: Request,
    user_id: UserId,
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> RemainingBudgetResponse

Returns the remaining budget for a user and dataset.

Parameters:

  • request #

    (Request) –

    Raw request object.

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • query_json #

    (LomasRequestModel, default: example_get_admin_db_data_body ) –

    A JSON object containing: - dataset_name (str): The name of the dataset.

    Defaults to example_get_admin_db_data_body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The dataset does not exist.

  • UnauthorizedAccessException

    The user does not exist or the user does not have access to the dataset.

Returns:

  • RemainingBudgetResponse ( RemainingBudgetResponse ) –

    A response with: - remaining_epsilon (float): remaining epsilon budget. - remaining_delta (float): remaining delta budget.

Source code in server/lomas_server/routes/routes_admin.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
@router.post(
    "/get_remaining_budget",
    tags=["USER_BUDGET"],
)
def get_remaining_budget(
    request: Request,
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> RemainingBudgetResponse:
    """
    Returns the remaining budget for a user and dataset.

    The (epsilon, delta) pair is read from the admin database for the
    authenticated user's name and the requested dataset.

    Args:
        request (Request): Raw request object.
        user_id (UserId): A UserId object identifying the user.
        query_json (LomasRequestModel, optional): A JSON object containing:
            - dataset_name (str): The name of the dataset.

            Defaults to example_get_admin_db_data_body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions
            (anything that is not one of KNOWN_EXCEPTIONS is wrapped).
        InvalidQueryException: The dataset does not exist.
        UnauthorizedAccessException: The user does not exist or
            the user does not have access to the dataset.

    Returns:
        RemainingBudgetResponse: a response with:
            - remaining_epsilon: remaining epsilon budget.
            - remaining_delta: remaining delta budget.
    """
    server_app = request.app

    try:
        remaining = server_app.state.admin_database.get_remaining_budget(
            user_id.name, query_json.dataset_name
        )
        remaining_epsilon, remaining_delta = remaining
    except KNOWN_EXCEPTIONS as known_error:
        # Known exception types pass through untouched.
        raise known_error
    except Exception as unexpected_error:
        raise InternalServerException(str(unexpected_error)) from unexpected_error

    return RemainingBudgetResponse(remaining_epsilon=remaining_epsilon, remaining_delta=remaining_delta)

get_user_previous_queries #

get_user_previous_queries(
    request: Request,
    user_id: UserId,
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> JSONResponse

Returns the query history of a user on a specific dataset.

Parameters:

  • request #

    (Request) –

    Raw request object.

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • query_json #

    (LomasRequestModel, default: example_get_admin_db_data_body ) –

    A JSON object containing: - dataset_name (str): The name of the dataset.

    Defaults to example_get_admin_db_data_body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The dataset does not exist.

  • UnauthorizedAccessException

    The user does not exist or the user does not have access to the dataset.

Returns:

  • JSONResponse ( JSONResponse ) –

    A JSON object containing: - previous_queries (list[dict]): a list of dictionaries containing the previous queries.

Source code in server/lomas_server/routes/routes_admin.py
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
@router.post(
    "/get_previous_queries",
    tags=["USER_BUDGET"],
)
def get_user_previous_queries(
    request: Request,
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    query_json: LomasRequestModel = example_get_admin_db_data_body,
) -> JSONResponse:
    """
    Returns the query history of a user on a specific dataset.

    The history is read from the admin database for the authenticated
    user's name and the requested dataset, then wrapped in a JSON body.

    Args:
        request (Request): Raw request object.
        user_id (UserId): A UserId object identifying the user.
        query_json (LomasRequestModel, optional): A JSON object containing:
            - dataset_name (str): The name of the dataset.

            Defaults to example_get_admin_db_data_body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions
            (anything that is not one of KNOWN_EXCEPTIONS is wrapped).
        InvalidQueryException: The dataset does not exist.
        UnauthorizedAccessException: The user does not exist or
            the user does not have access to the dataset.

    Returns:
        JSONResponse: A JSON object containing:
            - previous_queries (list[dict]): a list of dictionaries
              containing the previous queries.
    """
    server_app = request.app

    try:
        # TODO 359 improve on that and return models.
        history = server_app.state.admin_database.get_user_previous_queries(
            user_id.name, query_json.dataset_name
        )
    except KNOWN_EXCEPTIONS as known_error:
        # Known exception types pass through untouched.
        raise known_error
    except Exception as unexpected_error:
        raise InternalServerException(str(unexpected_error)) from unexpected_error

    payload = {"previous_queries": history}
    return JSONResponse(content=payload)

add_user #

add_user(request: Request, _: UserId, new_user: User) -> None

Adds a new user with an associated budget for a given dataset.

Parameters:

  • new_user #

    (User) –

    User to add

Source code in server/lomas_server/routes/routes_admin.py
449
450
451
452
453
454
455
456
457
458
459
460
461
@router.post("/users")
def add_user(
    request: Request,
    _: Annotated[UserId, Security(get_user_id_from_authenticator, scopes=[Scopes.ADMIN])],
    new_user: User,
) -> None:
    """Adds a new user to the administration database.

    Only the name and email found in the new user's id are forwarded to
    the database.

    Args:
        request (Request): Raw request object, used to reach the admin
            database stored in the application state.
        _ (UserId): The authenticated caller. It is unused in the body;
            the dependency requests the admin scope.
        new_user (User): User to add
    """
    admin_db: LocalAdminDatabase = request.app.state.admin_database
    identity = new_user.id
    return admin_db.add_user(identity.name, identity.email)

add_users_yaml #

add_users_yaml(
    request: Request, _: UserId, file: UploadFile, clean: bool = False
) -> None

Add all users from a yaml file.

Parameters:

  • file #

    (UploadFile) –

    The uploaded YAML file describing the users to add.

  • clean #

    (bool, default: False ) –

    Boolean flag: True to drop the current user collection, False to keep it.

Source code in server/lomas_server/routes/routes_admin.py
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
@router.post("/usersfile")
def add_users_yaml(
    request: Request,
    _: Annotated[UserId, Security(get_user_id_from_authenticator, scopes=[Scopes.ADMIN])],
    file: UploadFile,
    clean: bool = False,
) -> None:
    """Add all users from a yaml file.

    Args:
        request (Request): Raw request object, used to reach the admin
            database stored in the application state.
        _ (UserId): The authenticated caller. It is unused in the body;
            the dependency requests the admin scope.
        file (UploadFile): The uploaded YAML file; its underlying file
            object is handed to the admin database.
        clean (bool): boolean flag
            True if drop current user collection
            False if keep current user collection
    """
    admin_db: LocalAdminDatabase = request.app.state.admin_database
    yaml_stream = file.file
    return admin_db.add_users_via_yaml(yaml_stream, clean=clean)

delete_user #

delete_user(request: Request, _: UserId, username: str) -> None

Deletes the lomas user.

Parameters:

  • username #

    (str) –

    The name of the user to be deleted.

Source code in server/lomas_server/routes/routes_admin.py
483
484
485
486
487
488
489
490
491
492
493
494
495
@router.delete("/users/{username}")
def delete_user(
    request: Request,
    _: Annotated[UserId, Security(get_user_id_from_authenticator, scopes=[Scopes.ADMIN])],
    username: str,
) -> None:
    """Deletes the lomas user.

    Args:
        request (Request): Raw request object, used to reach the admin
            database stored in the application state.
        _ (UserId): The authenticated caller. It is unused in the body;
            the dependency requests the admin scope.
        username (str): The name of the user to be deleted.
    """
    admin_db: LocalAdminDatabase = request.app.state.admin_database
    deletion_result = admin_db.del_user(username)
    return deletion_result

delete_collection #

delete_collection(request: Request, _: UserId, collection_name: str) -> None

Drops the given collection from the administration database.

Parameters:

  • collection_name #

    (str) –

    The collection to drop.

Source code in server/lomas_server/routes/routes_admin.py
498
499
500
501
502
503
504
505
506
507
508
509
510
@router.delete("/collections/{collection_name}")
def delete_collection(
    request: Request,
    _: Annotated[UserId, Security(get_user_id_from_authenticator, scopes=[Scopes.ADMIN])],
    collection_name: str,
) -> None:
    """Drops the given collection from the administration database.

    Args:
        request (Request): Raw request object, used to reach the admin
            database stored in the application state.
        _ (UserId): The authenticated caller. It is unused in the body;
            the dependency requests the admin scope.
        collection_name (str): The collection to drop.
    """
    admin_db: LocalAdminDatabase = request.app.state.admin_database
    drop_result = admin_db.drop_collection(collection_name)
    return drop_result

Functions:

smartnoise_sql_handler async #

smartnoise_sql_handler(
    user_id: UserId, request: Request, smartnoise_sql_query: SmartnoiseSQLQueryModel
) -> Job

Handles queries for the SmartNoiseSQL library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • smartnoise_sql_query #

    (SmartnoiseSQLQueryModel) –

    The smartnoise_sql query body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing a SmartnoiseSQLQueryResult.

Source code in server/lomas_server/routes/routes_dp.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@router.post(
    "/smartnoise_sql_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def smartnoise_sql_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    smartnoise_sql_query: SmartnoiseSQLQueryModel,
) -> Job:
    """
    Handles queries for the SmartNoiseSQL library.

    \f
    The query is delegated to ``handle_query_to_job`` together with the
    user's name and the SmartNoiseSQL library identifier.

    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        smartnoise_sql_query (SmartnoiseSQLQueryModel): The smartnoise_sql query body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing a SmartnoiseSQLQueryResult.
    """
    scheduled_job = await handle_query_to_job(
        request, smartnoise_sql_query, user_id.name, DPLibraries.SMARTNOISE_SQL
    )
    return scheduled_job

dummy_smartnoise_sql_handler async #

dummy_smartnoise_sql_handler(
    user_id: UserId, request: Request, smartnoise_sql_query: SmartnoiseSQLDummyQueryModel
) -> Job

Handles queries on dummy datasets for the SmartNoiseSQL library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • smartnoise_sql_query #

    (SmartnoiseSQLDummyQueryModel) –

    The smartnoise_sql query body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing a SmartnoiseSQLQueryResult.

Source code in server/lomas_server/routes/routes_dp.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
@router.post(
    "/dummy_smartnoise_sql_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_DUMMY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def dummy_smartnoise_sql_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    smartnoise_sql_query: SmartnoiseSQLDummyQueryModel,
) -> Job:
    """
    Handles queries on dummy datasets for the SmartNoiseSQL library.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        smartnoise_sql_query (SmartnoiseSQLDummyQueryModel):
            The smartnoise_sql query body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing a SmartnoiseSQLQueryResult.
    """
    # Same call as the non-dummy endpoint; the only visible difference here is
    # the type of the query model that is forwarded.
    return await handle_query_to_job(request, smartnoise_sql_query, user_id.name, DPLibraries.SMARTNOISE_SQL)

estimate_smartnoise_sql_cost async #

estimate_smartnoise_sql_cost(
    user_id: UserId, request: Request, smartnoise_sql_query: SmartnoiseSQLRequestModel
) -> Job

Estimates the privacy loss budget cost of a SmartNoiseSQL query.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • smartnoise_sql_query #

    (SmartnoiseSQLRequestModel) –

    The smartnoise_sql request body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.

Source code in server/lomas_server/routes/routes_dp.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
@router.post(
    "/estimate_smartnoise_sql_cost",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def estimate_smartnoise_sql_cost(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    smartnoise_sql_query: SmartnoiseSQLRequestModel,
) -> Job:
    """
    Estimates the privacy loss budget cost of a SmartNoiseSQL query.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        smartnoise_sql_query (SmartnoiseSQLRequestModel):
            The smartnoise_sql request body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: The dataset does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.
    """
    # Same helper call as the query endpoints.
    # NOTE(review): presumably the helper tells cost estimation apart from
    # execution by the request model type; confirm in handle_query_to_job.
    return await handle_query_to_job(request, smartnoise_sql_query, user_id.name, DPLibraries.SMARTNOISE_SQL)

smartnoise_synth_handler async #

smartnoise_synth_handler(
    user_id: UserId, request: Request, smartnoise_synth_query: SmartnoiseSynthQueryModel
) -> Job

Handles queries for the SmartNoiseSynth library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • smartnoise_synth_query #

    (SmartnoiseSynthQueryModel) –

    The smartnoise_synth query body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing a SmartnoiseSynthModel or SmartnoiseSynthSamples.

Source code in server/lomas_server/routes/routes_dp.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@router.post(
    "/smartnoise_synth_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def smartnoise_synth_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    smartnoise_synth_query: SmartnoiseSynthQueryModel,
) -> Job:
    """
    Handles queries for the SmartNoiseSynth library.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        smartnoise_synth_query (SmartnoiseSynthQueryModel):
            The smartnoise_synth query body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing a SmartnoiseSynthModel
            or SmartnoiseSynthSamples.
    """
    # Delegate to the shared helper; only the user's name (not the whole UserId)
    # and the library this endpoint serves are passed along with the request and query.
    return await handle_query_to_job(
        request, smartnoise_synth_query, user_id.name, DPLibraries.SMARTNOISE_SYNTH
    )

dummy_smartnoise_synth_handler async #

dummy_smartnoise_synth_handler(
    user_id: UserId,
    request: Request,
    smartnoise_synth_query: SmartnoiseSynthDummyQueryModel,
) -> Job

Handles queries on dummy datasets for the SmartNoiseSynth library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • smartnoise_synth_query #

    (SmartnoiseSynthDummyQueryModel) –

    The smartnoise_synth query body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing a SmartnoiseSynthModel or SmartnoiseSynthSamples.

Source code in server/lomas_server/routes/routes_dp.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
@router.post(
    "/dummy_smartnoise_synth_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    # Dummy endpoints are grouped under USER_DUMMY, like the other dummy_* routes.
    tags=["USER_DUMMY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def dummy_smartnoise_synth_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    smartnoise_synth_query: SmartnoiseSynthDummyQueryModel,
) -> Job:
    """
    Handles queries on dummy datasets for the SmartNoiseSynth library.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        smartnoise_synth_query (SmartnoiseSynthDummyQueryModel):
            The smartnoise_synth query body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing a SmartnoiseSynthModel
            or SmartnoiseSynthSamples.
    """
    # Same call as the non-dummy endpoint; the only visible difference here is
    # the type of the query model that is forwarded.
    return await handle_query_to_job(
        request, smartnoise_synth_query, user_id.name, DPLibraries.SMARTNOISE_SYNTH
    )

estimate_smartnoise_synth_cost async #

estimate_smartnoise_synth_cost(
    user_id: UserId, request: Request, smartnoise_synth_query: SmartnoiseSynthRequestModel
) -> Job

Computes the privacy loss budget cost of a SmartNoiseSynth query.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • smartnoise_synth_query #

    (SmartnoiseSynthRequestModel) –

    The smartnoise_synth query body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.

Source code in server/lomas_server/routes/routes_dp.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
@router.post(
    "/estimate_smartnoise_synth_cost",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def estimate_smartnoise_synth_cost(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    smartnoise_synth_query: SmartnoiseSynthRequestModel,
) -> Job:
    """
    Computes the privacy loss budget cost of a SmartNoiseSynth query.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        smartnoise_synth_query (SmartnoiseSynthRequestModel):
            The smartnoise_synth query body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.
    """
    # Same helper call as the query endpoints.
    # NOTE(review): presumably the helper tells cost estimation apart from
    # execution by the request model type; confirm in handle_query_to_job.
    return await handle_query_to_job(
        request, smartnoise_synth_query, user_id.name, DPLibraries.SMARTNOISE_SYNTH
    )

opendp_query_handler async #

opendp_query_handler(
    user_id: UserId, request: Request, opendp_query: OpenDPQueryModel
) -> Job

Handles queries for the OpenDP Library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object.

  • opendp_query #

    (OpenDPQueryModel) –

    The opendp query object.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The pipeline does not contain a "measurement", there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing an OpenDPQueryResult.

Source code in server/lomas_server/routes/routes_dp.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
@router.post(
    "/opendp_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def opendp_query_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    opendp_query: OpenDPQueryModel,
) -> Job:
    """
    Handles queries for the OpenDP Library.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object.
        opendp_query (OpenDPQueryModel): The opendp query object.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: The pipeline does not contain a "measurement",
            there is not enough budget or the dataset does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing an OpenDPQueryResult.
    """
    # Delegate to the shared helper; only the user's name (not the whole UserId)
    # and the library this endpoint serves are passed along with the request and query.
    return await handle_query_to_job(request, opendp_query, user_id.name, DPLibraries.OPENDP)

dummy_opendp_query_handler async #

dummy_opendp_query_handler(
    user_id: UserId, request: Request, opendp_query: OpenDPDummyQueryModel
) -> Job

Handles queries on dummy datasets for the OpenDP library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object.

  • opendp_query #

    (OpenDPDummyQueryModel) –

    The opendp query object.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The pipeline does not contain a "measurement", there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing an OpenDPQueryResult.

Source code in server/lomas_server/routes/routes_dp.py
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
@router.post(
    "/dummy_opendp_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_DUMMY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def dummy_opendp_query_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    opendp_query: OpenDPDummyQueryModel,
) -> Job:
    """
    Handles queries on dummy datasets for the OpenDP library.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object.
        opendp_query (OpenDPDummyQueryModel): The opendp query object.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: The pipeline does not contain a "measurement",
            there is not enough budget or the dataset does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing an OpenDPQueryResult.
    """
    # Same call as the non-dummy endpoint; the only visible difference here is
    # the type of the query model that is forwarded.
    return await handle_query_to_job(request, opendp_query, user_id.name, DPLibraries.OPENDP)

estimate_opendp_cost async #

estimate_opendp_cost(
    user_id: UserId, request: Request, opendp_query: OpenDPRequestModel
) -> Job

Estimates the privacy loss budget cost of an OpenDP query.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object.

  • opendp_query #

    (OpenDPRequestModel) –

    The opendp query object.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    The pipeline does not contain a "measurement", there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.

Source code in server/lomas_server/routes/routes_dp.py
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
@router.post(
    "/estimate_opendp_cost",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def estimate_opendp_cost(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    opendp_query: OpenDPRequestModel,
) -> Job:
    """
    Estimates the privacy loss budget cost of an OpenDP query.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object.
        opendp_query (OpenDPRequestModel): The opendp query object.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: The pipeline does not contain a "measurement",
            there is not enough budget or the dataset does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.
    """
    # Same helper call as the query endpoints.
    # NOTE(review): presumably the helper tells cost estimation apart from
    # execution by the request model type; confirm in handle_query_to_job.
    return await handle_query_to_job(request, opendp_query, user_id.name, DPLibraries.OPENDP)

diffprivlib_query_handler async #

diffprivlib_query_handler(
    user_id: UserId, request: Request, diffprivlib_query: DiffPrivLibQueryModel
) -> Job

Handles queries for the DiffPrivLib Library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • diffprivlib_query #

    (DiffPrivLibQueryModel) –

    The diffprivlib query body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing a DiffPrivLibQueryResult.

Source code in server/lomas_server/routes/routes_dp.py
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
@router.post(
    "/diffprivlib_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def diffprivlib_query_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    diffprivlib_query: DiffPrivLibQueryModel,
) -> Job:
    """
    Handles queries for the DiffPrivLib Library.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        diffprivlib_query (DiffPrivLibQueryModel): The diffprivlib query body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing a DiffPrivLibQueryResult.
    """
    # Delegate to the shared helper; only the user's name (not the whole UserId)
    # and the library this endpoint serves are passed along with the request and query.
    return await handle_query_to_job(request, diffprivlib_query, user_id.name, DPLibraries.DIFFPRIVLIB)

dummy_diffprivlib_query_handler async #

dummy_diffprivlib_query_handler(
    user_id: UserId, request: Request, query_json: DiffPrivLibDummyQueryModel
) -> Job

Handles queries on dummy datasets for the DiffPrivLib library.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • query_json #

    (DiffPrivLibDummyQueryModel) –

    The diffprivlib query body.

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a QueryResponse containing a DiffPrivLibQueryResult.

Source code in server/lomas_server/routes/routes_dp.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
@router.post(
    "/dummy_diffprivlib_query",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_DUMMY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def dummy_diffprivlib_query_handler(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    query_json: DiffPrivLibDummyQueryModel,
) -> Job:
    """
    Handles queries on dummy datasets for the DiffPrivLib library.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        query_json (DiffPrivLibDummyQueryModel): The diffprivlib query body.

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a QueryResponse containing a DiffPrivLibQueryResult.
    """
    # Same helper call as the non-dummy endpoint; note that the body parameter is
    # named `query_json` here rather than `diffprivlib_query`.
    return await handle_query_to_job(request, query_json, user_id.name, DPLibraries.DIFFPRIVLIB)

estimate_diffprivlib_cost async #

estimate_diffprivlib_cost(
    user_id: UserId, request: Request, diffprivlib_query: DiffPrivLibRequestModel
) -> Job

Estimates the privacy loss budget cost of a DiffPrivLib query.

Parameters:

  • user_id #

    (UserId) –

    A UserId object identifying the user.

  • request #

    (Request) –

    Raw request object

  • diffprivlib_query #

    (DiffPrivLibRequestModel) –

    The diffprivlib query body. A JSON object containing the following:

      - pipeline: the DiffPrivLib pipeline for the query
      - feature_columns: the list of feature columns to train on
      - target_columns: the list of target columns to predict
      - test_size: proportion of the test set
      - test_train_split_seed: seed for the random train/test split
      - imputer_strategy: imputation strategy

    Defaults to Body(example_dummy_diffprivlib).
    

Raises:

  • ExternalLibraryException

    For exceptions from libraries external to this package.

  • InternalServerException

    For any other unforeseen exceptions.

  • InvalidQueryException

    If there is not enough budget or the dataset does not exist.

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.

Source code in server/lomas_server/routes/routes_dp.py
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
@router.post(
    "/estimate_diffprivlib_cost",
    responses=SERVER_QUERY_ERROR_RESPONSES,
    tags=["USER_QUERY"],
    status_code=status.HTTP_202_ACCEPTED,
)
async def estimate_diffprivlib_cost(
    user_id: Annotated[UserId, Security(get_user_id_from_authenticator)],
    request: Request,
    diffprivlib_query: DiffPrivLibRequestModel,
) -> Job:
    """
    Estimates the privacy loss budget cost of an DiffPrivLib query.

    \f
    Args:
        user_id (UserId): A UserId object identifying the user.
        request (Request): Raw request object
        diffprivlib_query (DiffPrivLibRequestModel): The diffprivlib query body.
            A JSON object containing the following:

            - pipeline: The DiffPrivLib pipeline for the query.
            - feature_columns: the list of feature column to train
            - target_columns: the list of target column to predict
            - test_size: proportion of the test set
            - test_train_split_seed: seed for the random train test split,
            - imputer_strategy: imputation strategy

    Raises:
        ExternalLibraryException: For exceptions from libraries
            external to this package.
        InternalServerException: For any other unforeseen exceptions.
        InvalidQueryException: If there is not enough budget or the dataset
            does not exist.
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: a scheduled Job resulting in a CostResponse containing the privacy loss cost of the input query.
    """
    # Same helper call as the query endpoints.
    # NOTE(review): presumably the helper tells cost estimation apart from
    # execution by the request model type; confirm in handle_query_to_job.
    return await handle_query_to_job(request, diffprivlib_query, user_id.name, DPLibraries.DIFFPRIVLIB)

Functions:

process_response async #

process_response(
    queue: Queue, cls: type[QueryResponse | CostResponse], jobs: dict[UUID, Job]
) -> None

Process responses queue into Jobs.

Source code in server/lomas_server/routes/utils.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
async def process_response(
    queue: aio_pika.Queue, cls: type[QueryResponse | CostResponse], jobs: dict[UUID, Job]
) -> None:
    """Process responses queue into Jobs."""
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process(ignore_processed=True):
                if message.correlation_id not in jobs:
                    await message.reject(requeue=True)
                else:
                    await message.ack()

                    message_body = message.body.decode()
                    match message.headers:
                        case {"type": "exception", "status_code": status_code}:
                            jobs[
                                message.correlation_id
                            ].error = LomasServerExceptionTypeAdapter.validate_json(message_body)
                            jobs[message.correlation_id].status = "failed"
                            jobs[message.correlation_id].result = None
                            jobs[message.correlation_id].status_code = status_code
                        case _:
                            jobs[message.correlation_id].result = cls.model_validate_json(message_body)
                            jobs[message.correlation_id].status = "complete"

rabbitmq_connect_queue async #

rabbitmq_connect_queue(
    config: Config, reconnect_interval: int = 10, timeout: int = 120
) -> RobustConnection

Attempt with retries to connect to the queue.

Source code in server/lomas_server/routes/utils.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
async def rabbitmq_connect_queue(
    config: Config, reconnect_interval: int = 10, timeout: int = 120
) -> aio_pika.RobustConnection:
    """Open a robust connection to the message queue, giving up after ``timeout``.

    Args:
        config (Config): Configuration providing the AMQP DSN (``config.amqp.dsn``).
        reconnect_interval (int): Forwarded to ``aio_pika.connect_robust``.
        timeout (int): Maximum number of seconds allowed for the connection attempt.

    Returns:
        aio_pika.RobustConnection: The established connection.

    Note:
        When the timeout expires, an error is logged and the process exits with
        ``Status.EX_UNAVAILABLE``.
    """
    dsn = str(config.amqp.dsn)
    try:
        async with asyncio.timeout(timeout):
            return await aio_pika.connect_robust(
                dsn,
                fail_fast=False,
                reconnect_interval=reconnect_interval,
            )
    except TimeoutError:
        logger.error(f"Couldn't connect to queue {config.amqp.base_url} in time")
        sys.exit(Status.EX_UNAVAILABLE)

rabbitmq_ctx async #

rabbitmq_ctx(app: FastAPI) -> AsyncIterator[None]

RabbitMQ queue context to connect and register callbacks.

Source code in server/lomas_server/routes/utils.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@asynccontextmanager
async def rabbitmq_ctx(app: FastAPI) -> AsyncIterator[None]:
    """RabbitMQ queue context to connect and register callbacks.

    On entry:
      * connects to the broker and opens a channel,
      * declares the ``task``/``cost``/``dummy`` request queues and stores the channel
        on ``app.state`` (``task_queue_channel``, ``cost_queue_channel``,
        ``dummy_queue_channel`` all reference the same channel),
      * starts one background task per response queue that feeds ``app.state.jobs``,
      * registers the admin database methods as RPC endpoints.

    On exit the connection is closed, including when setup fails or when the
    body of the context raises.

    Args:
        app (FastAPI): The application whose state holds ``jobs`` and ``admin_database``.

    Yields:
        None: Control is handed back while the app is handling requests.
    """
    config = Config()

    connection = await rabbitmq_connect_queue(config)
    try:
        channel = await connection.channel()
        # Avoid dangling asyncio.Task by storing them here
        background_tasks: set[asyncio.Task[None]] = set()

        async def start_response_consumer(
            response_queue_name: str, response_cls: type[QueryResponse | CostResponse]
        ) -> None:
            """Declare a response queue and consume it in a background task."""
            response_queue = await channel.declare_queue(response_queue_name, durable=True)
            consumer = asyncio.create_task(
                process_response(response_queue, response_cls, app.state.jobs)
            )
            background_tasks.add(consumer)
            consumer.add_done_callback(background_tasks.discard)

        await channel.declare_queue("task_queue", durable=True)
        app.state.task_queue_channel = channel
        await start_response_consumer("task_response", QueryResponse)

        await channel.declare_queue("cost_queue", durable=True)
        app.state.cost_queue_channel = channel
        await start_response_consumer("cost_response", CostResponse)

        await channel.declare_queue("dummy_queue", durable=True)
        app.state.dummy_queue_channel = channel
        await start_response_consumer("dummy_response", QueryResponse)

        # Expose the admin database to the workers through RPC.
        admin_database = app.state.admin_database
        rpc = await RPC.create(channel, durable=True)
        await rpc.register(
            "get_and_set_may_user_query", admin_database.get_and_set_may_user_query, durable=True
        )
        await rpc.register("set_may_user_query", admin_database.set_may_user_query, durable=True)
        await rpc.register("get_remaining_budget", admin_database.get_remaining_budget, durable=True)
        await rpc.register("update_budget", admin_database.update_budget, durable=True)
        await rpc.register("save_query", admin_database.save_query, durable=True)
        await rpc.register("get_dataset_metadata", admin_database.get_dataset_metadata, durable=True)

        yield  # app is handling requests
    finally:
        # Always release the broker connection, even on setup or application errors.
        await connection.close()

timing_protection #

timing_protection(func)

Adds delays to requests response to protect against timing attack.

Source code in server/lomas_server/routes/utils.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def timing_protection(func):  # type: ignore[no-untyped-def]
    """Adds delays to requests response to protect against timing attack."""

    @wraps(func)
    def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]
        config = Config()

        start_time = time.time()
        response = func(*args, **kwargs)
        process_time = time.time() - start_time

        match config.server.time_attack.method:
            case TimeAttackMethod.STALL:
                # Slows to a minimum response time defined by magnitude
                if process_time < config.server.time_attack.magnitude:
                    time.sleep(config.server.time_attack.magnitude - process_time)
            case TimeAttackMethod.JITTER:
                # Adds some time between 0 and magnitude secs
                time.sleep(config.server.time_attack.magnitude * random.uniform(0, 1))
        return response

    return wrapper

get_user_id_from_authenticator #

get_user_id_from_authenticator(
    request: Request,
    security_scopes: SecurityScopes,
    auth_creds: HTTPAuthorizationCredentials,
) -> UserId

Extracts the authenticator from the app state and calls its get_user_id method.

Also adds the user_name to the request state to annotate the telemetry request span.

Parameters:

  • request #

    (Request) –

    The request to access the app and state.

  • security_scopes #

    (SecurityScopes) –

    The required scopes for the endpoint.

  • auth_creds #

    (Annotated[HTTPAuthorizationCredentials, Depends) –

    The HTTP bearer token.

Returns:

  • UserId ( UserId ) –

    A UserId instance extracted from the token.

Source code in server/lomas_server/routes/utils.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def get_user_id_from_authenticator(
    request: Request,
    security_scopes: SecurityScopes,
    auth_creds: Annotated[HTTPAuthorizationCredentials, Depends(HTTPBearer())],
) -> UserId:
    """Resolve the caller's UserId through the authenticator stored in the app state.

    The user name is also written to ``request.state.user_name`` so that the
    telemetry request span can be annotated with it.

    When a bootstrap credential is configured on the app state and the bearer
    token equals it, a fixed "bootstrap" user is returned and neither the
    authenticator nor the authorization check is invoked.

    Args:
        request (Request): The request to access the app and state.
        security_scopes (SecurityScopes): The required scopes for the endpoint.
        auth_creds (Annotated[HTTPAuthorizationCredentials, Depends]): The HTTP bearer token.

    Returns:
        UserId: A UserId instance extracted from the token.
    """
    # Bootstrap initialization
    bootstrap_cred = request.app.state.bootstrap
    # NOTE(review): plain == on a secret is not constant-time; consider
    # secrets.compare_digest once the type of the bootstrap credential is confirmed.
    uses_bootstrap_token = (
        bootstrap_cred is not None
        and isinstance(auth_creds, HTTPAuthorizationCredentials)
        and auth_creds.scheme == "Bearer"
        and auth_creds.credentials == bootstrap_cred
    )
    if uses_bootstrap_token:
        logger.warning("Bootstrap User Bypass")
        bootstrap_user = UserId(name="bootstrap", email="boot@strap.com")
        request.state.user_name = bootstrap_user.name
        return bootstrap_user

    app_state = request.app.state
    user_id = get_user_id(app_state.authenticator, security_scopes, auth_creds.credentials)
    request.state.user_name = user_id.name
    # This raises an exception if authz fails
    authorize_user(user_id, request.app.state.admin_database, security_scopes)

    return user_id

get_dataset_credentials #

Search the private database credentials and return the one that matches the database type and credentials name.

Parameters:

Raises:

Returns:

Source code in server/lomas_server/routes/utils.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def get_dataset_credentials(
    private_db_credentials: dict[int, PrivateDBCredentials],
    db_type: PrivateDatabaseType,
    credentials_name: str,
) -> PrivateDBCredentials:
    """
    Find the private database credentials matching a database type and a name.

    Only the S3 database type is looked up; for any other type no credentials
    are found.

    Args:
        private_db_credentials (dict[int, PrivateDBCredentials]):
            The available private database credentials (only the values are searched).
        db_type (PrivateDatabaseType): The type of the database.
        credentials_name (str): The name of the credentials to look for.

    Raises:
        InternalServerException: If the credentials are not found.

    Returns:
        PrivateDBCredentials: The matching credentials.
    """
    if db_type == PrivateDatabaseType.S3:
        found = next(
            (
                candidate
                for candidate in private_db_credentials.values()
                if isinstance(candidate, S3CredentialsConfig)
                and (credentials_name == candidate.credentials_name)
            ),
            None,
        )
        if found is not None:
            return found

    raise InternalServerException(
        "Could not find credentials for private dataset. Please contact server administrator."
    )

handle_query_to_job async #

Submit a Job to handle queries on private, dummy and cost datasets on a worker.

Parameters:

Raises:

  • UnauthorizedAccessException

    A query is already ongoing for this user, the user does not exist or does not have access to the dataset.

Returns:

  • Job ( Job ) –

    A scheduled Job resulting in a QueryResponse containing the result of the query (specific to the library) as well as the cost of the query, or a CostResponse containing the epsilon, delta and privacy-loss budget cost for the request.

Source code in server/lomas_server/routes/utils.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
@timing_protection
async def handle_query_to_job(
    request: Request,
    query: DummyQueryModel | QueryModel | LomasRequestModel,
    user_name: str,
    dp_library: DPLibraries,
) -> Job:
    """
    Submit Job to handles queries on private, dummy and cost datasets on a worker.

    Args:
        request (Request): Raw request object
        query (DummyQueryModel|QueryModel|LomasRequestModel): A Request or Query to be scheduled
        user_name (str): The user name
        dp_library (DPLibraries): Name of the DP library to use for the request

    Raises:
        UnauthorizedAccessException: A query is already ongoing for this user,
            the user does not exist or does not have access to the dataset.

    Returns:
        Job: A scheduled Job resulting in a QueryResponse containing the result of the query
            (specific to the library) as well as the cost of the query.
            or a CostResponse containing the epsilon, delta and privacy-loss budget cost for the request.
    """
    app = request.app
    admin_database = app.state.admin_database
    private_db_credentials = app.state.private_db_credentials

    dataset_name = query.dataset_name

    if not admin_database.has_user_access_to_dataset(user_name, dataset_name):
        raise UnauthorizedAccessException(f"{user_name} does not have access to {dataset_name}.")

    ds_access = admin_database.get_dataset(dataset_name).dataset_access
    ds_metadata = admin_database.get_dataset_metadata(dataset_name)
    data_connector = None

    match ds_access:
        case DSPathAccess():
            match path := ds_access.path:
                case Path():
                    data_connector = PathConnector(metadata=ds_metadata, dataset_path=path.resolve())
                case _:
                    data_connector = PathConnector(metadata=ds_metadata, dataset_path=path)
        case DSS3Access():
            credentials = get_dataset_credentials(
                private_db_credentials,
                ds_access.database_type,
                ds_access.credentials_name,
            )

            if not isinstance(credentials, S3CredentialsConfig):
                raise InternalServerException("Could not get correct credentials")

            ds_access = DSS3Access.model_validate(ds_access)
            ds_access.access_key_id = credentials.access_key_id
            ds_access.secret_access_key = credentials.secret_access_key

            data_connector = S3Connector(metadata=ds_metadata, credentials=ds_access)
        case _:
            raise InternalServerException(f"Unknown database type: {ds_access.database_type}")

    match query:
        case DummyQueryModel():
            queue_name = "dummy_queue"
        case QueryModel():
            queue_name = "task_queue"
        case LomasRequestModel():
            queue_name = "cost_queue"

    new_task = Job(requested_by=user_name)

    app.state.jobs[str(new_task.uid)] = new_task

    await app.state.cost_queue_channel.default_exchange.publish(
        aio_pika.Message(
            body=f"{user_name}λ{dp_library}λ{data_connector.model_dump_json()}λ{query.model_dump_json()}".encode(),
            correlation_id=new_task.uid,
        ),
        routing_key=queue_name,
    )

    return new_task

Classes:

Functions:

FilterOutLiveSuccess #

Filter out INFO logs: GET /api/live HTTP/1.1 200 OK.

uvicorn_serve #

uvicorn_serve() -> None

Start the ASGI server for lomas.

Source code in server/lomas_server/uvicorn_serve.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def uvicorn_serve() -> None:
    """Start the ASGI server for lomas.

    The uvicorn logging configuration is copied before the access-log filter is
    attached, so that uvicorn's module-level ``LOGGING_CONFIG`` is left untouched.
    Host, port, log level, reload flag, forwarded IPs and root path all come from
    ``Config().server``.
    """
    import copy  # pylint: disable=import-outside-toplevel

    config = Config()

    # Work on a private copy: LOGGING_CONFIG is a shared dict owned by uvicorn.
    log_config = copy.deepcopy(LOGGING_CONFIG)
    log_config["handlers"]["access"]["filters"] = [FilterOutLiveSuccess()]

    uvicorn.run(
        "lomas_server.app:app",
        host=config.server.host_ip,
        port=config.server.host_port,
        log_config=log_config,
        log_level=config.server.log_level,
        workers=1,
        reload=config.server.reload,
        forwarded_allow_ips=config.server.forwarded_allow_ips,
        root_path=config.server.root_path.removeprefix("/"),
        use_colors=True,
    )

Classes:

Functions:

TerminateTaskGroup #


              flowchart TD
              lomas_server.worker.TerminateTaskGroup[TerminateTaskGroup]

              

              click lomas_server.worker.TerminateTaskGroup href "" "lomas_server.worker.TerminateTaskGroup"
            

Exception raised to terminate a task group.

handle_exceptions #

handle_exceptions(exc: BaseException) -> JSONResponse

Transform KNOWN_EXCEPTIONS into a status_code and message for serialization.

In case of an unknown exception, wraps it up as if it were an InternalServerException. In case of an internal exception, the error message is not forwarded, to avoid potentially disclosing sensitive information.

Source code in server/lomas_server/worker.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def handle_exceptions(exc: BaseException) -> JSONResponse:
    """Turn an exception into a JSONResponse carrying a status code and an error model.

    The exception is logged, then mapped as follows:
      * ExternalLibraryException -> 422, with the error message and the library,
      * InvalidQueryException -> 400, with the error message,
      * UnauthorizedAccessException -> 403, with the error message,
      * InternalServerException or any other exception -> 500, using a default
        InternalServerExceptionModel: the original message is not forwarded, to
        avoid potentially disclosing sensitive information.
    """

    def reply(http_code: int, model: object) -> JSONResponse:
        """Serialize ``model`` as the JSON content of a response with ``http_code``."""
        return JSONResponse(status_code=http_code, content=jsonable_encoder(model))

    logger.error(exc)

    if isinstance(exc, ExternalLibraryException):
        return reply(
            status.HTTP_422_UNPROCESSABLE_CONTENT,
            ExternalLibraryExceptionModel(message=exc.error_message, library=exc.library),
        )
    if isinstance(exc, InternalServerException):
        return reply(status.HTTP_500_INTERNAL_SERVER_ERROR, InternalServerExceptionModel())
    if isinstance(exc, InvalidQueryException):
        return reply(
            status.HTTP_400_BAD_REQUEST,
            InvalidQueryExceptionModel(message=exc.error_message),
        )
    if isinstance(exc, UnauthorizedAccessException):
        return reply(
            status.HTTP_403_FORBIDDEN,
            UnauthorizedAccessExceptionModel(message=exc.error_message),
        )
    # Unknown exceptions are reported like an internal server error.
    return reply(status.HTTP_500_INTERNAL_SERVER_ERROR, InternalServerExceptionModel())

handle_cost_query async #

handle_cost_query(admin_database: Proxy, body: bytes) -> CostResponse | tuple[bytes, int]

Handle Cost query into CostResponse.

Source code in server/lomas_server/worker.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
async def handle_cost_query(admin_database: Proxy, body: bytes) -> CostResponse | tuple[bytes, int]:
    """Handle Cost query into CostResponse."""
    start_sec = time.time()
    message = body.decode()
    _, dp_library, data_connector_str, request_model_str = message.split("λ", 3)

    data_connector = ConnectorUnionTA.validate_json(data_connector_str)

    dp_querier: DPQuerier
    match dp_library:
        case DPLibraries.SMARTNOISE_SQL:
            request_model = SmartnoiseSQLRequestModel.model_validate_json(request_model_str)
            dp_querier = SmartnoiseSQLQuerier(data_connector, admin_database)

        case DPLibraries.OPENDP:
            request_model = OpenDPRequestModel.model_validate_json(request_model_str)
            dp_querier = OpenDPQuerier(data_connector, admin_database)

        case DPLibraries.DIFFPRIVLIB:
            request_model = DiffPrivLibRequestModel.model_validate_json(request_model_str)
            dp_querier = DiffPrivLibQuerier(data_connector, admin_database)

    try:
        eps_cost, delta_cost = dp_querier.cost(request_model)
        elapsed = time.time() - start_sec
        logger.debug(f"Done ({elapsed:.2f})")
        return CostResponse(epsilon=eps_cost, delta=delta_cost)
    except Exception as exc:  # pylint: disable=broad-exception-caught
        known_exc = handle_exceptions(exc)
        return known_exc.body, known_exc.status_code

handle_query async #

handle_query(admin_database: Proxy, body: bytes) -> QueryResponse | tuple[bytes, int]

Handle DP query into QueryResponse.

Source code in server/lomas_server/worker.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
async def handle_query(admin_database: Proxy, body: bytes) -> QueryResponse | tuple[bytes, int]:
    """Handle DP query into QueryResponse."""
    start_sec = time.time()
    message = body.decode()
    user_name, dp_library, data_connector_str, query_json_str = message.split("λ", 3)

    data_connector = ConnectorUnionTA.validate_json(data_connector_str)

    dp_querier: DPQuerier
    match dp_library:
        case DPLibraries.SMARTNOISE_SQL:
            query_json = SmartnoiseSQLQueryModel.model_validate_json(query_json_str)
            dp_querier = SmartnoiseSQLQuerier(data_connector, admin_database)

        case DPLibraries.OPENDP:
            query_json = OpenDPQueryModel.model_validate_json(query_json_str)
            dp_querier = OpenDPQuerier(data_connector, admin_database)

        case DPLibraries.DIFFPRIVLIB:
            query_json = DiffPrivLibQueryModel.model_validate_json(query_json_str)
            dp_querier = DiffPrivLibQuerier(data_connector, admin_database)

    try:
        query_response = await dp_querier.handle_query(query_json, user_name)
        elapsed = time.time() - start_sec
        logger.debug(f"Done ({elapsed:.2f})")
        return query_response
    except Exception as exc:  # pylint: disable=broad-exception-caught
        known_exc = handle_exceptions(exc)
        return known_exc.body, known_exc.status_code

handle_dummy_query async #

handle_dummy_query(
    admin_database: Proxy, body: bytes
) -> QueryResponse | tuple[bytes, int]

Handle DP-dummy query into QueryResponse.

Source code in server/lomas_server/worker.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
async def handle_dummy_query(admin_database: Proxy, body: bytes) -> QueryResponse | tuple[bytes, int]:
    """Handle DP-dummy query into QueryResponse."""
    start_sec = time.time()
    message = body.decode()
    user_name, dp_library, data_connector, query_model_str = message.split("λ", 3)

    dp_querier: DPQuerier
    match dp_library:
        case DPLibraries.SMARTNOISE_SQL:
            query_model = SmartnoiseSQLDummyQueryModel.model_validate_json(query_model_str)
            data_connector = await get_dummy_dataset_for_query(admin_database, query_model)
            dp_querier = SmartnoiseSQLQuerier(data_connector, admin_database)

        case DPLibraries.OPENDP:
            query_model = OpenDPDummyQueryModel.model_validate_json(query_model_str)
            data_connector = await get_dummy_dataset_for_query(admin_database, query_model)
            dp_querier = OpenDPQuerier(data_connector, admin_database)

        case DPLibraries.DIFFPRIVLIB:
            query_model = DiffPrivLibDummyQueryModel.model_validate_json(query_model_str)
            data_connector = await get_dummy_dataset_for_query(admin_database, query_model)
            dp_querier = DiffPrivLibQuerier(data_connector, admin_database)

    try:
        eps_cost, delta_cost = dp_querier.cost(query_model)
        result = dp_querier.query(query_model)
        dummy_query_response = QueryResponse(
            requested_by=user_name, result=result, epsilon=eps_cost, delta=delta_cost
        )
        elapsed = time.time() - start_sec
        logger.debug(f"Done ({elapsed:.2f})")
        return dummy_query_response
    except Exception as exc:  # pylint: disable=broad-exception-caught
        known_exc = handle_exceptions(exc)
        return known_exc.body, known_exc.status_code

process_message async #

process_message(
    channel: Channel,
    in_queue: str,
    out_queue: str,
    message_handler: Callable[[bytes], Any],
) -> None

General RabbitMQ Message handler -> processing -> response.

Source code in server/lomas_server/worker.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
async def process_message(
    channel: aio_pika.Channel, in_queue: str, out_queue: str, message_handler: Callable[[bytes], Any]
) -> None:
    """General RabbitMQ Message handler -> processing -> response."""
    queue = await channel.declare_queue(in_queue, durable=True)
    await channel.declare_queue(out_queue, durable=True)

    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                headers = None
                body = b""
                match await message_handler(message.body):
                    case (bytes(exc_body), int(status_code)):
                        headers = {"type": "exception", "status_code": status_code}
                        body = exc_body

                    case query_response:
                        logger.debug(
                            f"Response length: {len(query_response.model_dump_json())} {message.correlation_id}"
                        )
                        # logger.debug(query_response.model_dump_json())
                        body = query_response.model_dump_json().encode()

                await channel.default_exchange.publish(
                    aio_pika.Message(headers=headers, body=body, correlation_id=message.correlation_id),
                    routing_key=out_queue,
                )

force_terminate_task_group async #

force_terminate_task_group() -> Never

Used to force termination of a task group.

Source code in server/lomas_server/worker.py
229
230
231
async def force_terminate_task_group() -> Never:
    """Always raise TerminateTaskGroup; meant to be scheduled in a task group to stop it."""
    raise TerminateTaskGroup()

ask_exit #

ask_exit(signame: str, tg: TaskGroup) -> None

Signal handler for TaskGroup termination.

Source code in server/lomas_server/worker.py
234
235
236
237
def ask_exit(signame: str, tg: asyncio.TaskGroup) -> None:
    """Signal handler: log the signal and schedule the task group's termination.

    Args:
        signame (str): Name of the received signal, used for logging only.
        tg (asyncio.TaskGroup): Task group in which the terminating task is created.
    """
    logger.info(f"got signal {signame}: exit")
    terminator = force_terminate_task_group()
    tg.create_task(terminator)

process_all_queues async #

process_all_queues(config: Config) -> None

Handle & await all pika processing queues.

Source code in server/lomas_server/worker.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
async def process_all_queues(config: Config) -> None:
    """Handle & await all pika processing queues."""
    loop = asyncio.get_running_loop()
    connection = await rabbitmq_connect_queue(config)

    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)
        rpc = await RPC.create(channel, durable=True)

        try:
            async with asyncio.TaskGroup() as tg:
                tg.create_task(
                    process_message(channel, "task_queue", "task_response", partial(handle_query, rpc.proxy))
                )
                tg.create_task(
                    process_message(
                        channel, "cost_queue", "cost_response", partial(handle_cost_query, rpc.proxy)
                    )
                )
                tg.create_task(
                    process_message(
                        channel, "dummy_queue", "dummy_response", partial(handle_dummy_query, rpc.proxy)
                    )
                )

                # register signal for polite TaskGroup termination
                for signame in ["SIGINT", "SIGTERM"]:
                    loop.add_signal_handler(
                        getattr(signal, signame), functools.partial(ask_exit, signame, tg)
                    )
            # All tasks in Taskgroup are awaited here (aexit of TaskGroup context)
        except* TerminateTaskGroup:
            logger.info("Terminated")
        finally:
            await channel.close()
            await connection.close()

run #

run() -> None

Start the Worker loop.

Source code in server/lomas_server/worker.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
def run() -> None:
    """Configure logging, OpenDP features and telemetry, then run the worker loop.

    Blocks in ``asyncio.run(process_all_queues(config))`` until the queue
    processing returns.
    """
    config = Config()
    server_cfg = config.server
    init_logging(
        name="lomas_server",
        level=server_cfg.log_level,
        lomas_level=server_cfg.lomas_log_level,
    )

    set_opendp_features_config(config.opendp_features)

    telemetry_cfg = config.telemetry
    if telemetry_cfg.enabled:
        # Instrument logging and aio-pika before initialising telemetry itself.
        LoggingInstrumentor().instrument(set_logging_format=True)
        AioPikaInstrumentor().instrument()
        init_telemetry(telemetry_cfg)

    logger.info("Waiting for messages. To exit press CTRL+C")
    asyncio.run(process_all_queues(config))