Sink

The Sink receives the events emitted by the source, one record at a time. It can use each record to make external service calls, for example to store or index it. For OpenMetadata, we have MetadataRestTablesSink.
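As a rough illustration of this record-at-a-time flow, the sketch below uses hypothetical stand-ins (`fake_source`, `CollectingSink`, `run`) rather than OpenMetadata's actual workflow classes. It only shows a sink being handed each record as the source emits it.

```python
from typing import Iterable, Iterator, List


def fake_source() -> Iterator[str]:
    """Hypothetical source: emits records one at a time."""
    for name in ["orders", "customers", "payments"]:
        yield name


class CollectingSink:
    """Hypothetical sink: 'stores' each record by appending it to a list."""

    def __init__(self) -> None:
        self.stored: List[str] = []

    def write_record(self, record: str) -> None:
        # A real sink would call an external service here (store, index, ...).
        self.stored.append(record)

    def close(self) -> None:
        # Nothing to release in this sketch.
        pass


def run(source: Iterable[str], sink: CollectingSink) -> None:
    """Assumed driver loop: pass every emitted record to the sink, then close it."""
    try:
        for record in source:
            sink.write_record(record)
    finally:
        sink.close()


sink = CollectingSink()
run(fake_source(), sink)
print(sink.stored)
```

The sink never sees the whole data set at once; it is called once per record, which is why any batching or buffering would have to live inside the sink itself.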

API

from abc import ABCMeta, abstractmethod
from dataclasses import dataclass


@dataclass  # type: ignore[misc]
class Sink(Closeable, metaclass=ABCMeta):
    """All Sinks must inherit this base class."""

    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
        pass

    @abstractmethod
    def write_record(self, record: Record) -> None:
        # must call callback when done.
        pass

    @abstractmethod
    def get_status(self) -> SinkStatus:
        pass

    @abstractmethod
    def close(self) -> None:
        pass

create is called during workflow instantiation and returns an instance of the sink.

write_record is called for each record coming down the workflow chain and can be used to store the record in an external service.

get_status reports the status of the sink, for example how many records were written and how many failures or warnings occurred.

close is called before the workflow stops. It can be used to clean up connections or other resources.
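To make the four methods concrete, here is a minimal sketch of a custom sink. So that it runs without OpenMetadata installed, `WorkflowContext`, `SinkStatus`, and `Sink` are simplified stand-ins written for this example (they are assumptions, not the library's classes), and `InMemorySink` with its `prefix` option is hypothetical.

```python
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List


# --- Simplified stand-ins (assumptions) for the library classes ---
@dataclass
class WorkflowContext:
    workflow: Dict[str, Any] = field(default_factory=dict)


@dataclass
class SinkStatus:
    records: List[str] = field(default_factory=list)
    failures: List[str] = field(default_factory=list)


@dataclass
class Sink(metaclass=ABCMeta):
    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink": ...

    @abstractmethod
    def write_record(self, record: str) -> None: ...

    @abstractmethod
    def get_status(self) -> SinkStatus: ...

    @abstractmethod
    def close(self) -> None: ...


class InMemorySink(Sink):
    """Hypothetical sink that keeps records in a list instead of calling a service."""

    def __init__(self, ctx: WorkflowContext, prefix: str) -> None:
        super().__init__(ctx)
        self.prefix = prefix
        self.status = SinkStatus()
        self.stored: List[str] = []
        self.closed = False

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "InMemorySink":
        # Called once at workflow instantiation; builds the sink from raw config.
        return cls(ctx, prefix=config_dict.get("prefix", ""))

    def write_record(self, record: str) -> None:
        # Called once per record; track success or failure in the status.
        if not record:
            self.status.failures.append("<empty record>")
            return
        self.stored.append(self.prefix + record)
        self.status.records.append(record)

    def get_status(self) -> SinkStatus:
        return self.status

    def close(self) -> None:
        # Called before the workflow stops; release resources here.
        self.closed = True


sink = InMemorySink.create({"prefix": "db."}, {}, WorkflowContext())
for rec in ["orders", "", "customers"]:
    sink.write_record(rec)
sink.close()
status = sink.get_status()
print(len(status.records), len(status.failures))  # prints: 2 1
```

Keeping all construction logic in `create` means the workflow only needs the raw configuration dictionaries and never has to know the sink's constructor arguments.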

Example

Example implementation

import logging

logger = logging.getLogger(__name__)


class MetadataRestTablesSink(Sink):
    config: MetadataTablesSinkConfig
    status: SinkStatus

    def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.status = SinkStatus()
        self.wrote_something = False
        self.rest = REST(self.metadata_config)

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        config = MetadataTablesSinkConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None:
        try:
            # Create the database entity first; the table request needs its id.
            db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name,
                                                     description=table_and_db.database.description,
                                                     service=EntityReference(id=table_and_db.database.service.id,
                                                                             type="databaseService"))
            db = self.rest.create_database(db_request)
            # Create or update the table inside the database returned above.
            table_request = CreateTableEntityRequest(name=table_and_db.table.name,
                                                     columns=table_and_db.table.columns,
                                                     description=table_and_db.table.description,
                                                     database=db.id)
            created_table = self.rest.create_or_update_table(table_request)
            logger.info(
                'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
            # Record the success so that get_status() can report it.
            self.status.records_written(
                '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
        except (APIError, ValidationError) as err:
            # Log and record the failure instead of raising, so the workflow continues.
            logger.error(
                "Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name))
            logger.error(err)
            self.status.failures(table_and_db.table.name)

    def get_status(self):
        return self.status

    def close(self):
        pass
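The write_record method in the example catches errors per record, so a rejected table is logged and counted rather than stopping the run. The sketch below isolates that pattern. `FakeAPIError`, `FakeRest`, and `ingest` are hypothetical names used only for illustration and are not part of OpenMetadata.

```python
import logging
from typing import List, Tuple

logger = logging.getLogger(__name__)


class FakeAPIError(Exception):
    """Hypothetical stand-in for an error raised by a REST client."""


class FakeRest:
    """Hypothetical client that rejects one specific table name."""

    def create_or_update_table(self, name: str) -> str:
        if name == "broken":
            raise FakeAPIError(f"server rejected table {name!r}")
        return name


def ingest(rest: FakeRest, tables: List[str]) -> Tuple[List[str], List[str]]:
    """Write each table, collecting successes and failures separately."""
    written: List[str] = []
    failed: List[str] = []
    for name in tables:
        try:
            written.append(rest.create_or_update_table(name))
        except FakeAPIError as err:
            # Log and record the failure, then continue with the next record.
            logger.error("Failed to ingest table %s: %s", name, err)
            failed.append(name)
    return written, failed


written, failed = ingest(FakeRest(), ["orders", "broken", "customers"])
print(written, failed)
```

Only the expected error type is caught here; anything unexpected still propagates, which mirrors the example's choice to catch just APIError and ValidationError.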