# Sink The Sink will get the event emitted by the source, one at a time. It can use this record to make external service calls to store or index etc.For OpenMetadata we have [MetadataRestTablesSink](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/sink/metadata_rest.py) ## API ```python @dataclass # type: ignore[misc] class Sink(Closeable, metaclass=ABCMeta): """All Sinks must inherit this base class.""" ctx: WorkflowContext @classmethod @abstractmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink": pass @abstractmethod def write_record(self, record: Record) -> None: # must call callback when done. pass @abstractmethod def get_status(self) -> SinkStatus: pass @abstractmethod def close(self) -> None: pass ``` **create** method is called during the workflow instantiation and creates an instance of the sink **write_record** this method is called for each record coming down in the workflow chain and can be used to store the record in external services etc. **get_status** to report the status of the sink ex: how many records, failures or warnings etc. **close** gets called before the workflow stops. Can be used to clean up any connections or other resources. ## Example Example implementation ```python class MetadataRestTablesSink(Sink): config: MetadataTablesSinkConfig status: SinkStatus def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig): super().__init__(ctx) self.config = config self.metadata_config = metadata_config self.status = SinkStatus() self.wrote_something = False self.rest = REST(self.metadata_config) @classmethod def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext): config = MetadataTablesSinkConfig.parse_obj(config_dict) metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict) return cls(ctx, config, metadata_config) def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None: try: db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name, description=table_and_db.database.description, service=EntityReference(id=table_and_db.database.service.id, type="databaseService")) db = self.rest.create_database(db_request) table_request = CreateTableEntityRequest(name=table_and_db.table.name, columns=table_and_db.table.columns, description=table_and_db.table.description, database=db.id) created_table = self.rest.create_or_update_table(table_request) logger.info( 'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__)) self.status.records_written( '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__)) except (APIError, ValidationError) as err: logger.error( "Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name)) logger.error(err) self.status.failures(table_and_db.table.name) def get_status(self): return self.status def close(self): pass ```