2021-08-17 15:45:52 +00:00
							
# BulkSink
							
**BulkSink** is an optional component in a workflow. It can be used to bulk update the records generated in a workflow, and it must be used in conjunction with a Stage.
							
## API
							
```python
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass

# Closeable, WorkflowContext, and BulkSinkStatus are defined elsewhere
# in the ingestion framework and must be imported from there.


@dataclass  # type: ignore[misc]
class BulkSink(Closeable, metaclass=ABCMeta):
    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "BulkSink":
        pass

    @abstractmethod
    def write_records(self) -> None:
        pass

    @abstractmethod
    def get_status(self) -> BulkSinkStatus:
        pass

    @abstractmethod
    def close(self) -> None:
        pass
```
							
**create** is called during workflow instantiation and creates an instance of the bulk sink.

**write\_records** is called only once in a workflow. It is the developer's responsibility to perform the bulk actions inside this method, such as reading the entire file or store to generate the API calls to external services.
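
As a rough illustration of the kind of bulk work `write_records` might do, the sketch below reads a staged file in full, aggregates it, and issues one call per batch. The JSON-lines file format, the `table` and `count` field names, the `read_staged_counts` and `publish_in_batches` helpers, and the `publish` callback are all assumptions made for this example; none of them are part of the BulkSink API.

```python
import json
from collections import Counter
from pathlib import Path
from typing import Callable, Dict, List, Tuple


def read_staged_counts(path: Path) -> Dict[str, int]:
    """Read a whole JSON-lines file and sum `count` per `table` (hypothetical format)."""
    totals: Counter = Counter()
    with path.open() as staged:
        for line in staged:
            if not line.strip():
                continue  # skip blank lines
            record = json.loads(line)
            totals[record["table"]] += record["count"]
    return dict(totals)


def publish_in_batches(
    totals: Dict[str, int],
    publish: Callable[[List[Tuple[str, int]]], None],
    batch_size: int = 2,
) -> int:
    """Send the aggregated totals in fixed-size batches; return the number of calls made."""
    items = sorted(totals.items())
    calls = 0
    for start in range(0, len(items), batch_size):
        # `publish` stands in for a call to an external service.
        publish(items[start:start + batch_size])
        calls += 1
    return calls
```

Inside a real `write_records`, `publish` would be whatever client call the external service requires, and the aggregation step would depend on what the Stage wrote.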
							 
**get\_status** reports the status of the bulk sink, for example the number of records, failures, or warnings.
							 
**close** is called before the workflow stops. It can be used to clean up any connections or other resources.
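
To show how the four methods fit together, here is a minimal, self-contained sketch of a sink and one plausible order in which a workflow could call it. `SketchStatus`, `InMemoryBulkSink`, `run`, and the plain `dict` configuration are stand-ins invented for this example so that it runs without the ingestion package. A real implementation would subclass `BulkSink` and return a `BulkSinkStatus`, whose fields may differ from the ones assumed here, and the exact call order inside the workflow is likewise an assumption.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchStatus:
    """Hypothetical stand-in for BulkSinkStatus."""
    records: int = 0
    failures: List[str] = field(default_factory=list)


class InMemoryBulkSink:
    """Hypothetical sink that mirrors the BulkSink method names."""

    def __init__(self, config: dict) -> None:
        self.rows = config["rows"]  # stands in for the staged file or store
        self.status = SketchStatus()
        self.closed = False

    @classmethod
    def create(cls, config_dict: dict) -> "InMemoryBulkSink":
        # Simplified: the real create() also takes metadata_config_dict and ctx.
        return cls(config_dict)

    def write_records(self) -> None:
        # Called once: process everything that was staged, in bulk.
        for row in self.rows:
            if "name" in row:
                self.status.records += 1
            else:
                self.status.failures.append(f"missing name: {row!r}")

    def get_status(self) -> SketchStatus:
        return self.status

    def close(self) -> None:
        # Release connections or other resources before the workflow stops.
        self.closed = True


def run(config: dict) -> SketchStatus:
    """Create the sink, write once, collect the status, and always close."""
    sink = InMemoryBulkSink.create(config)
    try:
        sink.write_records()
        return sink.get_status()
    finally:
        sink.close()
```

Wrapping the calls in `try`/`finally` is a design choice of this sketch: it ensures `close` runs even when `write_records` raises.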
							 
## Example

[Example implementation](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py#L36)