---
description: >-
  This design doc will walk through developing a connector for OpenMetadata
---

# Ingestion

Ingestion is a simple Python framework to ingest metadata from various sources.

## API

Please take a look at our framework [APIs](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/src/metadata/ingestion/api).

## Workflow

The [workflow](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/api/workflow.py) is a simple orchestration job that runs the components in order.

It consists of a [Source](./source.md), an optional [Processor](./processor.md), and a [Sink](./sink.md). It also provides support for [Stage](./stage.md) and [BulkSink](./bulksink.md).

Workflow execution happens in a serial fashion:

1. The workflow runs the **source** component and retrieves each record it emits.
2. If a **processor** component is configured, the record is sent to the processor first.
3. There can be multiple processors attached to the workflow; the record passes through them in the order they are configured.
4. Once the **processors** finish, the modified record is sent to the **sink**. All of this happens per record.

In cases where we need to aggregate over the records, we can use a **stage** to write them to a file or other store. The file written by the **stage** is then passed to a **bulksink** to publish to external services such as **openmetadata** or **elasticsearch**.
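
Conceptually, the serial execution looks something like the sketch below. This is illustrative only: the real logic lives in the workflow module linked above, and the variable names here are not the actual attribute names.

```py
# Simplified sketch of the serial execution; names are illustrative.
for record in source.next_record():
    if processor is not None:
        # each configured processor runs, in order
        record = processor.process(record)
    if stage is not None:
        # aggregate records for a later bulk pass
        stage.stage_record(record)
    else:
        sink.write_record(record)

if bulk_sink is not None:
    stage.close()              # flush whatever the stage buffered
    bulk_sink.write_records()  # publish the aggregated results in one pass
```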

{% page-ref page="source.md" %}

{% page-ref page="processor.md" %}

{% page-ref page="sink.md" %}

{% page-ref page="stage.md" %}

{% page-ref page="bulksink.md" %}

# BulkSink

**BulkSink** is an optional component in the workflow. It can be used to bulk update the records generated in a workflow. It needs to be used in conjunction with a Stage.

## API

```py
@dataclass  # type: ignore[misc]
class BulkSink(Closeable, metaclass=ABCMeta):
    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "BulkSink":
        pass

    @abstractmethod
    def write_records(self) -> None:
        pass

    @abstractmethod
    def get_status(self) -> BulkSinkStatus:
        pass

    @abstractmethod
    def close(self) -> None:
        pass
```

The **create** method is called during workflow instantiation and creates an instance of the bulk sink.

**write_records** is called only once per workflow run. It is the developer's responsibility to perform the bulk actions inside this method, such as reading the entire staged file or store and generating the API calls to external services.

**get_status** reports the status of the bulk sink, e.g. how many records were written, and any failures or warnings.

**close** gets called before the workflow stops. It can be used to clean up any connections or other resources.

## Example

[Example implementation](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py#L36)
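
As a rough illustration, a minimal bulk sink that reads the newline-delimited JSON file written by a stage (such as the `FileStage` shown in the Stage doc) might look like the sketch below. `FileBulkSinkConfig`, its `filename` field, and the elided publish call are assumptions; only the lifecycle follows the API above.

```py
import json

# Hypothetical sketch; FileBulkSinkConfig and the framework types are assumed.
class FileBulkSink(BulkSink):
    def __init__(self, ctx: WorkflowContext, config: FileBulkSinkConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.status = BulkSinkStatus()

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "BulkSink":
        config = FileBulkSinkConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def write_records(self) -> None:
        # Called once: read everything the stage wrote, then act in bulk.
        with open(self.config.filename, "r") as staged:
            records = [json.loads(line) for line in staged if line.strip()]
        # Publish `records` to the external service in one pass here (elided),
        # then record successes/failures on self.status.

    def get_status(self) -> BulkSinkStatus:
        return self.status

    def close(self) -> None:
        pass
```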

# Processor

**Processor** is an optional component in the workflow. It can be used to modify the records coming from sources: a processor receives a record from the source, can modify or enrich it, and re-emits the event back to the workflow.

## API

```py
@dataclass
class Processor(Closeable, metaclass=ABCMeta):
    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Processor":
        pass

    @abstractmethod
    def process(self, record: Record) -> Record:
        pass

    @abstractmethod
    def get_status(self) -> ProcessorStatus:
        pass

    @abstractmethod
    def close(self) -> None:
        pass
```

The **create** method is called during workflow instantiation and creates an instance of the processor.

**process** is called for each record coming down the workflow chain and can be used to modify or enrich the record.

**get_status** reports the status of the processor, e.g. how many records were processed, and any failures or warnings.

**close** gets called before the workflow stops. It can be used to clean up any connections or other resources.

## Example

Example implementation:

```py
class PiiProcessor(Processor):
    config: PiiProcessorConfig
    metadata_config: MetadataServerConfig
    status: ProcessorStatus
    client: REST

    def __init__(self, ctx: WorkflowContext, config: PiiProcessorConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.status = ProcessorStatus()
        self.client = REST(self.metadata_config)
        self.tags = self.__get_tags()
        self.column_scanner = ColumnNameScanner()
        self.ner_scanner = NERScanner()

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        config = PiiProcessorConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def __get_tags(self) -> dict:
        user_tags = self.client.list_tags_by_category("user")
        tags_dict = {}
        for tag in user_tags:
            tags_dict[tag.name.__root__] = tag
        return tags_dict

    def process(self, table_and_db: OMetaDatabaseAndTable) -> Record:
        for column in table_and_db.table.columns:
            pii_tags = []
            pii_tags += self.column_scanner.scan(column.name.__root__)
            pii_tags += self.ner_scanner.scan(column.name.__root__)
            tag_labels = []
            for pii_tag in pii_tags:
                if snake_to_camel(pii_tag) in self.tags.keys():
                    tag_entity = self.tags[snake_to_camel(pii_tag)]
                else:
                    logging.debug("Fail to tag column {} with tag {}".format(column.name, pii_tag))
                    continue
                tag_labels.append(TagLabel(tagFQN=tag_entity.fullyQualifiedName,
                                           labelType='Automated',
                                           state='Suggested',
                                           href=tag_entity.href))
            column.tags = tag_labels

        return table_and_db

    def close(self):
        pass

    def get_status(self) -> ProcessorStatus:
        return self.status
```

# Sink

A **Sink** receives the events emitted by the source, one at a time. It can use each record to make external service calls, for example to store or index it. For OpenMetadata we have [MetadataRestTablesSink](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/sink/metadata_rest_tables.py).

## API

```py
@dataclass  # type: ignore[misc]
class Sink(Closeable, metaclass=ABCMeta):
    """All Sinks must inherit this base class."""

    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
        pass

    @abstractmethod
    def write_record(self, record: Record) -> None:
        # must call callback when done.
        pass

    @abstractmethod
    def get_status(self) -> SinkStatus:
        pass

    @abstractmethod
    def close(self) -> None:
        pass
```

The **create** method is called during workflow instantiation and creates an instance of the sink.

**write_record** is called for each record coming down the workflow chain and can be used to store the record in external services.

**get_status** reports the status of the sink, e.g. how many records were written, and any failures or warnings.

**close** gets called before the workflow stops. It can be used to clean up any connections or other resources.

## Example

Example implementation:

```py
class MetadataRestTablesSink(Sink):
    config: MetadataTablesSinkConfig
    status: SinkStatus

    def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.status = SinkStatus()
        self.wrote_something = False
        self.rest = REST(self.metadata_config)

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        config = MetadataTablesSinkConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def write_record(self, table_and_db: OMetaDatabaseAndTable) -> None:
        try:
            db_request = CreateDatabaseEntityRequest(name=table_and_db.database.name,
                                                     description=table_and_db.database.description,
                                                     service=EntityReference(id=table_and_db.database.service.id,
                                                                             type="databaseService"))
            db = self.rest.create_database(db_request)
            table_request = CreateTableEntityRequest(name=table_and_db.table.name,
                                                     columns=table_and_db.table.columns,
                                                     description=table_and_db.table.description,
                                                     database=db.id)
            created_table = self.rest.create_or_update_table(table_request)
            logger.info(
                'Successfully ingested {}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
            self.status.records_written(
                '{}.{}'.format(table_and_db.database.name.__root__, created_table.name.__root__))
        except (APIError, ValidationError) as err:
            logger.error(
                "Failed to ingest table {} in database {} ".format(table_and_db.table.name, table_and_db.database.name))
            logger.error(err)
            self.status.failures(table_and_db.table.name)

    def get_status(self):
        return self.status

    def close(self):
        pass
```

# Source

A **Source** is the connector to an external system; it outputs records for the downstream components to process and push to OpenMetadata.

## Source API

```py
@dataclass  # type: ignore[misc]
class Source(Closeable, metaclass=ABCMeta):
    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Source":
        pass

    @abstractmethod
    def prepare(self):
        pass

    @abstractmethod
    def next_record(self) -> Iterable[Record]:
        pass

    @abstractmethod
    def get_status(self) -> SourceStatus:
        pass
```

The **create** method is used to create an instance of the Source.

**prepare** will be called after the source is instantiated through Python's `__init__` method. This is the place to make connections to external sources or initialize the client library.

**next_record** is where the source connects to the external resource and emits records downstream.

**get_status** is for the [workflow](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/api/workflow.py) to call and report the status of the source, such as how many records it has processed and any failures or warnings.

## Example

A simple example implementation:

```py
class SampleTablesSource(Source):

    def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
        super().__init__(ctx)
        self.status = SampleTableSourceStatus()
        self.config = config
        self.metadata_config = metadata_config
        self.client = REST(metadata_config)
        self.service_json = json.load(open(config.sample_schema_folder + "/service.json", 'r'))
        self.database = json.load(open(config.sample_schema_folder + "/database.json", 'r'))
        self.tables = json.load(open(config.sample_schema_folder + "/tables.json", 'r'))
        self.service = get_service_or_create(self.service_json, metadata_config)

    @classmethod
    def create(cls, config_dict, metadata_config_dict, ctx):
        config = SampleTableSourceConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(config, metadata_config, ctx)

    def prepare(self):
        pass

    def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
        db = DatabaseEntity(id=uuid.uuid4(),
                            name=self.database['name'],
                            description=self.database['description'],
                            service=EntityReference(id=self.service.id, type=self.config.service_type))
        for table in self.tables['tables']:
            table_metadata = TableEntity(**table)
            table_and_db = OMetaDatabaseAndTable(table=table_metadata, database=db)
            self.status.scanned(table_metadata.name.__root__)
            yield table_and_db

    def close(self):
        pass

    def get_status(self):
        return self.status
```

# Stage

**Stage** is an optional component in the workflow. It can be used to store records in a file or data store, and to aggregate the work done by a processor.

## API

```py
@dataclass  # type: ignore[misc]
class Stage(Closeable, metaclass=ABCMeta):
    ctx: WorkflowContext

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Stage":
        pass

    @abstractmethod
    def stage_record(self, record: Record):
        pass

    @abstractmethod
    def get_status(self) -> StageStatus:
        pass

    @abstractmethod
    def close(self) -> None:
        pass
```

The **create** method is called during workflow instantiation and creates an instance of the stage.

**stage_record** is called for each record coming down the workflow chain and can be used to store the record. This method does not emit anything for downstream components to process.

**get_status** reports the status of the stage, e.g. how many records were staged, and any failures or warnings.

**close** gets called before the workflow stops. It can be used to clean up any connections or other resources.

## Example

Example implementation:

```py
class FileStage(Stage):
    config: FileStageConfig
    status: StageStatus

    def __init__(self, ctx: WorkflowContext, config: FileStageConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.status = StageStatus()

        fpath = pathlib.Path(self.config.filename)
        self.file = fpath.open("w")
        self.wrote_something = False

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        config = FileStageConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def stage_record(self, record: TableEntity) -> None:
        # write each record as one JSON document per line
        json_record = json.loads(record.json())
        self.file.write(json.dumps(json_record))
        self.file.write("\n")
        self.status.records_status(record)

    def get_status(self):
        return self.status

    def close(self):
        self.file.close()
```