13 KiB
		
	
	
	
	
	
	
	
			
		
		
	
	| title | slug | 
|---|---|
| Python SDK for Lineage | /sdk/python/ingestion/lineage | 
Python SDK for Lineage
In this guide, we will use the Python SDK to create and fetch Lineage information.
For simplicity, we are going to create lineage between Tables. However, this would work with ANY entity.
You can find the Lineage Entity defined here, as well as the Entity defining the payload to add a new lineage: AddLineage.
{% note %}
Note that in OpenMetadata, the Lineage information is just a possible relationship between Entities. Other types of relationships for example could be:
- Contains (a Database contains Schemas, which at the same time contain Tables),
- or Ownership of any asset.
The point being, any Entity existent in OpenMetadata can be related to any other via Lineage.
{% /note %}
In the following sections we will:
- Create a Database Service, a Database, a Schema and two Tables,
- Add Lineage between both Tables,
- Get the Lineage information back.
A prerequisite for this section is to have previously gone through the following docs.
Creating the Entities
To prepare the necessary ingredients, execute the following steps.
All functions that we are going to use related to Lineage can be found in here
1. Preparing the Client
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.security.client.openMetadataJWTClientConfig import (
    OpenMetadataJWTClientConfig,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
server_config = OpenMetadataConnection(
    hostPort="http://localhost:8585/api",
    authProvider="openmetadata",
    securityConfig=OpenMetadataJWTClientConfig(
        jwtToken="<token>"
    ),
)
metadata = OpenMetadata(server_config)
assert metadata.health_check()  # Will fail if we cannot reach the server
2. Creating the Database Service
We are mocking a MySQL instance. Note how we need to pass the right configuration class MysqlConnection, as a
parameter for the generic DatabaseConnection type.
from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseConnection,
    DatabaseService,
    DatabaseServiceType,
)
db_service = CreateDatabaseServiceRequest(
    name="test-service-db-lineage",
    serviceType=DatabaseServiceType.Mysql,
    connection=DatabaseConnection(
        config=MysqlConnection(
            username="username",
            password="password",
            hostPort="http://localhost:1234",
        )
    ),
)
db_service_entity = metadata.create_or_update(data=db_service)
3. Creating the Database
Any Entity that is created and linked to another Entity, has to hold the fullyQualifiedName to the Entity it
relates to. In this case, a Database is bound to a specific service.
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
create_db = CreateDatabaseRequest(
    name="test-db",
    service=db_service_entity.fullyQualifiedName,
)
create_db_entity = metadata.create_or_update(data=create_db)    
4. Creating the Schema
The same happens with the Schemas. They are related to a Database.
from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)
create_schema = CreateDatabaseSchemaRequest(
    name="test-schema", database=create_db_entity.fullyQualifiedName
)
create_schema_entity = metadata.create_or_update(data=create_schema)
5. Creating the Tables
And finally, Tables are contained in a specific Schema, so we use the fullyQualifiedName here as well.
We are doing a simple example with a single column.
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import Column, DataType
table_a = CreateTableRequest(
    name="tableA",
    databaseSchema=create_schema_entity.fullyQualifiedName,
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)
table_b = CreateTableRequest(
    name="tableB",
    databaseSchema=create_schema_entity.fullyQualifiedName,
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)
table_a_entity = metadata.create_or_update(data=table_a)
table_b_entity = metadata.create_or_update(data=table_b)
6. Adding Lineage
With everything prepared, we can now create the Lineage between both Entities. An AddLineageRequest type
represents the edge between two Entities, typed under EntitiesEdge.
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
add_lineage_request = AddLineageRequest(
    description="test lineage",
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=table_a_entity.id, type="table"),
        toEntity=EntityReference(id=table_b_entity.id, type="table"),
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
The Python client will already return us a JSON object with the Lineage information about the fromEntity node
we added:
{
  "entity": {
    "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
    "type": "table",
    "name": "tableA",
    "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
  },
  "nodes": [
    {
      "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
      "type": "table",
      "name": "tableB",
      "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "upstreamEdges": [],
  "downstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ]
}
If the node were to have other edges already, they would be showing up here.
If we validate the Lineage from the UI, we will see:
{% image src="/images/v1.0.0/sdk/python/ingestion/lineage/simple-lineage.png" alt="simple-lineage" /%}
7. Fetching Lineage
Finally, let's fetch the lineage from the other node involved:
from metadata.generated.schema.entity.data.table import Table
metadata.get_lineage_by_name(
    entity=Table,
    fqn="test-service-db-lineage.test-db.test-schema.tableB",
    # Tune this to control how far in the lineage graph to go
    up_depth=1,
    down_depth=1
)
Which will give us the symmetric results from above
{
  "entity": {
    "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
    "type": "table",
    "name": "tableB",
    "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
  },
  "nodes": [
    {
      "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "type": "table",
      "name": "tableA",
      "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
    }
  ],
  "upstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "downstreamEdges": []
}
{% note noteType="Tip" %}
You can also get lineage by ID using the get_lineage_by_id method, which accepts entity_id instead of fqn.
{% /note %}
Lineage Details
Note how when adding lineage information we give to the API an AddLineage Request. This is composed of an Entity Edge, whose definition you can find here.
In a nutshell, an Entity Edge has:
- The Entity Reference as the lineage origin,
- The Entity Reference as the lineage destination,
- Optionally, Lineage Details.
In the Lineage Details property we can pass further information specific about Table to Table lineage:
- sqlQueryspecifying the transformation,
- An array of columnsLineageas an object with an array of source and destination columns, as well as their own specific transformation function,
- Optionally, the Entity Reference of a Pipeline powering the transformation from Table A to Table B.
The API call will be exactly the same as before, but now we will add more ingredients when defining our objects. Let's see how to do that and play with the possible combinations:
First, import the required classes and create a new table:
from metadata.generated.schema.type.entityLineage import (
    ColumnLineage,
    EntitiesEdge,
    LineageDetails,
)
# Prepare a new table
table_c = CreateTableRequest(
    name="tableC",
    databaseSchema=create_schema_entity.fullyQualifiedName,
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)
table_c_entity = metadata.create_or_update(data=table_c)
Column Level Lineage
We can start by linking our columns together. For that we are going to create:
- A ColumnLineageobject, linking our Table A column ID -> Table C column ID. Note that this can be a list!
- A LineageDetailsobject, passing the column lineage and the SQL query that powers the transformation.
column_lineage = ColumnLineage(
    fromColumns=["test-service-db-lineage.test-db.test-schema.tableA.id"], 
    toColumn="test-service-db-lineage.test-db.test-schema.tableC.id"
)
lineage_details = LineageDetails(
    sqlQuery="SELECT * FROM AWESOME",
    columnsLineage=[column_lineage],
)
add_lineage_request = AddLineageRequest(
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=table_a_entity.id, type="table"),
        toEntity=EntityReference(id=table_c_entity.id, type="table"),
        lineageDetails=lineage_details,
    ),
)
created_lineage = metadata.add_lineage(data=add_lineage_request)
This information will now be reflected in the UI as well:
{% image src="/images/v1.0.0/sdk/python/ingestion/lineage/lineage-col.png" alt="lineage-col" /%}
Adding a Pipeline Reference
We can as well pass the reference to the pipeline used to create the lineage (e.g., the ETL feeding the tables).
To prepare this example, we need to start by creating the Pipeline Entity. Again, we'll need first to prepare the Pipeline Service:
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.services.createPipelineService import (
    CreatePipelineServiceRequest,
)
from metadata.generated.schema.entity.services.pipelineService import (
    PipelineConnection,
    PipelineService,
    PipelineServiceType,
)
from metadata.generated.schema.entity.services.connections.pipeline.backendConnection import (
    BackendConnection,
)
pipeline_service = CreatePipelineServiceRequest(
    name="test-service-pipeline",
    serviceType=PipelineServiceType.Airflow,
    connection=PipelineConnection(
        config=AirflowConnection(
            hostPort="http://localhost:8080",
            connection=BackendConnection(),
        ),
    ),
)
pipeline_service_entity = metadata.create_or_update(data=pipeline_service)
create_pipeline = CreatePipelineRequest(
    name="test",
    service=pipeline_service_entity.fullyQualifiedName,
)
pipeline_entity = metadata.create_or_update(data=create_pipeline)
With these ingredients ready, we can then follow the code above and add there a pipeline argument
as an Entity Reference:
lineage_details = LineageDetails(
    sqlQuery="SELECT * FROM AWESOME",
    columnsLineage=[column_lineage],
    pipeline=EntityReference(id=pipeline_entity.id, type="pipeline"),
)
{% note %}
The UI currently supports showing the column lineage information. Data about the SQL queries and the Pipeline Entities will be surfaced soon. Thanks!
{% /note %}
