mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-08 01:29:37 +00:00

* GitBook: [#177] Documentation Update - Airflow * GitBook: [#195] Removing Cron from databaseServices * GitBook: [#196] Added trino * GitBook: [#197] removed cron from config * GitBook: [#198] Added Redash Documentation * GitBook: [#199] Added Bigquery Usage Documentation * GitBook: [#200] Added page link for presto * GitBook: [#201] Added Local Docker documentation * GitBook: [#202] Added Documentation for Local Docker Setup * GitBook: [#203] Added Git Command to clone Openmetadata in docs * GitBook: [#207] links update * GitBook: [#208] Updating Airflow Documentation * GitBook: [#210] Adding Python installation package under Airflow Lineage config * GitBook: [#211] Change the links to 0.5..0 * GitBook: [#213] Move buried connectors page up * GitBook: [#214] Update to connectors page * GitBook: [#215] Removed sub-categories * GitBook: [#212] Adding Discovery tutorial * GitBook: [#220] Updated steps to H2s. * GitBook: [#230] Complex queries * GitBook: [#231] Add lineage to feature overview * GitBook: [#232] Make feature overview headers verbs instead of nouns * GitBook: [#233] Add data reliability to features overview * GitBook: [#234] Add complex data types to feature overview * GitBook: [#235] Simplify and further distinguish discovery feature headers * GitBook: [#236] Add data importance to feature overview * GitBook: [#237] Break Connectors into its own section * GitBook: [#238] Reorganize first section of docs. * GitBook: [#239] Add connectors to feature overview * GitBook: [#240] Organize layout of feature overview into feature categories as agreed with Harsha. * GitBook: [#242] Make overview paragraph more descriptive. * GitBook: [#243] Create a link to Connectors section from feature overview. * GitBook: [#244] Add "discover data through association" to feature overview. * GitBook: [#245] Update importance and owners gifs * GitBook: [#246] Include a little more descriptive documentation for key features. * GitBook: [#248] Small tweaks to intro paragraph. * GitBook: [#249] Clean up data profiler paragraph. * GitBook: [#250] Promote Complex Data Types to its own feature. * GitBook: [#251] Update to advanced search * GitBook: [#252] Update Roadmap * GitBook: [#254] Remove old features page (text and screenshot based). * GitBook: [#255] Remove references to removed page. * GitBook: [#256] Add Descriptions and Tags section to feature overview. * GitBook: [#257] Update title for "Know Your Data" Co-authored-by: Ayush Shah <ayush.shah@deuexsolutions.com> Co-authored-by: Suresh Srinivas <suresh@getcollate.io> Co-authored-by: Shannon Bradshaw <shannon.bradshaw@arrikto.com> Co-authored-by: OpenMetadata <github@harsha.io>
92 lines
3.7 KiB
Markdown
92 lines
3.7 KiB
Markdown
# Sink
|
|
|
|
The Sink will get the event emitted by the source, one at a time. It can use this record to make external service calls to store or index etc.For OpenMetadata we have [MetadataRestTablesSink](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/sink/metadata_rest.py)
|
|
|
|
## API
|
|
|
|
```python
|
|
@dataclass # type: ignore[misc]
|
|
class Sink(Closeable, metaclass=ABCMeta):
|
|
"""All Sinks must inherit this base class."""
|
|
|
|
ctx: WorkflowContext
|
|
|
|
@classmethod
|
|
@abstractmethod
|
|
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
|
|
pass
|
|
|
|
@abstractmethod
|
|
def write_record(self, record: Record) -> None:
|
|
# must call callback when done.
|
|
pass
|
|
|
|
@abstractmethod
|
|
def get_status(self) -> SinkStatus:
|
|
pass
|
|
|
|
@abstractmethod
|
|
def close(self) -> None:
|
|
pass
|
|
```
|
|
|
|
**create** method is called during the workflow instantiation and creates an instance of the sink
|
|
|
|
**write_record** this method is called for each record coming down in the workflow chain and can be used to store the record in external services etc.
|
|
|
|
**get_status** to report the status of the sink ex: how many records, failures or warnings etc.
|
|
|
|
**close** gets called before the workflow stops. Can be used to clean up any connections or other resources.
|
|
|
|
## Example
|
|
|
|
Example implementation
|
|
|
|
```python
|
|
class MetadataRestTablesSink(Sink):
|
|
config: MetadataTablesSinkConfig
|
|
status: SinkStatus
|
|
|
|
def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig):
|
|
super().__init__(ctx)
|
|
self.config = config
|
|
self.metadata_config = metadata_config
|
|
self.status = SinkStatus()
|
|
self.wrote_something = False
|
|
self.rest = REST(self.metadata_config)
|
|
|
|
@classmethod
|
|
def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
|
|
config = MetadataTablesSinkConfig.parse_obj(config_dict)
|
|
metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
|
|
return cls(ctx, config, metadata_config)
|
|
|
|
def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None:
|
|
try:
|
|
db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name,
|
|
description=table_and_db.database.description,
|
|
service=EntityReference(id=table_and_db.database.service.id,
|
|
type="databaseService"))
|
|
db = self.rest.create_database(db_request)
|
|
table_request = CreateTableEntityRequest(name=table_and_db.table.name,
|
|
columns=table_and_db.table.columns,
|
|
description=table_and_db.table.description,
|
|
database=db.id)
|
|
created_table = self.rest.create_or_update_table(table_request)
|
|
logger.info(
|
|
'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
|
|
self.status.records_written(
|
|
'{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
|
|
except (APIError, ValidationError) as err:
|
|
logger.error(
|
|
"Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name))
|
|
logger.error(err)
|
|
self.status.failures(table_and_db.table.name)
|
|
|
|
def get_status(self):
|
|
return self.status
|
|
|
|
def close(self):
|
|
pass
|
|
```
|