Prep lineage information (#7940)
parent 9c64aedd77
commit 89b80b121b
@@ -9,6 +9,9 @@ In this guide, we will use the Python SDK to create and fetch Lineage informatio

For simplicity, we are going to create lineage between Tables. However, this would work with ANY entity.

You can find the Lineage Entity defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json),
as well as the Entity defining the payload to add a new lineage: [AddLineage](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/api/lineage/addLineage.json).

<Note>

Note that in OpenMetadata, the Lineage information is just a possible relationship between Entities. Other types
@@ -203,6 +206,11 @@ we added:

If the node already had other edges, they would show up here as well.

If we validate the Lineage from the UI, we will see:

<Image src="/images/sdk/python/ingestion/lineage/simple-lineage.png" alt="simple-lineage"/>

### 7. Fetching Lineage

Finally, let's fetch the lineage from the other node involved:
@@ -257,3 +265,134 @@ You can also get lineage by ID using the `get_lineage_by_id` method, which accep

</Tip>

# Lineage Details

Note that when adding lineage information we send the API an [AddLineage](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/api/lineage/addLineage.json)
request. This is composed of an Entity Edge, whose definition you can find [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json#L75).

In a nutshell, an Entity Edge has:
1. The Entity Reference of the lineage origin,
2. The Entity Reference of the lineage destination,
3. Optionally, Lineage Details.

In the Lineage Details property we can pass further information specific to Table-to-Table lineage:
- `sqlQuery`, specifying the transformation,
- `columnsLineage`, an array of objects, each holding an array of source columns and a destination column, as well as their own specific transformation function,
- Optionally, the Entity Reference of a Pipeline powering the transformation from Table A to Table B.

The API call will be exactly the same as before, but now we will add more ingredients when defining our objects. Let's
see how to do that and play with the possible combinations:

First, import the required classes and create a new table:

```python
from metadata.generated.schema.type.entityLineage import (
    ColumnLineage,
    EntitiesEdge,
    LineageDetails,
)

# CreateTableRequest, EntityReference, Column, DataType, `metadata` and
# `create_schema_entity` are assumed to be defined in the earlier steps of this guide.

# Prepare a new table
table_c = CreateTableRequest(
    name="tableC",
    databaseSchema=EntityReference(
        id=create_schema_entity.id, name="test-schema", type="databaseSchema"
    ),
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)

table_c_entity = metadata.create_or_update(data=table_c)
```
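
To make the request structure described under Lineage Details more concrete, here is a minimal sketch of the same shape using plain dictionaries instead of the SDK classes. The helper names `entity_ref` and `add_lineage_payload` are hypothetical, the IDs are random placeholders, and the keys simply mirror the argument names used in this guide. The JSON Schemas linked above remain the authoritative definition.

```python
import json
import uuid


def entity_ref(entity_id: str, entity_type: str) -> dict:
    """Plain-dict stand-in for an Entity Reference (id + type)."""
    return {"id": entity_id, "type": entity_type}


def add_lineage_payload(from_ref: dict, to_ref: dict, lineage_details=None) -> dict:
    """Build the edge; lineageDetails is included only when provided."""
    edge = {"fromEntity": from_ref, "toEntity": to_ref}
    if lineage_details is not None:
        edge["lineageDetails"] = lineage_details
    return {"edge": edge}


# Placeholder IDs; in the guide these come from the created Table entities.
table_a_id = str(uuid.uuid4())
table_c_id = str(uuid.uuid4())

details = {
    "sqlQuery": "SELECT * FROM AWESOME",
    "columnsLineage": [
        {
            "fromColumns": ["test-service-db-lineage.test-db.test-schema.tableA.id"],
            "toColumn": "test-service-db-lineage.test-db.test-schema.tableC.id",
        }
    ],
}

payload = add_lineage_payload(
    entity_ref(table_a_id, "table"),
    entity_ref(table_c_id, "table"),
    details,
)
print(json.dumps(payload, indent=2))
```

In real code the generated classes (`AddLineageRequest`, `EntitiesEdge`, `LineageDetails`) are the better choice; the dictionaries here only illustrate how the pieces nest.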
## Column Level Lineage

We can start by linking our columns together. For that we are going to create:

1. A `ColumnLineage` object, linking our Table A `id` column -> Table C `id` column. Note that `columnsLineage` accepts a list of these objects!
2. A `LineageDetails` object, passing the column lineage and the SQL query that powers the transformation.

```python
# Column-level mapping, expressed with fully qualified column names
column_lineage = ColumnLineage(
    fromColumns=["test-service-db-lineage.test-db.test-schema.tableA.id"],
    toColumn="test-service-db-lineage.test-db.test-schema.tableC.id"
)

lineage_details = LineageDetails(
    sqlQuery="SELECT * FROM AWESOME",
    columnsLineage=[column_lineage],
)

# AddLineageRequest and table_a_entity are assumed to come from the earlier steps
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=table_a_entity.id, type="table"),
        toEntity=EntityReference(id=table_c_entity.id, type="table"),
        lineageDetails=lineage_details,
    ),
)

created_lineage = metadata.add_lineage(data=add_lineage_request)
```

This information will now be reflected in the UI as well:

<Image src="/images/sdk/python/ingestion/lineage/lineage-col.png" alt="lineage-col"/>
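
The column references passed to `ColumnLineage` are fully qualified names: the table's name path followed by the column name. As a rough sketch (not part of the SDK), the hypothetical helpers below build such entries as plain dictionaries for several columns at once, assuming none of the names contain a dot.

```python
def column_fqn(table_fqn: str, column: str) -> str:
    """Join a table FQN and a column name with a dot, as in the guide's examples."""
    return f"{table_fqn}.{column}"


def column_lineage_entries(source_fqn: str, target_fqn: str, mapping: dict) -> list:
    """Build one entry per target column.

    mapping: target column name -> list of source column names.
    """
    return [
        {
            "fromColumns": [column_fqn(source_fqn, src) for src in sources],
            "toColumn": column_fqn(target_fqn, target),
        }
        for target, sources in mapping.items()
    ]


TABLE_A = "test-service-db-lineage.test-db.test-schema.tableA"
TABLE_C = "test-service-db-lineage.test-db.test-schema.tableC"

entries = column_lineage_entries(TABLE_A, TABLE_C, {"id": ["id"]})
print(entries)
```

Each resulting dictionary carries the same two fields that were passed to `ColumnLineage` earlier, so the values could be fed to that class one entry at a time.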

### Adding a Pipeline Reference

We can also pass a reference to the pipeline used to create the lineage (e.g., the ETL feeding the tables).

To prepare this example, we need to start by creating the Pipeline Entity. Again, we first need
to prepare the Pipeline Service:

```python
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.services.createPipelineService import (
    CreatePipelineServiceRequest,
)
from metadata.generated.schema.entity.services.pipelineService import (
    PipelineConnection,
    PipelineService,
    PipelineServiceType,
)
from metadata.generated.schema.entity.services.connections.pipeline.airflowConnection import (
    AirflowConnection,
)
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
    BackendConnection,
)

# Register the Pipeline Service that will own the pipeline
pipeline_service = CreatePipelineServiceRequest(
    name="test-service-pipeline",
    serviceType=PipelineServiceType.Airflow,
    connection=PipelineConnection(
        config=AirflowConnection(
            hostPort="http://localhost:8080",
            connection=BackendConnection(),
        ),
    ),
)

pipeline_service_entity = metadata.create_or_update(data=pipeline_service)

# Create the Pipeline Entity under that service
create_pipeline = CreatePipelineRequest(
    name="test",
    service=EntityReference(id=pipeline_service_entity.id, type="pipelineService"),
)

pipeline_entity = metadata.create_or_update(data=create_pipeline)
```

With these ingredients ready, we can follow the code above and add a `pipeline` argument
as an Entity Reference:

```python
lineage_details = LineageDetails(
    sqlQuery="SELECT * FROM AWESOME",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
```
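
Since this guide invites us to play with the possible combinations, one way to do so is a small helper that keeps only the values actually provided. This is a plain-dictionary sketch rather than the SDK's `LineageDetails` class: the function name `build_lineage_details` and its parameters are made up for illustration, and the pipeline ID is a placeholder.

```python
from typing import Optional


def build_lineage_details(
    sql_query: Optional[str] = None,
    columns_lineage: Optional[list] = None,
    pipeline_id: Optional[str] = None,
) -> dict:
    """Return a dict holding only the lineage detail fields that were provided."""
    candidate = {
        "sqlQuery": sql_query,
        "columnsLineage": columns_lineage,
        # The pipeline is passed as an Entity Reference (id + type)
        "pipeline": {"id": pipeline_id, "type": "pipeline"} if pipeline_id else None,
    }
    return {key: value for key, value in candidate.items() if value is not None}


only_query = build_lineage_details(sql_query="SELECT * FROM AWESOME")
with_pipeline = build_lineage_details(
    sql_query="SELECT * FROM AWESOME",
    pipeline_id="00000000-0000-0000-0000-000000000000",  # placeholder ID
)
print(sorted(only_query))
print(sorted(with_pipeline))
```

Dropping the unset fields keeps the request limited to what we actually know about the edge, whether that is only the query, only the columns, or all three pieces.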

<Note>

The UI currently supports showing the column lineage information. Data about the SQL queries and the Pipeline Entities
will be surfaced soon. Thanks!

</Note>