[Docs] - Workflow Breaking Changes - Custom Connectors (#13834)

* quickstart

* [Docs] - Workflow Breaking Changes - Custom Connectors
Pere Miquel Brull 2023-11-03 08:24:03 +01:00 committed by GitHub
parent 1c56d8c181
commit ca9a92e39d
7 changed files with 105 additions and 257 deletions

View File

@@ -198,6 +198,29 @@ If you try to run your workflows externally and start noticing `ImportError`s, y
In 1.1.7 and below you could run the Usage Workflow as `metadata ingest -c <path to yaml>`. Now, the Usage Workflow
has its own command `metadata usage -c <path to yaml>`.
### Custom Connectors
In 1.2.0 we have reorganized the internals of our Workflow handling to centralize status & exception management. This
simplifies how you handle statuses and exceptions in your Custom Connector code, and it helps developers
decide which errors should be surfaced in the Workflow.
{% note %}
If you want to take a look at an updated Custom Connector and its changes, you can review the demo [PR](https://github.com/open-metadata/openmetadata-demo/pull/34/files).
{% /note %}
Let's list the changes:
1. You don't need to handle the `SourceStatus` anymore; the new base Workflow class takes care of that for you. Therefore, the import
`from metadata.ingestion.api.source import SourceStatus` is deprecated.
2. The `Source` class is now imported via `from metadata.ingestion.api.steps import Source` (instead of `from metadata.ingestion.api.source import Source`).
3. We are now initializing the `OpenMetadata` object at the Workflow level (to share it across each step). Therefore,
the source `__init__` method signature is now `def __init__(self, config: WorkflowSource, metadata: OpenMetadata):`. Make sure to store the object as `self.metadata`
during `__init__`, and don't forget to call `super().__init__()`.
4. We are updating how status & exception management happens in the connectors. Each `yield` result is now wrapped in
an `Either` (imported via `from metadata.ingestion.api.models import Either`): your correct data is `yield`ed in a `right`, while
errors are tracked in a `left` (see the sketch after this list). Read more about the Workflow management [here](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/workflow/README.md).
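Putting these points together, here is a minimal sketch of a migrated connector. It is illustrative only: `MyConnector` and `_produce_record` are made-up names, and the `StackTraceError` import path is an assumption about the 1.2.x internals, so double-check it against the demo PR above.

```python
import traceback
from typing import Iterable

from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either, StackTraceError  # assumed path
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class MyConnector(Source):
    """Sketch of a Custom Connector migrated to the 1.2.0 contract."""

    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        # New signature: the Workflow injects the shared OpenMetadata client
        self.config = config
        self.metadata = metadata
        super().__init__()  # don't forget this call

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "MyConnector":
        config = WorkflowSource.parse_obj(config_dict)
        return cls(config, metadata)

    def prepare(self):
        """Connect to the external system or load files here."""

    def _iter(self) -> Iterable[Either]:
        for name in ("record_a", "record_b"):  # stand-in for real extraction
            try:
                yield Either(right=self._produce_record(name))
            except Exception as exc:
                # Errors travel in a `left` so the Workflow can track them
                yield Either(
                    left=StackTraceError(
                        name=name,
                        error=f"Could not process [{name}]: {exc}",
                        stackTrace=traceback.format_exc(),
                    )
                )

    def _produce_record(self, name: str):
        """Hypothetical helper that builds a Create*Request for `name`."""
        raise NotImplementedError

    def test_connection(self) -> None:
        pass

    def close(self) -> None:
        pass
```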
### Other Changes
- Pipeline Status timestamps are now expressed in milliseconds.
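If you build or compare Pipeline Status timestamps on your side, they should now be epoch values in milliseconds rather than seconds. A one-line sketch:

```python
from datetime import datetime

# Pipeline Status timestamps in 1.2.0 are epoch values in milliseconds
timestamp_millis = int(datetime.now().timestamp() * 1000)
```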

View File

@@ -12,7 +12,7 @@ Are you exploring or doing a PoC? It won't get easier than following our Quickst
{% inlineCallout
icon="celebration"
bold="Quickstart OpenMetadata"
href="/quick-start/local-deployment" %}
href="/quick-start/local-docker-deployment" %}
Get OpenMetadata up and running in under 5 minutes!
{% /inlineCallout %}
@@ -34,7 +34,7 @@ We support different kinds of deployment:
color="violet-70"
icon="celebration"
bold="Local Docker Deployment"
href="/quick-start/local-deployment"%}
href="/quick-start/local-docker-deployment"%}
Get OpenMetadata up and running locally in under 7 minutes!
{%/inlineCallout%}
{%inlineCallout

View File

@@ -9,35 +9,17 @@ slug: /sdk/python/build-connector/bulk-sink
## API
```python
-@dataclass  # type: ignore[misc]
-class BulkSink(Closeable, metaclass=ABCMeta):
-    ctx: WorkflowContext
-
-    @classmethod
-    @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "BulkSink":
-        pass
-
-    @abstractmethod
-    def write_records(self) -> None:
-        pass
-
-    @abstractmethod
-    def get_status(self) -> BulkSinkStatus:
-        pass
-
-    @abstractmethod
-    def close(self) -> None:
-        pass
+class BulkSink(BulkStep, ABC):
+    """All Stages must inherit this base class."""
+
+    # From the parent - Adding here just to showcase
+    @abstractmethod
+    def run(self) -> None:
+        pass
```
-**create** method is called during the workflow instantiation and creates an instance of the bulk sink.
-**write_records** this method is called only once in Workflow. It is the developer's responsibility to perform the bulk actions inside this method, such as reading the entire file or store and generating the API calls to external services.
-**get_status** to report the status of the bulk sink, e.g., how many records, failures, or warnings.
-**close** gets called before the workflow stops. Can be used to clean up any connections or other resources.
+**run** is called only once per Workflow. It is the developer's responsibility to perform the bulk actions inside this method, such as reading the entire staged file or store and generating the API calls to external services.
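To make the new contract concrete, here is a minimal, hypothetical sketch of a 1.2-style BulkSink; the class name, config key, and staged JSON-lines format are made up, and the linked example below remains the authoritative reference.

```python
import json

from metadata.ingestion.api.steps import BulkSink
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class ReplayFileBulkSink(BulkSink):
    """Hypothetical bulk sink replaying records staged in a JSON-lines file."""

    def __init__(self, filename: str, metadata: OpenMetadata):
        super().__init__()
        self.filename = filename
        self.metadata = metadata

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "ReplayFileBulkSink":
        return cls(config_dict["filename"], metadata)

    def run(self) -> None:
        # All the bulk work happens in this single call: read the staged
        # file once and turn each record into calls to external services.
        with open(self.filename, encoding="utf-8") as staged:
            for line in staged:
                record = json.loads(line)
                ...  # e.g., self.metadata.create_or_update(record)

    def close(self) -> None:
        pass
```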
## Example
[Example implementation](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/bulksink/metadata_usage.py#L52)

View File

@@ -12,6 +12,7 @@ Ingestion is a simple python framework to ingest the metadata from various sourc
Please look at our framework [APIs](https://github.com/open-metadata/OpenMetadata/tree/main/ingestion/src/metadata/ingestion/api).
## Workflow
[workflow](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/api/workflow.py) is a simple orchestration job that runs the components in order.
A workflow consists of [Source](/sdk/python/build-connector/source) and [Sink](/sdk/python/build-connector/sink). It also provides support for [Stage](/sdk/python/build-connector/stage) and [BulkSink](/sdk/python/build-connector/bulk-sink).
@@ -26,6 +27,36 @@ Workflow execution happens in a serial fashion.
In the cases where we need aggregation over the records, we can use the **stage** to write to a file or other store. The file written by the **stage** is then passed to the **bulk sink** to publish to external services such as **OpenMetadata** or **Elasticsearch**.
Each `Step` comes from this generic definition:
```python
class Step(ABC, Closeable):
    """All Workflow steps must inherit this base class."""

    status: Status

    def __init__(self):
        self.status = Status()

    @classmethod
    @abstractmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "Step":
        pass

    def get_status(self) -> Status:
        return self.status

    @abstractmethod
    def close(self) -> None:
        pass
```
So we always need to implement these methods:
- `create` to initialize the actual step.
- `close` in case there's any connection that needs to be terminated.
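For illustration, a minimal concrete step under these rules could look as follows; the class name and config key are hypothetical, and the exact module path of `Step` is an assumption (check the repository for the authoritative definition).

```python
from metadata.ingestion.api.step import Step  # assumed module path
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class FileHandleStep(Step):
    """Hypothetical step that owns a file handle for its whole lifetime."""

    def __init__(self, path: str):
        super().__init__()  # initializes self.status
        self.file = open(path, "w", encoding="utf-8")

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "FileHandleStep":
        # Initialize the actual step from the raw workflow config
        return cls(config_dict["path"])

    def close(self) -> None:
        # Terminate any connection / release any resource the step holds
        self.file.close()
```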
On top of this, you can find further notes on each specific step in the links below:
{% inlineCalloutContainer %}
{% inlineCallout
color="violet-70"
@@ -56,3 +87,5 @@ In the cases where we need aggregation over the records, we can use the **stage*
It can be used to bulk update the records generated in a workflow.
{% /inlineCallout %}
{% /inlineCalloutContainer %}
Read more about the Workflow management [here](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/workflow/README.md).

View File

@@ -9,87 +9,18 @@ The **Sink** will get the event emitted by the source, one at a time. It can use
## API
```python
-@dataclass  # type: ignore[misc]
-class Sink(Closeable, metaclass=ABCMeta):
-    """All Sinks must inherit this base class."""
-
-    ctx: WorkflowContext
-
-    @classmethod
-    @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Sink":
-        pass
-
-    @abstractmethod
-    def write_record(self, record: Record) -> None:
-        # must call callback when done.
-        pass
-
-    @abstractmethod
-    def get_status(self) -> SinkStatus:
-        pass
-
-    @abstractmethod
-    def close(self) -> None:
-        pass
+class Sink(ReturnStep, ABC):
+    """All Sinks must inherit this base class."""
+
+    # From the parent - Just to showcase
+    @abstractmethod
+    def _run(self, record: Entity) -> Either:
+        """
+        Main entrypoint to execute the step
+        """
```
-**create** method is called during the workflow instantiation and creates an instance of the sink.
-**write_record** this method is called for each record coming down in the workflow chain and can be used to store the record in external services, etc.
-**get_status** to report the status of the sink, e.g., how many records, failures, or warnings.
-**close** gets called before the workflow stops. Can be used to clean up any connections or other resources.
+**_run** is called for each record coming down the workflow chain and can be used to store the record in external services, etc.
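As an illustration, here is a minimal sketch of a 1.2-style Sink. The class name is made up, the error handling is simplified, and the `Entity` / `StackTraceError` import paths are assumptions about the 1.2.x internals.

```python
import traceback

from metadata.ingestion.api.models import Either, Entity, StackTraceError  # assumed paths
from metadata.ingestion.api.steps import Sink
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class SimpleRestSink(Sink):
    """Hypothetical sink sending each record to the OpenMetadata API."""

    def __init__(self, metadata: OpenMetadata):
        super().__init__()
        self.metadata = metadata

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "SimpleRestSink":
        return cls(metadata)

    def _run(self, record: Entity) -> Either:
        try:
            created = self.metadata.create_or_update(record)
            # A `right` marks the record as successfully processed
            return Either(right=created)
        except Exception as exc:
            # A `left` reports the failure to the centralized status handling
            return Either(
                left=StackTraceError(
                    name=type(record).__name__,
                    error=f"Failed to ingest record: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )

    def close(self) -> None:
        pass
```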
## Example
The inline example below was removed in 1.2.0 in favor of the link that follows:
```python
class MetadataRestTablesSink(Sink):
    config: MetadataTablesSinkConfig
    status: SinkStatus

    def __init__(self, ctx: WorkflowContext, config: MetadataTablesSinkConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.metadata_config = metadata_config
        self.status = SinkStatus()
        self.wrote_something = False
        self.rest = REST(self.metadata_config)

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        config = MetadataTablesSinkConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def write_record(self, entity_request) -> None:
        log = f"{type(entity_request).__name__} [{entity_request.name.__root__}]"
        try:
            created = self.metadata.create_or_update(entity_request)
            if created:
                self.status.records_written(
                    f"{type(created).__name__}: {created.fullyQualifiedName.__root__}"
                )
                logger.debug(f"Successfully ingested {log}")
            else:
                self.status.failure(log)
                logger.error(f"Failed to ingest {log}")
        except (APIError, HTTPError) as err:
            logger.debug(traceback.format_exc())
            logger.warning(f"Failed to ingest {log} due to api request failure: {err}")
            self.status.failure(log)
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Failed to ingest {log}: {exc}")
            self.status.failure(log)

    def get_status(self):
        return self.status

    def close(self):
        pass
```
[Example implementation](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/sink/metadata_rest.py#L87)

View File

@@ -10,115 +10,40 @@ The **Source** is the connector to external systems and outputs a record for dow
## Source API
```python
-@dataclass  # type: ignore[misc]
-class Source(Closeable, metaclass=ABCMeta):
-    ctx: WorkflowContext
-
-    @classmethod
-    @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Source":
-        pass
-
-    @abstractmethod
-    def prepare(self):
-        pass
-
-    @abstractmethod
-    def next_record(self) -> Iterable[Record]:
-        pass
-
-    @abstractmethod
-    def get_status(self) -> SourceStatus:
-        pass
+class Source(IterStep, ABC):
+    """
+    Abstract source implementation. The workflow will run
+    its next_record and pass them to the next step.
+    """
+
+    metadata: OpenMetadata
+    connection_obj: Any
+    service_connection: Any
+
+    # From the parent - Adding here just to showcase
+    @abstractmethod
+    def _iter(self) -> Iterable[Either]:
+        """Main entrypoint to run through the Iterator"""
+
+    @abstractmethod
+    def prepare(self):
+        pass
+
+    @abstractmethod
+    def test_connection(self) -> None:
+        pass
```
-**create** method is used to create an instance of Source.
 **prepare** will be called through Python's init method. This will be a place where you could make connections to external sources or initiate the client library.
-**next_record** is where the client can connect to an external resource and emit the data downstream.
-**get_status** is for the [workflow](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/api/workflow.py) to call and report the status of the source, such as how many records it has processed and any failures or warnings.
+**_iter** is where the client can connect to an external resource and emit the data downstream.
+**test_connection** is used (by OpenMetadata supported connectors ONLY) to validate permissions and connectivity before moving forward with the ingestion.
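For a feel of the new contract, here is a compact, hypothetical `_iter` built on the points above; the class name and the single yielded request are illustrative, and the complete, working version is the demo connector linked below.

```python
from typing import Iterable

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class AwesomeSource(Source):
    """Hypothetical source emitting a single database request."""

    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        self.config = config
        self.metadata = metadata
        super().__init__()

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "AwesomeSource":
        return cls(WorkflowSource.parse_obj(config_dict), metadata)

    def prepare(self):
        """Open connections or initialize client libraries here."""

    def _iter(self) -> Iterable[Either]:
        # Each element flows to the next step wrapped in an Either
        yield Either(
            right=CreateDatabaseRequest(
                name="awesome-database",
                service="awesome-service",
            )
        )

    def test_connection(self) -> None:
        """Only implemented by OpenMetadata-supported connectors."""

    def close(self) -> None:
        pass
```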
## Example
The inline example below was removed in 1.2.0 in favor of the demo Custom Connector linked after it:
```python
class SampleTablesSource(Source):
    def __init__(self, config: SampleTableSourceConfig, metadata_config: MetadataServerConfig, ctx):
        super().__init__(ctx)
        self.status = SampleTableSourceStatus()
        self.config = config
        self.metadata_config = metadata_config
        self.client = REST(metadata_config)
        self.service_json = json.load(open(config.sample_schema_folder + "/service.json", "r"))
        self.database = json.load(open(config.sample_schema_folder + "/database.json", "r"))
        self.tables = json.load(open(config.sample_schema_folder + "/tables.json", "r"))
        self.service = get_service_or_create(self.service_json, metadata_config)

    @classmethod
    def create(cls, config_dict, metadata_config_dict, ctx):
        config = SampleTableSourceConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(config, metadata_config, ctx)

    def prepare(self):
        pass

    def next_record(self) -> Iterable[Entity]:
        yield from self.yield_create_request_database_service(self.config)
        service_entity: DatabaseService = self.metadata.get_by_name(
            entity=DatabaseService, fqn=self.config.serviceName
        )
        yield CreateDatabaseRequest(
            name="awesome-database",
            service=service_entity.fullyQualifiedName,
        )
        database_entity: Database = self.metadata.get_by_name(
            entity=Database,
            fqn=fqn.build(
                self.metadata,
                entity_type=Database,
                service_name=self.context.database_service.name.__root__,
                database_name="awesome-database",
            ),
        )
        yield CreateDatabaseSchemaRequest(
            name="awesome-schema",
            description="description",
            database=database_entity.fullyQualifiedName,
        )
        database_schema_entity: DatabaseSchema = self.metadata.get_by_name(
            entity=DatabaseSchema,
            fqn=fqn.build(
                self.metadata,
                entity_type=DatabaseSchema,
                service_name=self.context.database_service.name.__root__,
                database_name="awesome-database",
                schema_name="awesome-schema",
            ),
        )
        yield CreateTableRequest(
            name="awesome-table",
            description="description",
            columns="columns",
            databaseSchema=database_schema_entity.fullyQualifiedName,
            tableConstraints=table.get("tableConstraints"),
            tableType=table["tableType"],
        )

    def close(self):
        pass

    def get_status(self):
        return self.status
```
A simple example of this implementation can be found in our demo Custom Connector [here](https://github.com/open-metadata/openmetadata-demo/blob/main/custom-connector/connector/my_csv_connector.py)
## For consumers of openmetadata-ingestion defining custom connectors in their own package with the same namespace

View File

@@ -8,71 +8,25 @@ The **Stage** is an optional component in the workflow. It can be used to store
## API
```python
-@dataclass  # type: ignore[misc]
-class Stage(Closeable, metaclass=ABCMeta):
-    ctx: WorkflowContext
-
-    @classmethod
-    @abstractmethod
-    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext) -> "Stage":
-        pass
-
-    @abstractmethod
-    def stage_record(self, record: Record):
-        pass
-
-    @abstractmethod
-    def get_status(self) -> StageStatus:
-        pass
-
-    @abstractmethod
-    def close(self) -> None:
-        pass
+class Stage(StageStep, ABC):
+    """All Stages must inherit this base class."""
+
+    # From the parent - just to showcase
+    @abstractmethod
+    def _run(self, record: Entity) -> Iterable[Either[str]]:
+        """
+        Main entrypoint to execute the step.
+
+        Note that the goal of this step is to store the
+        processed data somewhere (e.g., a file). We will
+        return an iterable to keep track of the processed
+        entities / exceptions, but the next step (Bulk Sink)
+        won't read these results. It will directly
+        pick up the file components.
+        """
```
-**create** method is called during the workflow instantiation and creates an instance of the processor.
-**stage_record** this method is called for each record coming down in the workflow chain and can be used to store the record. This method doesn't emit anything for the downstream to process on.
-**get_status** to report the status of the stage, e.g., how many records, failures, or warnings.
-**close** gets called before the workflow stops. Can be used to clean up any connections or other resources.
+**_run** is called for each record coming down the workflow chain and can be used to store the record. This method doesn't emit anything for downstream steps to process.
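As an illustration, here is a minimal, hypothetical 1.2-style Stage that appends each record to a JSON-lines file; the class name and config key are made up, the `Entity` import path is an assumption, and the linked example below remains the authoritative reference.

```python
import pathlib
from typing import Iterable

from metadata.ingestion.api.models import Either, Entity  # assumed paths
from metadata.ingestion.api.steps import Stage
from metadata.ingestion.ometa.ometa_api import OpenMetadata


class JsonLinesStage(Stage):
    """Hypothetical stage appending each record to a JSON-lines file."""

    def __init__(self, filename: str):
        super().__init__()
        self.file = pathlib.Path(filename).open("w", encoding="utf-8")

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata) -> "JsonLinesStage":
        return cls(config_dict["filename"])

    def _run(self, record: Entity) -> Iterable[Either[str]]:
        self.file.write(record.json() + "\n")
        # The Bulk Sink reads the file directly; we only yield a marker
        # so the Workflow can keep track of the processed entities
        yield Either(right=type(record).__name__)

    def close(self) -> None:
        self.file.close()
```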
## Example
The inline example below was removed in 1.2.0 in favor of the link that follows:
```python
class FileStage(Stage):
    config: FileStageConfig
    status: StageStatus

    def __init__(self, ctx: WorkflowContext, config: FileStageConfig, metadata_config: MetadataServerConfig):
        super().__init__(ctx)
        self.config = config
        self.status = StageStatus()

        fpath = pathlib.Path(self.config.filename)
        self.file = fpath.open("w")
        self.wrote_something = False

    @classmethod
    def create(cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext):
        config = FileStageConfig.parse_obj(config_dict)
        metadata_config = MetadataServerConfig.parse_obj(metadata_config_dict)
        return cls(ctx, config, metadata_config)

    def stage_record(self, record: TableEntity) -> None:
        json_record = json.loads(record.json())
        self.file.write(json.dumps(json_record))
        self.file.write("\n")
        self.status.records_status(record)

    def get_status(self):
        return self.status

    def close(self):
        self.file.close()
```
[Example implementation](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/stage/table_usage.py#L42)