Source code for lomas_server.data_connector.s3_connector

from typing import Optional

import boto3
import pandas as pd

from lomas_server.data_connector.data_connector import DataConnector
from lomas_server.utils.collection_models import Metadata
from lomas_server.utils.config import S3CredentialsConfig
from lomas_server.utils.error_handler import InternalServerException


class S3Connector(DataConnector):
    """DataConnector for a dataset stored as a CSV object in S3 storage.

    The object is downloaded and parsed on the first call to
    ``get_pandas_df`` and the resulting dataframe is kept in ``self.df``
    so later calls do not hit S3 again.
    """

    def __init__(
        self,
        metadata: Metadata,
        credentials: S3CredentialsConfig,
    ) -> None:
        """Initialize the connector. Does not load the dataset yet.

        Args:
            metadata (Metadata): The dataset metadata, forwarded to the
                ``DataConnector`` initializer.
            credentials (S3CredentialsConfig): S3 access information:
                endpoint URL, access key id, secret access key, and the
                bucket and key of the object holding the dataset.
        """
        super().__init__(metadata)

        self.client = boto3.client(
            "s3",
            endpoint_url=credentials.endpoint_url,
            aws_access_key_id=credentials.access_key_id,
            aws_secret_access_key=credentials.secret_access_key,
        )
        self.bucket: str = credentials.bucket
        self.key: str = credentials.key
        # Cache for the parsed dataset; stays None until first requested.
        self.df: Optional[pd.DataFrame] = None

    def get_pandas_df(self) -> pd.DataFrame:
        """Get the data in pandas dataframe format.

        The object is fetched from S3 and parsed as CSV on the first call
        only; subsequent calls return the cached dataframe.

        Raises:
            InternalServerException: If the fetched object cannot be read
                as a CSV.

        Returns:
            pd.DataFrame: pandas dataframe of dataset
        """
        if self.df is not None:
            return self.df

        # Errors raised by the fetch itself propagate unchanged.
        response = self.client.get_object(Bucket=self.bucket, Key=self.key)
        try:
            body = response["Body"]
            try:
                # self.dtypes is not set in this class; presumably it is
                # provided by DataConnector -- verify against the base class.
                dataframe = pd.read_csv(body, dtype=self.dtypes)
            finally:
                # Release the underlying stream even when parsing fails,
                # so the connection is not left open.
                body.close()
        except Exception as err:
            raise InternalServerException(
                "Error reading csv at s3 path:"
                + f"{self.bucket}/{self.key}: {err}"
            ) from err

        # Only cache once the read (and cleanup) fully succeeded.
        self.df = dataframe
        return self.df