# Sink
The Sink receives the events emitted by the source, one at a time. It can use each record to make external service calls, for example to store or index it. For OpenMetadata we have [MetadataRestTablesSink](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/sink/metadata_rest.py).
## API
```python
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass

# Closeable, WorkflowContext, Record and SinkStatus come from the
# OpenMetadata ingestion package.


@dataclass  # type: ignore[misc]
class Sink(Closeable, metaclass=ABCMeta):
    """All Sinks must inherit this base class."""

    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
        pass

    @abstractmethod
    def write_record(self, record: Record) -> None:
        # must call callback when done.
        pass

    @abstractmethod
    def get_status(self) -> SinkStatus:
        pass

    @abstractmethod
    def close(self) -> None:
        pass
```

**create** is called during workflow instantiation and creates an instance of the sink.

**write_record** is called for each record coming down the workflow chain and can be used to store the record in an external service.

**get_status** reports the status of the sink, for example how many records were written and how many failures or warnings occurred.

**close** is called before the workflow stops. It can be used to clean up connections or other resources.
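
To make the four methods concrete, here is a minimal sketch of a custom sink. It is not part of OpenMetadata: `InMemorySink` is a hypothetical class, and `WorkflowContext`, `SinkStatus` and `Sink` are simplified stand-ins defined inline so the snippet runs without the ingestion package installed. Records are assumed to be plain dicts with a `name` key.

```python
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class WorkflowContext:
    """Stand-in for the ingestion WorkflowContext (assumption)."""
    workflow: Dict[str, Any] = field(default_factory=dict)


@dataclass
class SinkStatus:
    """Stand-in status object that only collects record names."""
    records: List[str] = field(default_factory=list)


class Sink(metaclass=ABCMeta):
    """Simplified stand-in for the Sink base class."""

    def __init__(self, ctx: WorkflowContext) -> None:
        self.ctx = ctx

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
        ...

    @abstractmethod
    def write_record(self, record: Any) -> None:
        ...

    @abstractmethod
    def get_status(self) -> SinkStatus:
        ...

    @abstractmethod
    def close(self) -> None:
        ...


class InMemorySink(Sink):
    """Hypothetical sink that keeps records in a list instead of calling a service."""

    def __init__(self, ctx: WorkflowContext, config: Dict[str, Any]) -> None:
        super().__init__(ctx)
        self.config = config
        self.status = SinkStatus()
        self.store: List[Dict[str, Any]] = []
        self.closed = False

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "InMemorySink":
        # Called once when the workflow is built; metadata_config_dict is unused here.
        return cls(ctx, dict(config_dict))

    def write_record(self, record: Dict[str, Any]) -> None:
        # Called once per record; a real sink would call an external service here.
        self.store.append(record)
        self.status.records.append(record["name"])

    def get_status(self) -> SinkStatus:
        return self.status

    def close(self) -> None:
        # Release whatever the sink holds; here that is just the in-memory list.
        self.store.clear()
        self.closed = True


sink = InMemorySink.create({"batch": 1}, {}, WorkflowContext())
for record in [{"name": "orders"}, {"name": "users"}]:
    sink.write_record(record)
status = sink.get_status()
sink.close()
```

The last five lines mimic the order in which a workflow would call the sink: `create` once, `write_record` per record, then `get_status` and `close`.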
## Example
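The following example implementation, `MetadataRestTablesSink`, creates a database and its table through the REST client for each record: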
```python
import logging

# Sink, SinkStatus, WorkflowContext, REST, the config classes, the entity
# request classes and the caught exception types are imported from the
# OpenMetadata ingestion package and its dependencies.

logger = logging.getLogger(__name__)


class MetadataRestTablesSink(Sink):
    config: MetadataTablesSinkConfig
    status: SinkStatus

    def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig,
                 metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.status = SinkStatus()
        self.wrote_something = False
        self.rest = REST(self.metadata_config)

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        # Turn the raw dicts into typed config objects before building the sink.
        config = MetadataTablesSinkConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None:
        try:
            # Create the database first so the table can reference its id.
            db_request = CreateDatabaseEntityRequest(
                name=table_and_db.database.name,
                description=table_and_db.database.description,
                service=EntityReference(id=table_and_db.database.service.id,
                                        type="databaseService"))
            db = self.rest.create_database(db_request)
            table_request = CreateTableEntityRequest(
                name=table_and_db.table.name,
                columns=table_and_db.table.columns,
                description=table_and_db.table.description,
                database=db.id)
            created_table = self.rest.create_or_update_table(table_request)
            logger.info(
                'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
            self.status.records_written(
                '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
        except (APIError, ValidationError) as err:
            # Log and record the failure; the exception is not re-raised.
            logger.error(
                "Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name))
            logger.error(err)
            self.status.failures(table_and_db.table.name)

    def get_status(self):
        return self.status

    def close(self):
        pass
```
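
Note that `write_record` in the example catches errors per record instead of letting them propagate, so one rejected table does not prevent later records from being written. The sketch below isolates that pattern. Everything in it is a stand-in for illustration: `APIError` is a locally defined exception, `FakeClient` is a hypothetical client that rejects names starting with an underscore, and `SinkStatus` is reduced to two plain lists.

```python
import logging
from dataclasses import dataclass, field
from typing import List

logger = logging.getLogger(__name__)


class APIError(Exception):
    """Stand-in for the REST client's error type (assumption)."""


@dataclass
class SinkStatus:
    """Stand-in status object: plain lists of successes and failures."""
    records: List[str] = field(default_factory=list)
    failures: List[str] = field(default_factory=list)


class FakeClient:
    """Hypothetical client that rejects table names starting with an underscore."""

    def create_or_update_table(self, name: str) -> str:
        if name.startswith("_"):
            raise APIError(f"rejected table name: {name}")
        return name


def write_record(client: FakeClient, status: SinkStatus, table_name: str) -> None:
    try:
        created = client.create_or_update_table(table_name)
        status.records.append(created)
    except APIError as err:
        # Record the failure and return normally so the next record is still processed.
        logger.error("Failed to ingest table %s: %s", table_name, err)
        status.failures.append(table_name)


client, status = FakeClient(), SinkStatus()
for name in ["orders", "_tmp", "users"]:
    write_record(client, status, name)
```

After the loop, `status` holds both the written and the failed table names, which is the kind of information `get_status` is meant to expose to the workflow.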