mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-25 07:42:40 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			92 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
			
		
		
	
	
			92 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Markdown
		
	
	
	
	
	
| # Sink
 | |
| 
 | |
| The Sink receives the events emitted by the source, one at a time. It can use each record to make external service calls, for example to store or index it. For OpenMetadata we have [MetadataRestTablesSink](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/sink/metadata_rest.py).
 | |
| 
 | |
| ## API
 | |
| 
 | |
| ```python
 | |
| @dataclass  # type: ignore[misc]
 | |
| class Sink(Closeable, metaclass=ABCMeta):
 | |
|     """All Sinks must inherit this base class."""
 | |
| 
 | |
|     ctx: WorkflowContext
 | |
| 
 | |
|     @classmethod
 | |
|     @abstractmethod
 | |
|     def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
 | |
|         pass
 | |
| 
 | |
|     @abstractmethod
 | |
|     def write_record(self, record: Record) -> None:
 | |
|         # must call callback when done.
 | |
|         pass
 | |
| 
 | |
|     @abstractmethod
 | |
|     def get_status(self) -> SinkStatus:
 | |
|         pass
 | |
| 
 | |
|     @abstractmethod
 | |
|     def close(self) -> None:
 | |
|         pass
 | |
| ```
 | |
| 
 | |
| The **create** method is called during workflow instantiation and creates an instance of the sink.
 | |
| 
 | |
| The **write_record** method is called for each record coming down the workflow chain and can be used, for example, to store the record in an external service.
 | |
| 
 | |
| The **get_status** method reports the status of the sink, e.g. the number of records, failures, or warnings.
 | |
| 
 | |
| The **close** method is called before the workflow stops. It can be used to clean up any connections or other resources.
 | |
| 
 | |
| ## Example
 | |
| 
 | |
| Example implementation
 | |
| 
 | |
| ```python
 | |
| class MetadataRestTablesSink(Sink):
 | |
|     config: MetadataTablesSinkConfig
 | |
|     status: SinkStatus
 | |
| 
 | |
|     def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig):
 | |
|         super().__init__(ctx)
 | |
|         self.config = config
 | |
|         self.metadata_config = metadata_config
 | |
|         self.status = SinkStatus()
 | |
|         self.wrote_something = False
 | |
|         self.rest = REST(self.metadata_config)
 | |
| 
 | |
|     @classmethod
 | |
|     def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
 | |
|         config = MetadataTablesSinkConfig.parse_obj(config_dict)
 | |
|         metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
 | |
|         return cls(ctx, config, metadata_config)
 | |
| 
 | |
|     def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None:
 | |
|         try:
 | |
|             db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name,
 | |
|                                                      description=table_and_db.database.description,
 | |
|                                                      service=EntityReference(id=table_and_db.database.service.id,
 | |
|                                                                              type="databaseService"))
 | |
|             db = self.rest.create_database(db_request)
 | |
|             table_request = CreateTableEntityRequest(name=table_and_db.table.name,
 | |
|                                                      columns=table_and_db.table.columns,
 | |
|                                                      description=table_and_db.table.description,
 | |
|                                                      database=db.id)
 | |
|             created_table = self.rest.create_or_update_table(table_request)
 | |
|             logger.info(
 | |
|                 'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
 | |
|             self.status.records_written(
 | |
|                 '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
 | |
|         except (APIError, ValidationError) as err:
 | |
|             logger.error(
 | |
|                 "Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name))
 | |
|             logger.error(err)
 | |
|             self.status.failures(table_and_db.table.name)
 | |
| 
 | |
|     def get_status(self):
 | |
|         return self.status
 | |
| 
 | |
|     def close(self):
 | |
|         pass
 | |
| ```
 | 
