mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-10-23 23:04:23 +00:00
455 lines
15 KiB
Markdown
455 lines
15 KiB
Markdown
![]() |
---
|
||
|
title: Python SDK for Lineage
|
||
|
slug: /sdk/python/ingestion/lineage
|
||
|
---
|
||
|
|
||
|
# Python SDK for Lineage
|
||
|
|
||
|
In this guide, we will use the Python SDK to create and fetch Lineage information.
|
||
|
|
||
|
For simplicity, we are going to create lineage between Tables. However, this would work with ANY entity.
|
||
|
|
||
|
You can find the Lineage Entity defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json),
|
||
|
as well as the Entity defining the payload to add a new lineage: [AddLineage](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/api/lineage/addLineage.json).
|
||
|
|
||
|
{% note %}
|
||
|
|
||
|
Note that in OpenMetadata, the Lineage information is just a possible relationship between Entities. Other types
|
||
|
of relationships for example could be:
|
||
|
|
||
|
- Contains (a Database contains Schemas, which at the same time contain Tables),
|
||
|
- or Ownership of any asset.
|
||
|
|
||
|
The point being, any Entity existent in OpenMetadata can be related to any other via Lineage.
|
||
|
|
||
|
{% /note %}
|
||
|
|
||
|
In the following sections we will:
|
||
|
- Create a Database Service, a Database, a Schema and two Tables,
|
||
|
- Add Lineage between both Tables,
|
||
|
- Get the Lineage information back.
|
||
|
|
||
|
A **prerequisite** for this section is to have previously gone through the following [docs](/sdk/python).
|
||
|
|
||
|
## Creating the Entities
|
||
|
|
||
|
To prepare the necessary ingredients, execute the following steps.
|
||
|
|
||
|
All functions that we are going to use related to Lineage can be found in [here](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py)
|
||
|
|
||
|
### 1. Preparing the Client
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||
|
OpenMetadataConnection,
|
||
|
)
|
||
|
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
|
||
|
OpenMetadataJWTClientConfig,
|
||
|
)
|
||
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||
|
|
||
|
server_config = OpenMetadataConnection(
|
||
|
hostPort="http://localhost:8585/api",
|
||
|
authProvider="openmetadata",
|
||
|
securityConfig=OpenMetadataJWTClientConfig(
|
||
|
jwtToken="<token>"
|
||
|
),
|
||
|
)
|
||
|
metadata = OpenMetadata(server_config)
|
||
|
|
||
|
assert metadata.health_check() # Will fail if we cannot reach the server
|
||
|
```
|
||
|
|
||
|
### 2. Creating the Database Service
|
||
|
|
||
|
We are mocking a MySQL instance. Note how we need to pass the right configuration class `MysqlConnection`, as a
|
||
|
parameter for the generic `DatabaseConnection` type.
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.api.services.createDatabaseService import (
|
||
|
CreateDatabaseServiceRequest,
|
||
|
)
|
||
|
from metadata.generated.schema.entity.services.connections.database.common.basicAuth import (
|
||
|
BasicAuth,
|
||
|
)
|
||
|
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
|
||
|
MysqlConnection,
|
||
|
)
|
||
|
from metadata.generated.schema.entity.services.databaseService import (
|
||
|
DatabaseConnection,
|
||
|
DatabaseService,
|
||
|
DatabaseServiceType,
|
||
|
)
|
||
|
|
||
|
db_service = CreateDatabaseServiceRequest(
|
||
|
name="test-service-db-lineage",
|
||
|
serviceType=DatabaseServiceType.Mysql,
|
||
|
connection=DatabaseConnection(
|
||
|
config=MysqlConnection(
|
||
|
username="username",
|
||
|
authType=BasicAuth(password="password"),
|
||
|
hostPort="http://localhost:1234",
|
||
|
)
|
||
|
),
|
||
|
)
|
||
|
|
||
|
db_service_entity = metadata.create_or_update(data=db_service)
|
||
|
```
|
||
|
|
||
|
### 3. Creating the Database
|
||
|
|
||
|
Any Entity that is created and linked to another Entity, has to hold the `fullyQualifiedName` to the Entity it
|
||
|
relates to. In this case, a Database is bound to a specific service.
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
|
||
|
|
||
|
create_db = CreateDatabaseRequest(
|
||
|
name="test-db",
|
||
|
service=db_service_entity.fullyQualifiedName,
|
||
|
)
|
||
|
|
||
|
create_db_entity = metadata.create_or_update(data=create_db)
|
||
|
```
|
||
|
|
||
|
### 4. Creating the Schema
|
||
|
|
||
|
The same happens with the Schemas. They are related to a Database.
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.api.data.createDatabaseSchema import (
|
||
|
CreateDatabaseSchemaRequest,
|
||
|
)
|
||
|
|
||
|
create_schema = CreateDatabaseSchemaRequest(
|
||
|
name="test-schema", database=create_db_entity.fullyQualifiedName
|
||
|
)
|
||
|
|
||
|
create_schema_entity = metadata.create_or_update(data=create_schema)
|
||
|
```
|
||
|
|
||
|
### 5. Creating the Tables
|
||
|
|
||
|
And finally, Tables are contained in a specific Schema, so we use the `fullyQualifiedName` here as well.
|
||
|
|
||
|
We are doing a simple example with a single column.
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.api.data.createTable import CreateTableRequest
|
||
|
from metadata.generated.schema.entity.data.table import Column, DataType
|
||
|
|
||
|
table_a = CreateTableRequest(
|
||
|
name="tableA",
|
||
|
databaseSchema=create_schema_entity.fullyQualifiedName,
|
||
|
columns=[Column(name="id", dataType=DataType.BIGINT)],
|
||
|
)
|
||
|
|
||
|
table_b = CreateTableRequest(
|
||
|
name="tableB",
|
||
|
databaseSchema=create_schema_entity.fullyQualifiedName,
|
||
|
columns=[Column(name="id", dataType=DataType.BIGINT)],
|
||
|
)
|
||
|
|
||
|
table_a_entity = metadata.create_or_update(data=table_a)
|
||
|
table_b_entity = metadata.create_or_update(data=table_b)
|
||
|
```
|
||
|
|
||
|
### 6. Adding Lineage
|
||
|
|
||
|
With everything prepared, we can now create the Lineage between both Entities. An `AddLineageRequest` type
|
||
|
represents the edge between two Entities, typed under `EntitiesEdge`.
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||
|
from metadata.generated.schema.type.entityLineage import EntitiesEdge
|
||
|
|
||
|
add_lineage_request = AddLineageRequest(
|
||
|
edge=EntitiesEdge(
|
||
|
description="test lineage",
|
||
|
fromEntity=EntityReference(id=table_a_entity.id, type="table"),
|
||
|
toEntity=EntityReference(id=table_b_entity.id, type="table"),
|
||
|
),
|
||
|
)
|
||
|
|
||
|
created_lineage = metadata.add_lineage(data=add_lineage_request)
|
||
|
```
|
||
|
|
||
|
The Python client will already return us a JSON object with the Lineage information about the `fromEntity` node
|
||
|
we added:
|
||
|
|
||
|
```json
|
||
|
{
|
||
|
"entity": {
|
||
|
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
|
||
|
"type": "table",
|
||
|
"name": "tableA",
|
||
|
"fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
|
||
|
"deleted": false,
|
||
|
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
|
||
|
},
|
||
|
"nodes": [
|
||
|
{
|
||
|
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
|
||
|
"type": "table",
|
||
|
"name": "tableB",
|
||
|
"fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
|
||
|
"deleted": false,
|
||
|
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
|
||
|
}
|
||
|
],
|
||
|
"upstreamEdges": [],
|
||
|
"downstreamEdges": [
|
||
|
{
|
||
|
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
|
||
|
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
|
||
|
}
|
||
|
]
|
||
|
}
|
||
|
```
|
||
|
|
||
|
If the node were to have other edges already, they would be showing up here.
|
||
|
|
||
|
If we validate the Lineage from the UI, we will see:
|
||
|
|
||
|
{% image src="/images/v1.5/sdk/python/ingestion/lineage/simple-lineage.png" alt="simple-lineage" /%}
|
||
|
|
||
|
|
||
|
### 7. Fetching Lineage
|
||
|
|
||
|
Finally, let's fetch the lineage from the other node involved:
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.entity.data.table import Table
|
||
|
|
||
|
metadata.get_lineage_by_name(
|
||
|
entity=Table,
|
||
|
fqn="test-service-db-lineage.test-db.test-schema.tableB",
|
||
|
# Tune this to control how far in the lineage graph to go
|
||
|
up_depth=1,
|
||
|
down_depth=1
|
||
|
)
|
||
|
```
|
||
|
|
||
|
Which will give us the symmetric results from above
|
||
|
|
||
|
```json
|
||
|
{
|
||
|
"entity": {
|
||
|
"id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
|
||
|
"type": "table",
|
||
|
"name": "tableB",
|
||
|
"fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
|
||
|
"deleted": false,
|
||
|
"href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
|
||
|
},
|
||
|
"nodes": [
|
||
|
{
|
||
|
"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
|
||
|
"type": "table",
|
||
|
"name": "tableA",
|
||
|
"fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
|
||
|
"deleted": false,
|
||
|
"href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
|
||
|
}
|
||
|
],
|
||
|
"upstreamEdges": [
|
||
|
{
|
||
|
"fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
|
||
|
"toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
|
||
|
}
|
||
|
],
|
||
|
"downstreamEdges": []
|
||
|
}
|
||
|
```
|
||
|
|
||
|
{% note noteType="Tip" %}
|
||
|
|
||
|
You can also get lineage by ID using the `get_lineage_by_id` method, which accepts `entity_id` instead of `fqn`.
|
||
|
|
||
|
{% /note %}
|
||
|
|
||
|
# Lineage Details
|
||
|
|
||
|
Note how when adding lineage information we give to the API an [AddLineage](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/api/lineage/addLineage.json)
|
||
|
Request. This is composed of an Entity Edge, whose definition you can find [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/type/entityLineage.json#L75).
|
||
|
|
||
|
In a nutshell, an Entity Edge has:
|
||
|
1. The Entity Reference as the lineage origin,
|
||
|
2. The Entity Reference as the lineage destination,
|
||
|
3. Optionally, Lineage Details.
|
||
|
|
||
|
In the Lineage Details property we can pass further information specific about Table to Table lineage:
|
||
|
- `sqlQuery` specifying the transformation,
|
||
|
- An array of `columnsLineage` as an object with an array of source and destination columns, as well as their own specific transformation function,
|
||
|
- Optionally, the Entity Reference of a Pipeline powering the transformation from Table A to Table B.
|
||
|
|
||
|
The API call will be exactly the same as before, but now we will add more ingredients when defining our objects. Let's
|
||
|
see how to do that and play with the possible combinations:
|
||
|
|
||
|
First, import the required classes and create a new table:
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.type.entityLineage import (
|
||
|
ColumnLineage,
|
||
|
EntitiesEdge,
|
||
|
LineageDetails,
|
||
|
)
|
||
|
|
||
|
# Prepare a new table
|
||
|
table_c = CreateTableRequest(
|
||
|
name="tableC",
|
||
|
databaseSchema=create_schema_entity.ifullyQualifiedName,
|
||
|
columns=[Column(name="id", dataType=DataType.BIGINT)],
|
||
|
)
|
||
|
|
||
|
table_c_entity = metadata.create_or_update(data=table_c)
|
||
|
```
|
||
|
|
||
|
## Column Level Lineage
|
||
|
|
||
|
We can start by linking our columns together. For that we are going to create:
|
||
|
|
||
|
1. A `ColumnLineage` object, linking our Table A column ID -> Table C column ID. Note that this can be a list!
|
||
|
2. A `LineageDetails` object, passing the column lineage and the SQL query that powers the transformation.
|
||
|
|
||
|
```python
|
||
|
column_lineage = ColumnLineage(
|
||
|
fromColumns=["test-service-db-lineage.test-db.test-schema.tableA.id"],
|
||
|
toColumn="test-service-db-lineage.test-db.test-schema.tableC.id"
|
||
|
)
|
||
|
|
||
|
lineage_details = LineageDetails(
|
||
|
sqlQuery="SELECT * FROM AWESOME",
|
||
|
columnsLineage=[column_lineage],
|
||
|
)
|
||
|
|
||
|
add_lineage_request = AddLineageRequest(
|
||
|
edge=EntitiesEdge(
|
||
|
fromEntity=EntityReference(id=table_a_entity.id, type="table"),
|
||
|
toEntity=EntityReference(id=table_c_entity.id, type="table"),
|
||
|
lineageDetails=lineage_details,
|
||
|
),
|
||
|
)
|
||
|
|
||
|
created_lineage = metadata.add_lineage(data=add_lineage_request)
|
||
|
```
|
||
|
|
||
|
This information will now be reflected in the UI as well:
|
||
|
|
||
|
{% image src="/images/v1.5/sdk/python/ingestion/lineage/lineage-col.png" alt="lineage-col" /%}
|
||
|
|
||
|
### Adding a Pipeline Reference
|
||
|
|
||
|
We can as well pass the reference to the pipeline used to create the lineage (e.g., the ETL feeding the tables).
|
||
|
|
||
|
To prepare this example, we need to start by creating the Pipeline Entity. Again, we'll need first
|
||
|
to prepare the Pipeline Service:
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
|
||
|
from metadata.generated.schema.api.services.createPipelineService import (
|
||
|
CreatePipelineServiceRequest,
|
||
|
)
|
||
|
from metadata.generated.schema.entity.services.pipelineService import (
|
||
|
PipelineConnection,
|
||
|
PipelineService,
|
||
|
PipelineServiceType,
|
||
|
)
|
||
|
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
|
||
|
BackendConnection,
|
||
|
)
|
||
|
|
||
|
pipeline_service = CreatePipelineServiceRequest(
|
||
|
name="test-service-pipeline",
|
||
|
serviceType=PipelineServiceType.Airflow,
|
||
|
connection=PipelineConnection(
|
||
|
config=AirflowConnection(
|
||
|
hostPort="http://localhost:8080",
|
||
|
connection=BackendConnection(),
|
||
|
),
|
||
|
),
|
||
|
)
|
||
|
|
||
|
pipeline_service_entity = metadata.create_or_update(data=pipeline_service)
|
||
|
|
||
|
create_pipeline = CreatePipelineRequest(
|
||
|
name="test",
|
||
|
service=pipeline_service_entity.fullyQualifiedName,
|
||
|
)
|
||
|
|
||
|
pipeline_entity = metadata.create_or_update(data=create_pipeline)
|
||
|
```
|
||
|
|
||
|
With these ingredients ready, we can then follow the code above and add there a `pipeline` argument
|
||
|
as an Entity Reference:
|
||
|
|
||
|
```python
|
||
|
lineage_details = LineageDetails(
|
||
|
sqlQuery="SELECT * FROM AWESOME",
|
||
|
columnsLineage=[column_lineage],
|
||
|
pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
|
||
|
)
|
||
|
```
|
||
|
|
||
|
{% note %}
|
||
|
|
||
|
The UI currently supports showing the column lineage information. Data about the SQL queries and the Pipeline Entities
|
||
|
will be surfaced soon. Thanks!
|
||
|
|
||
|
{% /note %}
|
||
|
|
||
|
# Automated SQL lineage
|
||
|
|
||
|
In case you want OpenMetadata to identify the lineage based on the sql query, then you can make use of the method `add_lineage_by_query` of the python SDK to parser the sql and generate the lineage in OpenMetadata.
|
||
|
|
||
|
|
||
|
follow the below code snippet for the example:
|
||
|
|
||
|
```python
|
||
|
from metadata.generated.schema.entity.services.databaseService import DatabaseService
|
||
|
|
||
|
database_service: DatabaseService = metadata.get_by_name(
|
||
|
entity=DatabaseService, fqn="my_service"
|
||
|
)
|
||
|
|
||
|
metadata.add_lineage_by_query(
|
||
|
database_service=database_service,
|
||
|
timeout=200, # timeout in seconds
|
||
|
sql="insert into target_table(id) as select id from source_table" # your sql query
|
||
|
)
|
||
|
```
|
||
|
|
||
|
Above example would create a lineage between `target_table` and `source_table` within `my_service` database service.
|
||
|
|
||
|
|
||
|
## Automated SQL lineage via CLI
|
||
|
|
||
|
To create the automated sql lineage via CLI, you need to make sure that you have installed the openmetadata-ingestion package in your local environment using command `pip install openmetadata-ingestion`.
|
||
|
|
||
|
Once that is done you will have to prepare a yaml file as follows.
|
||
|
|
||
|
```yaml
|
||
|
serviceName: local_mysql
|
||
|
query: insert into target_table(id) as select id from source_table
|
||
|
# filePath: test.sql
|
||
|
# parseTimeout: 360 # timeout in seconds
|
||
|
workflowConfig:
|
||
|
# loggerLevel: DEBUG # DEBUG, INFO, WARN or ERROR
|
||
|
openMetadataServerConfig:
|
||
|
hostPort: <OpenMetadata host and port>
|
||
|
authProvider: <OpenMetadata auth provider>
|
||
|
```
|
||
|
|
||
|
- **serviceName**: Name of the database service which contains the table involved in query.
|
||
|
- **query**: You can specify the raw sql query within the yaml file itself.
|
||
|
- **filePath**: In case the query is too big then you can also save query in a file and pass the path to the file in this field.
|
||
|
- **parseTimeout**: Timeout for the lineage parsing process.
|
||
|
- **workflowConfig**: The main property here is the openMetadataServerConfig, where you can define the host and security provider of your OpenMetadata installation.
|
||
|
|
||
|
|
||
|
|
||
|
Once the yaml file is prepare you can run the command
|
||
|
|
||
|
```
|
||
|
metadata lineage -c path/to/your_config_yaml.yaml
|
||
|
```
|