Source code for lomas_server.private_dataset.s3_dataset

from typing import Optional

import boto3
import pandas as pd

from private_dataset.private_dataset import PrivateDataset
from utils.error_handler import InternalServerException


class S3Dataset(PrivateDataset):
    """
    PrivateDataset for a dataset stored in S3.
    """

    def __init__(
        self,
        metadata: dict,
        s3_parameters: dict,
    ) -> None:
        """Initializer. Does not load the dataset yet.

        Args:
            metadata (dict): The metadata dictionary.
            s3_parameters (dict): Information needed to access the dataset in S3.
        """
        super().__init__(metadata)
        self.client = boto3.client(
            "s3",
            endpoint_url=s3_parameters["s3_endpoint"],
            aws_access_key_id=s3_parameters["s3_aws_access_key_id"],
            aws_secret_access_key=s3_parameters["s3_aws_secret_access_key"],
        )
        self.s3_bucket: str = s3_parameters["s3_bucket"]
        self.s3_key: str = s3_parameters["s3_key"]
        self.df: Optional[pd.DataFrame] = None
    def get_pandas_df(self) -> pd.DataFrame:
        """Get the data in pandas dataframe format.

        Raises:
            InternalServerException: If the dataset cannot be read.

        Returns:
            pd.DataFrame: pandas dataframe of the dataset.
        """
        if self.df is None:
            obj = self.client.get_object(
                Bucket=self.s3_bucket, Key=self.s3_key
            )
            try:
                self.df = pd.read_csv(obj["Body"], dtype=self.dtypes)
            except Exception as err:
                raise InternalServerException(
                    "Error reading csv at s3 path: "
                    + f"{self.s3_bucket}/{self.s3_key}: {err}"
                ) from err

            # Notify observers since memory usage has changed.
            for observer in self.dataset_observers:
                observer.update_memory_usage()

        return self.df
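
A minimal usage sketch follows, assuming the module is importable as private_dataset.s3_dataset (mirroring the import style above). The endpoint, credentials, bucket, key, and metadata values are hypothetical placeholders; running it for real requires a reachable S3-compatible store and metadata in whatever schema PrivateDataset expects.

from private_dataset.s3_dataset import S3Dataset

# Hypothetical connection parameters for an S3-compatible store (e.g. MinIO).
s3_parameters = {
    "s3_endpoint": "https://minio.example.org",
    "s3_aws_access_key_id": "EXAMPLE_KEY_ID",
    "s3_aws_secret_access_key": "EXAMPLE_SECRET",
    "s3_bucket": "example-bucket",
    "s3_key": "datasets/penguins.csv",
}

# Hypothetical metadata; a real dictionary must follow the metadata schema
# that PrivateDataset expects (column names and dtypes).
metadata = {"columns": {"species": {"type": "string"}}}

dataset = S3Dataset(metadata, s3_parameters)
df = dataset.get_pandas_df()        # first call downloads and caches the CSV
df_again = dataset.get_pandas_df()  # later calls return the cached dataframe

Because self.df acts as a cache, the S3 object is downloaded at most once per S3Dataset instance, and the registered observers are notified only on that first load, when the process's memory footprint actually changes.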