2021-10-15 11:31:33 -07:00

3.7 KiB

Sink

The Sink receives the events emitted by the source, one at a time. It can use each record to make external service calls, for example to store or index it. For OpenMetadata we have MetadataRestTablesSink.

API

@dataclass  # type: ignore[misc]
class Sink(Closeable, metaclass=ABCMeta):
    """Abstract base class that every sink implementation must extend.

    Concrete sinks supply the ``create`` factory together with the
    ``write_record``, ``get_status`` and ``close`` hooks declared here.
    """

    # Workflow context handed to the sink when it is constructed.
    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
        """Build a sink from the raw sink config, the metadata server config and the workflow context."""

    @abstractmethod
    def write_record(self, record: Record) -> None:
        """Process a single record handed to the sink.

        NOTE(review): the original inline note says implementations "must call
        callback when done"; no callback is visible in this signature, so
        confirm what that refers to.
        """

    @abstractmethod
    def get_status(self) -> SinkStatus:
        """Return the status object describing what this sink has done so far."""

    @abstractmethod
    def close(self) -> None:
        """Release whatever the sink is holding on to."""

The create method is called during workflow instantiation and creates an instance of the sink.

The write_record method is called for each record coming down the workflow chain and can be used to store the record in external services, etc.

The get_status method reports the status of the sink, e.g. how many records were written and how many failures or warnings occurred.

The close method is called before the workflow stops. It can be used to clean up any connections or other resources.

Example

Example implementation

class MetadataRestTablesSink(Sink):
    """Sink that sends database and table entities through the REST client.

    For every record a database create request is issued first, followed by a
    table create-or-update request that points at the returned database id.
    Successes and failures are tracked on ``self.status``.
    """

    config: MetadataTablesSinkConfig
    status: SinkStatus

    def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig):
        """Store both configurations, start with a fresh status and open the REST client."""
        super().__init__(ctx)
        self.metadata_config = metadata_config
        self.config = config
        self.wrote_something = False
        self.status = SinkStatus()
        self.rest = REST(self.metadata_config)

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        """Parse the two raw config dictionaries and build the sink from them."""
        return cls(
            ctx,
            MetadataTablesSinkConfig.parse_obj(config_dict),
            MetadataServerConfig.parse_obj(metadata_config_dict),
        )

    def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None:
        """Create the database, then create or update the table, and record the outcome.

        ``APIError`` and ``ValidationError`` are logged and counted as a
        failure instead of being propagated to the caller.
        """
        try:
            source_db = table_and_db.database
            created_db = self.rest.create_database(
                CreateDatabaseEntityRequest(
                    name=source_db.name,
                    description=source_db.description,
                    service=EntityReference(id=source_db.service.id, type="databaseService"),
                )
            )

            source_table = table_and_db.table
            # The table request references the database by the id the server returned.
            created_table = self.rest.create_or_update_table(
                CreateTableEntityRequest(
                    name=source_table.name,
                    columns=source_table.columns,
                    description=source_table.description,
                    database=created_db.id,
                )
            )

            db_name = source_db.name.__root__
            table_name = created_table.name.__root__
            logger.info('Successfully ingested {}.{}'.format(db_name, table_name))
            self.status.records_written('{}.{}'.format(db_name, table_name))
        except (APIError, ValidationError) as err:
            failed_table = table_and_db.table.name
            logger.error(
                "Failed to ingest table {} in database {} ".format(failed_table, table_and_db.database.name))
            logger.error(err)
            self.status.failures(failed_table)

    def get_status(self):
        """Return the status object this sink has been updating."""
        return self.status

    def close(self):
        """Currently a no-op."""