Pere Miquel Brull 0d10f85e73
Docs updates for lineage, loggerLevel, metastore and requirements (#7085)
* Python version in requirements

* Add lineage sdk

* Deltalake metastore

* Add loggerLevel
2022-08-31 15:11:11 +02:00


title: Python SDK for Lineage
slug: /sdk/python/ingestion/lineage

Python SDK for Lineage

In this guide, we will use the Python SDK to create and fetch Lineage information.

For simplicity, we are going to create lineage between Tables. However, the same approach works with any Entity.

Note that in OpenMetadata, Lineage is just one possible relationship between Entities. Other types of relationships include:

  • Contains (a Database contains Schemas, which in turn contain Tables),
  • or Ownership of any asset.

The point is that any Entity that exists in OpenMetadata can be related to any other via Lineage.

In the following sections we will:

  • Create a Database Service, a Database, a Schema and two Tables,
  • Add Lineage between both Tables,
  • Get the Lineage information back.

A prerequisite for this section is to have previously gone through the following docs.

Creating the Entities

To prepare the necessary ingredients, execute the following steps.

All the Lineage-related functions that we are going to use can be found here.

1. Preparing the Client

from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata

server_config = OpenMetadataConnection(hostPort="http://localhost:8585/api")
metadata = OpenMetadata(server_config)

assert metadata.health_check()  # Will fail if we cannot reach the server
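
If the server is still starting up (for example in a local setup), a single health check can fail even though the server becomes reachable a few seconds later. The sketch below retries a check a few times before giving up. The helper name `wait_until_healthy` and its parameters are hypothetical and not part of the SDK; it only assumes a zero-argument callable that returns a truthy value on success, such as `metadata.health_check`.

```python
import time
from typing import Callable


def wait_until_healthy(
    check: Callable[[], bool],
    retries: int = 5,
    delay_seconds: float = 2.0,
) -> bool:
    """Call `check` up to `retries` times, sleeping between failed attempts.

    Hypothetical helper: `check` is any zero-argument callable, for
    example `metadata.health_check` from the client created above.
    """
    for attempt in range(1, retries + 1):
        try:
            if check():
                return True
        except Exception:
            # Treat connection errors like a failed check and retry.
            pass
        if attempt < retries:
            time.sleep(delay_seconds)
    return False
```

With the client from this step, it could be used as `assert wait_until_healthy(metadata.health_check)`.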

2. Creating the Database Service

We are mocking a MySQL instance. Note how we need to pass the right configuration class, MysqlConnection, as the config of the generic DatabaseConnection type.

from metadata.generated.schema.api.services.createDatabaseService import (
    CreateDatabaseServiceRequest,
)
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseConnection,
    DatabaseService,
    DatabaseServiceType,
)

db_service = CreateDatabaseServiceRequest(
    name="test-service-db-lineage",
    serviceType=DatabaseServiceType.Mysql,
    connection=DatabaseConnection(
        config=MysqlConnection(
            username="username",
            password="password",
            hostPort="http://localhost:1234",
        )
    ),
)

db_service_entity = metadata.create_or_update(data=db_service)

3. Creating the Database

Any Entity that is created and linked to another Entity has to hold an EntityReference to the Entity it relates to. In this case, a Database is bound to a specific service.

from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.type.entityReference import EntityReference

create_db = CreateDatabaseRequest(
    name="test-db",
    service=EntityReference(
        id=db_service_entity.id, type="databaseService"
    ),
)

create_db_entity = metadata.create_or_update(data=create_db)

4. Creating the Schema

The same applies to Schemas: each one belongs to a Database.

from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)

create_schema = CreateDatabaseSchemaRequest(
    name="test-schema",
    database=EntityReference(
        id=create_db_entity.id, name="test-db", type="database"
    ),
)

create_schema_entity = metadata.create_or_update(data=create_schema)

5. Creating the Tables

And finally, Tables are contained in a specific Schema, so we use the EntityReference here as well.

We are doing a simple example with a single column.

from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import Column, DataType

table_a = CreateTableRequest(
    name="tableA",
    databaseSchema=EntityReference(
        id=create_schema_entity.id, name="test-schema", type="databaseSchema"
    ),
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)

table_b = CreateTableRequest(
    name="tableB",
    databaseSchema=EntityReference(
        id=create_schema_entity.id, name="test-schema", type="databaseSchema"
    ),
    columns=[Column(name="id", dataType=DataType.BIGINT)],
)

table_a_entity = metadata.create_or_update(data=table_a)
table_b_entity = metadata.create_or_update(data=table_b)

6. Adding Lineage

With everything prepared, we can now create the Lineage between both Entities. An AddLineageRequest type represents the edge between two Entities, typed under EntitiesEdge.

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.entityLineage import EntitiesEdge

add_lineage_request = AddLineageRequest(
    description="test lineage",
    edge=EntitiesEdge(
        fromEntity=EntityReference(id=table_a_entity.id, type="table"),
        toEntity=EntityReference(id=table_b_entity.id, type="table"),
    ),
)

created_lineage = metadata.add_lineage(data=add_lineage_request)

The Python client returns a JSON object with the Lineage information for the fromEntity node of the edge we just added:

{
  "entity": {
    "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
    "type": "table",
    "name": "tableA",
    "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
  },
  "nodes": [
    {
      "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
      "type": "table",
      "name": "tableB",
      "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "upstreamEdges": [],
  "downstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ]
}

If the node already had other edges, they would show up here as well.
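
The edges in this response only carry Entity IDs, so reading them usually means looking each ID up in `entity` and `nodes`. The following is a minimal sketch of that lookup, assuming the response is available as a plain Python dict shaped like the JSON above. The function name `describe_edges` is hypothetical and not part of the SDK.

```python
from typing import Dict, List, Tuple


def describe_edges(lineage: dict) -> List[Tuple[str, str]]:
    """Return (from, to) pairs of fully qualified names for every edge."""
    # Index the central entity and all neighbour nodes by their ID.
    names: Dict[str, str] = {
        lineage["entity"]["id"]: lineage["entity"]["fullyQualifiedName"]
    }
    for node in lineage.get("nodes", []):
        names[node["id"]] = node["fullyQualifiedName"]

    edges = lineage.get("upstreamEdges", []) + lineage.get("downstreamEdges", [])
    # Fall back to the raw ID if an edge points at a node not in the payload.
    return [
        (
            names.get(edge["fromEntity"], edge["fromEntity"]),
            names.get(edge["toEntity"], edge["toEntity"]),
        )
        for edge in edges
    ]
```

Applied to the response above, `describe_edges(created_lineage)` would yield a single pair going from `test-service-db-lineage.test-db.test-schema.tableA` to `test-service-db-lineage.test-db.test-schema.tableB`.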

7. Fetching Lineage

Finally, let's fetch the lineage from the other node involved:

from metadata.generated.schema.entity.data.table import Table

metadata.get_lineage_by_name(
    entity=Table,
    fqn="test-service-db-lineage.test-db.test-schema.tableB",
    # Tune this to control how far in the lineage graph to go
    up_depth=1,
    down_depth=1
)

This returns the mirror image of the result above, with tableA now listed as an upstream node of tableB:

{
  "entity": {
    "id": "800caa0f-a149-48d2-a0ce-6ca84501767e",
    "type": "table",
    "name": "tableB",
    "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableB",
    "deleted": false,
    "href": "http://localhost:8585/api/v1/tables/800caa0f-a149-48d2-a0ce-6ca84501767e"
  },
  "nodes": [
    {
      "id": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "type": "table",
      "name": "tableA",
      "fullyQualifiedName": "test-service-db-lineage.test-db.test-schema.tableA",
      "deleted": false,
      "href": "http://localhost:8585/api/v1/tables/e7bee99b-5c5e-43ec-805c-8beba04804f5"
    }
  ],
  "upstreamEdges": [
    {
      "fromEntity": "e7bee99b-5c5e-43ec-805c-8beba04804f5",
      "toEntity": "800caa0f-a149-48d2-a0ce-6ca84501767e"
    }
  ],
  "downstreamEdges": []
}
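
One way to confirm that both responses describe the same relationship is to compare their edges regardless of whether they are listed as upstream or downstream. The sketch below does that, assuming each response is available as a plain Python dict shaped like the JSON above. The names `edge_ids` and `same_edges` are hypothetical helpers, not SDK functions.

```python
from typing import Set, Tuple


def edge_ids(lineage: dict) -> Set[Tuple[str, str]]:
    """Collect every edge in a lineage response as a (from_id, to_id) pair."""
    edges = lineage.get("upstreamEdges", []) + lineage.get("downstreamEdges", [])
    return {(edge["fromEntity"], edge["toEntity"]) for edge in edges}


def same_edges(first: dict, second: dict) -> bool:
    """True when both responses contain exactly the same set of edges."""
    return edge_ids(first) == edge_ids(second)
```

For the two responses in this guide, the tableA view lists the edge under `downstreamEdges` and the tableB view lists it under `upstreamEdges`, so `same_edges` would report them as equal.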

You can also get lineage by ID using the get_lineage_by_id method, which accepts entity_id instead of fqn.
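
To build intuition for the `up_depth` and `down_depth` arguments, it can help to think of them as a limit on how many hops are followed from the starting node. The sketch below illustrates that idea for the upstream direction on a plain list of edges. It is only an illustration of a depth-limited traversal, written under the assumption that depth counts hops, and it is not how the server implements the lookup. The function name `reachable_upstream` is hypothetical.

```python
from collections import deque
from typing import Iterable, Set, Tuple


def reachable_upstream(
    edges: Iterable[Tuple[str, str]], start: str, max_depth: int
) -> Set[str]:
    """Return nodes at most `max_depth` hops upstream of `start`.

    Each edge is a (from_node, to_node) pair; "upstream" follows
    edges backwards, from `to_node` to `from_node`.
    """
    # Map every node to the set of nodes that feed into it.
    parents = {}
    for source, target in edges:
        parents.setdefault(target, set()).add(source)

    seen: Set[str] = set()
    queue = deque([(start, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == max_depth:
            continue  # Do not expand beyond the requested depth.
        for parent in parents.get(node, ()):
            if parent not in seen and parent != start:
                seen.add(parent)
                queue.append((parent, depth + 1))
    return seen
```

With a chain of edges a → b → c → d, a depth of 1 from `d` reaches only `c`, while a depth of 2 reaches both `c` and `b`.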