---
title: Python SDK for Lineage
slug: /sdk/python/ingestion/lineage
---

# Python SDK for Lineage

In this guide, we will use the Python SDK to create and fetch Lineage information.

For simplicity, we are going to create lineage between Tables. However, the same approach works with any Entity.

<Note>

In OpenMetadata, Lineage is just one of the possible relationships between Entities. Other types of relationships include, for example:

- Contains (a Database contains Schemas, which in turn contain Tables),
- Ownership of any asset.

The point being, any Entity in OpenMetadata can be related to any other via Lineage.

</Note>

In the following sections we will:

- Create a Database Service, a Database, a Schema and two Tables,
- Add Lineage between both Tables,
- Get the Lineage information back.

A **prerequisite** for this section is to have previously gone through the [Python SDK docs](/sdk/python).

## Creating the Entities

To prepare the necessary ingredients, execute the following steps.

All the Lineage-related functions we are going to use can be found [here](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/ometa/mixins/lineage_mixin.py).

### 1. Preparing the Client

```python
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api")
metadata = OpenMetadata(server_config)

assert metadata.health_check()  # Will fail if we cannot reach the server
```

### 2. Creating the Database Service

We are mocking a MySQL instance. Note how we need to pass the right configuration class, `MysqlConnection`, as a parameter for the generic `DatabaseConnection` type.

```python
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseConnection,
    DatabaseService,
    DatabaseServiceType,
)

db_service = CreateDatabaseServiceRequest(
    name="test-service-db-lineage",
    serviceType=DatabaseServiceType.Mysql,
    connection=DatabaseConnection(
        config=MysqlConnection(
            username="username",
            password="password",
            hostPort="http://localhost:1234",
        )
    ),
)

db_service_entity = metadata.create_or_update(data=db_service)
```

### 3. Creating the Database

Any Entity that is created and linked to another Entity has to hold the `EntityReference` to the Entity it relates to. In this case, a Database is bound to a specific service.

```python
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.type.entityReference import EntityReference

create_db = CreateDatabaseRequest(
    name="test-db",
    # Link the Database to the service created in the previous step
    service=EntityReference(id=db_service_entity.id, type="databaseService"),
)

create_db_entity = metadata.create_or_update(data=create_db)
```

### 4. Creating the Schema

The same applies to Schemas, which are related to a Database.

```python
from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)

create_schema = CreateDatabaseSchemaRequest(
    name="test-schema",
    # Link the Schema to the Database created in the previous step
    database=EntityReference(id=create_db_entity.id, name="test-db", type="database"),
)

create_schema_entity = metadata.create_or_update(data=create_schema)
```

### 5. Creating the Tables

Finally, Tables are contained in a specific Schema, so we use the `EntityReference` here as well.

To keep the example simple, each Table has a single column.

```python
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import Column, DataType

table_a = CreateTableRequest(
    name="tableA",
    databaseSchema=EntityReference(
        id=create_schema_entity.id, name="test-schema", type="databaseSchema"
    ),
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)

table_b = CreateTableRequest(
    name="tableB",
    databaseSchema=EntityReference(
        id=create_schema_entity.id, name="test-schema", type="databaseSchema"
    ),
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)

table_a_entity = metadata.create_or_update(data=table_a)
table_b_entity = metadata.create_or_update(data=table_b)
```

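The `fullyQualifiedName` values used later in this guide (for example, when fetching lineage by name) follow the hierarchy we just built: the service, database, schema and table names joined with dots. The sketch below reproduces that format with two hypothetical helpers, `quote_name` and `build_table_fqn`. It assumes that a name containing a dot is wrapped in double quotes so it is not split; in real code, prefer the SDK's own FQN utilities (`metadata.utils.fqn`) or simply read the `fullyQualifiedName` of the entity returned by `create_or_update`.

```python
# Hypothetical helpers mirroring the dot-joined FQN format shown in this guide.
# The quoting rule for names that contain "." is an assumption for illustration.


def quote_name(name: str) -> str:
    """Wrap a name in double quotes if it contains a dot and is not quoted yet."""
    if "." in name and not (name.startswith('"') and name.endswith('"')):
        return f'"{name}"'
    return name


def build_table_fqn(service: str, database: str, schema: str, table: str) -> str:
    """Join the service > database > schema > table hierarchy into an FQN."""
    return ".".join(quote_name(part) for part in (service, database, schema, table))


fqn_a = build_table_fqn("test-service-db-lineage", "test-db", "test-schema", "tableA")
fqn_b = build_table_fqn("test-service-db-lineage", "test-db", "test-schema", "tableB")
print(fqn_a)  # test-service-db-lineage.test-db.test-schema.tableA
print(fqn_b)  # test-service-db-lineage.test-db.test-schema.tableB
```

The quoting step only matters for names such as `my.table`, which would otherwise be indistinguishable from an extra level in the hierarchy.
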
### 6. Adding Lineage

With everything prepared, we can now create the Lineage between both Entities. An `AddLineageRequest` represents the edge between two Entities, typed as an `EntitiesEdge`. The edge is directed: `fromEntity` is the upstream node and `toEntity` the downstream one.

```python
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge

add_lineage_request = AddLineageRequest(
    description="test lineage",
    edge=EntitiesEdge(
        # tableA (upstream) -> tableB (downstream)
        fromEntity=EntityReference(id=table_a_entity.id, type="table"),
        toEntity=EntityReference(id=table_b_entity.id, type="table"),
    ),
)

created_lineage = metadata.add_lineage(data=add_lineage_request)
```

The Python client already returns a JSON object with the Lineage information of the `fromEntity` node we just connected:

```json
{
  "entity": {
    "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
    "type": "table",
    "name": "tableA",
    "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
  },
  "nodes": [
    {
      "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
      "type": "table",
      "name": "tableB",
      "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "upstreamEdges": [],
  "downstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ]
}
```

If the node already had other edges, they would show up here as well.

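Edges in the response reference nodes only by `id`, so turning them back into readable names takes a small lookup over `entity` and `nodes`. A minimal sketch, assuming the response is available as a plain Python dict with the shape shown above (trimmed to the fields we need); `summarize_edges` is a hypothetical helper, not part of the SDK.

```python
from typing import Any, Dict, List, Tuple

# Response from the example above, trimmed to the fields used below.
lineage: Dict[str, Any] = {
    "entity": {"id": "e7bee99b-5c5e-43ec-805c-8beba04804f5", "type": "table", "name": "tableA"},
    "nodes": [
        {"id": "800caa0f-a149-48d2-a0ce-6ca84501767e", "type": "table", "name": "tableB"}
    ],
    "upstreamEdges": [],
    "downstreamEdges": [
        {
            "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
            "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e",
        }
    ],
}


def summarize_edges(response: Dict[str, Any], key: str) -> List[Tuple[str, str]]:
    """Return (from_name, to_name) pairs for the edges listed under `key`."""
    # Index every node in the response, including the central entity, by id.
    names = {node["id"]: node["name"] for node in [response["entity"], *response["nodes"]]}
    # Fall back to the raw id if a node is missing from the response.
    return [
        (names.get(edge["fromEntity"], edge["fromEntity"]),
         names.get(edge["toEntity"], edge["toEntity"]))
        for edge in response.get(key, [])
    ]


print(summarize_edges(lineage, "downstreamEdges"))  # [('tableA', 'tableB')]
print(summarize_edges(lineage, "upstreamEdges"))  # []
```
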
### 7. Fetching Lineage

Finally, let's fetch the lineage from the other node involved:

```python
from metadata.generated.schema.entity.data.table import Table

metadata.get_lineage_by_name(
    entity=Table,
    fqn="test-service-db-lineage.test-db.test-schema.tableB",
    # Tune this to control how far in the lineage graph to go
    up_depth=1,
    down_depth=1,
)
```

This gives us the symmetric result: `tableB` is now the central entity, and the same edge appears under `upstreamEdges` instead of `downstreamEdges`.

```json
{
  "entity": {
    "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
    "type": "table",
    "name": "tableB",
    "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
  },
  "nodes": [
    {
      "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "type": "table",
      "name": "tableA",
      "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
    }
  ],
  "upstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "downstreamEdges": []
}
```

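Since both responses describe the same single edge, seen from each end, this relationship can be checked programmatically: every downstream edge of `tableA` should appear as an upstream edge of `tableB`. A sketch, assuming both responses are plain dicts trimmed to the fields shown above; `edge_set` is a hypothetical helper.

```python
from typing import Any, Dict, Set, Tuple

A_ID = "e7bee99b-5c5e-43ec-805c-8beba04804f5"  # tableA, from the examples above
B_ID = "800caa0f-a149-48d2-a0ce-6ca84501767e"  # tableB, from the examples above

# Trimmed versions of the two responses shown above.
lineage_a: Dict[str, Any] = {
    "entity": {"id": A_ID},
    "upstreamEdges": [],
    "downstreamEdges": [{"fromEntity": A_ID, "toEntity": B_ID}],
}
lineage_b: Dict[str, Any] = {
    "entity": {"id": B_ID},
    "upstreamEdges": [{"fromEntity": A_ID, "toEntity": B_ID}],
    "downstreamEdges": [],
}


def edge_set(response: Dict[str, Any], key: str) -> Set[Tuple[str, str]]:
    """Collect the (fromEntity, toEntity) id pairs listed under `key`."""
    return {(edge["fromEntity"], edge["toEntity"]) for edge in response.get(key, [])}


# The edge A -> B is downstream of A and upstream of B.
symmetric = edge_set(lineage_a, "downstreamEdges") == edge_set(lineage_b, "upstreamEdges")
print(symmetric)  # True
```
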
<Tip>

You can also get lineage by ID using the `get_lineage_by_id` method, which accepts `entity_id` instead of `fqn`.

</Tip>
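
The `up_depth` and `down_depth` arguments used in step 7 bound how many hops are traversed in each direction from the requested entity. To build intuition for what a given depth returns, here is a local, simplified model of a depth-limited breadth-first traversal over a hypothetical chain `tableA -> tableB -> tableC -> tableD`. It is an illustration only, not how the OpenMetadata server computes lineage.

```python
from collections import deque
from typing import Dict, List, Set

# Hypothetical lineage chain: tableA -> tableB -> tableC -> tableD
downstream: Dict[str, List[str]] = {
    "tableA": ["tableB"],
    "tableB": ["tableC"],
    "tableC": ["tableD"],
}

# Reverse the edges so we can also walk upstream.
upstream: Dict[str, List[str]] = {}
for src, targets in downstream.items():
    for dst in targets:
        upstream.setdefault(dst, []).append(src)


def reachable(graph: Dict[str, List[str]], start: str, depth: int) -> Set[str]:
    """Breadth-first search returning nodes at most `depth` hops from `start`."""
    seen: Set[str] = {start}
    found: Set[str] = set()
    queue = deque([(start, 0)])
    while queue:
        node, hops = queue.popleft()
        if hops == depth:
            continue  # Do not expand beyond the requested depth
        for neighbor in graph.get(node, []):
            if neighbor not in seen:
                seen.add(neighbor)
                found.add(neighbor)
                queue.append((neighbor, hops + 1))
    return found


# Centered on tableB, mirroring up_depth=1 and down_depth=1 from step 7.
print(sorted(reachable(upstream, "tableB", 1)))  # ['tableA']
print(sorted(reachable(downstream, "tableB", 1)))  # ['tableC']
print(sorted(reachable(downstream, "tableB", 2)))  # ['tableC', 'tableD']
```

Raising the depth widens the returned subgraph one hop at a time, which is why small values are a sensible default for large lineage graphs.
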