Data Ingestion Framework | Factory Pattern | Feedback Welcomed
"""
Factory Pattern - We have a specific Factory which is responsible for providing us with the desired object as per the input.
Abstract Factory Pattern - We have a Factory (abstract) on top of Factory (prev) which allows for decoupling the Factories and addition of new Factory in the code is easier
"""

"""
Problem Statement - Design a Data Ingestion System where we have Sources (API, File, DataLake) and Storage (Cloud, OnPremise)
"""

"""
Use Case Flow -
1. System is requested to injest data from a certain source which can be an API, File, etc
2. The data is ingested into a certain source based on the request
"""

"""
Main Entities
1. CloudStorage implements IngestionService
    1. CloudAPISource implements IngestToDB
    2. CloudFileSource implements IngestToDB
    3. CloudDataLakeSource implements IngestToDB
2. OnPremiseStorage implements IngestionService
    1. APISource implements IngestToDB
    2. FileSource implements IngestToDB
    3. DataLakeSource implements IngestToDB
3. StorageFactory - responsible for providing desired Storage as per request
4. IngestionService - Interface (IngestDataToDB)
5. IngestToDB - Interface (IngestData)
"""

"""
Classes - 
1. IngestionService
    methods -
    IngestDataToDB -> getSourceObject and call IngestData of the source Object
    getSourceObject -> abstract method
2. OnPremiseStorage
    methods - 
    getSourceObject -> return either of APISource, FileSource, DataLakeSource based on reqType
3. CloudStorage
    methods - 
    getSourceObject -> return either of CloudAPISource, CloudFileSource, CloudDataLakeSource based on reqType
4. IngestToDB
    methods - 
    IngestData - abstract
5. APISource
6. FileSource
7. DataLakeSource
8. CloudAPISource
9. CloudFileSource
10. CloudDataLakeSource
"""

"""
Code Implementation
"""

# IngestToDB.py - Factory Pattern
from abc import ABC, abstractmethod
class IngestToDB(ABC):
    """
    Init @ IngestToDb
    """
    def __init__(self):
        pass

    """
    IngestData @ IngestToDB
    """
    @abstractmethod
    def IngestData(self):
        pass

# APISource.py
class APISource(IngestToDB):
    """
    Init @ APISource
    """
    def __init__(self):
        pass

    """
    Init @ APISource
    """
    def IngestData(self):
        print("Ingesting Data from API Source")


# DataLakeSource.py
class DataLakeSource(IngestToDB):
    """
    Init @ DataLakeSource
    """
    def __init__(self):
        pass

    """
    Init @ DataLakeSource
    """
    def IngestData(self):
        print("Ingesting Data from DataLake Source")


# FileSource.py
class FileSource(IngestToDB):
    """
    Init @ FileSource
    """
    def __init__(self):
        pass

    """
    Init @ FileSource
    """
    def IngestData(self):
        print("Ingesting Data from File Source")

# CloudAPISource.py
class CloudAPISource(IngestToDB):
    """
    Init @ CloudAPISource
    """
    def __init__(self):
        pass

    """
    Init @ CloudAPISource
    """
    def IngestData(self):
        print("Cloud: Ingesting Data from Cloud API Source")


# CloudDataLakeSource.py
class CloudDataLakeSource(IngestToDB):
    """
    Init @ CloudDataLakeSource
    """
    def __init__(self):
        pass

    """
    Init @ CloudDataLakeSource
    """
    def IngestData(self):
        print("Cloud: Ingesting Data from Cloud DataLake Source")


# CloudFileSource.py
class CloudFileSource(IngestToDB):
    """
    Init @ CloudFileSource
    """
    def __init__(self):
        pass

    """
    Init @ CloudFileSource
    """
    def IngestData(self):
        print("Cloud: Ingesting Data from Cloud File Source")


# IngestionService.py - Abstract Factory
class IngestionService(ABC):
    """
    Init @ IngestionService
    """
    def __init__(self):
        pass
    
    """
    IngestDataToDB @ IngestionService
    """
    def ingestDataToDB(self, source):
        sourceObject = self.getSourceInstance(source)
        sourceObject.IngestData()
    
    """
    getSourceInstance (abstract) @ IngestionService
    """
    @abstractmethod
    def getSourceInstance(self, source):
        pass

# CloudStorage.py
class CloudStorage(IngestionService):
    """
    Init @ CloudStorage
    """
    def __init__(self):
        pass
    
    """
    getSourceInstance (Ovveride) @ CloudStorage
    """
    def getSourceInstance(self, source):
        if(source == "API"):
            return CloudAPISource()
        elif(source == "FILE"):
            return CloudFileSource()
        elif(source == "DATALAKE"):
            return CloudDataLakeSource()
        else:
            print("Unknown source")

# OnPremiseStorage.py
class OnPremiseStorage(IngestionService):
    """
    Init @ OnPremiseStorage
    """
    def __init__(self):
        pass
    
    """
    getSourceInstance (Ovveride) @ OnPremiseStorage
    """
    def getSourceInstance(self, source):
        if(source == "API"):
            return APISource()
        elif(source == "FILE"):
            return FileSource()
        elif(source == "DATALAKE"):
            return DataLakeSource()
        else:
            print("Unknown source")
    
# StorageFactory.py
class StorageFactory:

    """
    Init @ StorageFactory
    """
    def __init__(self):
        pass
    
    """
    Storage Object @ StorageFactory
    """
    def getStorageObject(self, source):
        if(source == "CLOUD"):
            return CloudStorage()
        elif(source == "ONPREMISE"):
            return OnPremiseStorage()
        else:
            print("Unknown source")
            return None

# DataIngestionFramework.py
class DataIngestionFramework:
    """
    Init @ DataIngestionFramework
    """
    def __init__(self):
        pass

    """
    Ingest @ DataIngestionFramework
    """
    def ingest(self, storage, source):
        storageFactory = StorageFactory()
        storageObject = storageFactory.getStorageObject(storage)
        storageObject.ingestDataToDB(source)

"""
Testing
"""
DIFramework = DataIngestionFramework()
DIFramework.ingest("CLOUD", "API")
DIFramework.ingest("CLOUD", "FILE")
DIFramework.ingest("ONPREMISE", "DATALAKE")

Comments (3)