15 KiB
description |
---|
We are now going to present a high-level Python API as a type-safe and gentle wrapper for the OpenMetadata backend. |
Python API
In the Solution Design, we have been dissecting the internals of OpenMetadata. The main conclusion here is twofold:
- Everything is handled via the API, and
- Data structures (Entity definitions) are at the heart of the solution.
This means that whenever we need to interact with the metadata system or develop a new connector or logic, we have to make sure that we pass the proper inputs and handle the types of outputs.
Introducing the Python API
Let's suppose that we have our local OpenMetadata server running at http:localhost:8585
. We can play with it with simple cURL
or httpie
commands, and if we just want to take a look at the Entity instances we have lying around, that might probably be enough.
However, let's imagine that we want to create or update an ML Model Entity with a PUT
. To do so, we need to make sure that we are providing a proper JSON, covering all the attributes and types required by the Entity definition.
By reviewing the JSON Schema for the create operation and the fields definitions of the Entity, we could come up with a rather simple description of a toy ML Model:
{
"name": "my-model",
"description": "sample ML Model",
"algorithm": "regression",
"mlFeatures": [
{
"name": "age",
"dataType": "numerical",
"featureSources": [
{
"name": "age",
"dataType": "integer"
}
]
},
{
"name": "persona",
"dataType": "categorical",
"featureSources": [
{
"name": "age",
"dataType": "integer"
},
{
"name": "education",
"dataType": "string"
}
],
"featureAlgorithm": "PCA"
}
],
"mlHyperParameters": [
{
"name": "regularisation",
"value": "0.5"
}
]
}
If we needed to repeat this process with a full-fledged model that is built ad-hoc and updated during the CICD process, we would just be adding a hardly maintainable, error-prone requirement to our production deployment pipelines.
The same would happen if, inside the actual OpenMetadata code, there was not a way to easily interact with the API and make sure that we send proper data and can safely process the outputs.
Using Generated Sources
As OpenMetadata is a data-centric solution, we need to make sure we have the right ingredients at all times. That is why we have developed a high-level Python API, using pydantic
models automatically generated from the JSON Schemas.
OBS: If you are using a published version of the Ingestion Framework, you are already good to go, as we package the code with the
metadata.generated
module. If you are developing a new feature, you can get more information here.
This API wrapper helps developers and consumers in:
- Validating data during development and with specific error messages at runtime,
- Receiving typed responses to ease further processing.
Thanks to the recursive model setting of pydantic
the example above can be rewritten using only Python classes, and thus being able to get help from IDEs and the Python interpreter. We can rewrite the previous JSON as:
from metadata.generated.schema.api.data.createMlModel import CreateMlModelEntityRequest
from metadata.generated.schema.entity.data.mlmodel import (
FeatureSource,
FeatureSourceDataType,
FeatureType,
MlFeature,
MlHyperParameter,
MlModel,
)
model = CreateMlModelEntityRequest(
name="test-model-properties",
algorithm="algo",
mlFeatures=[
MlFeature(
name="age",
dataType=FeatureType.numerical,
featureSources=[
FeatureSource(
name="age",
dataType=FeatureSourceDataType.integer,
)
],
),
MlFeature(
name="persona",
dataType=FeatureType.categorical,
featureSources=[
FeatureSource(
name="age",
dataType=FeatureSourceDataType.integer,
),
FeatureSource(
name="education",
dataType=FeatureSourceDataType.string,
),
],
featureAlgorithm="PCA",
),
],
mlHyperParameters=[
MlHyperParameter(name="regularisation", value="0.5"),
],
)
One syntax to rule them all
Now that we know how to directly use the pydantic
models, we can start showcasing the solution. This module has been built with two main principles in mind:
- Reusability: We should be able to support existing and new entities with minimum effort,
- Extensibility: However, we are aware that not all Entities are the same. Some of them may require specific functionalities or slight variations (such as
Lineage
orLocation
), so it should be easy to identify those special methods and create new ones when needed.
To this end, we have the main class OpenMetadata
(source) based on Python's TypeVar
. Thanks to this we can exploit the complete power of the pydantic
models, having methods with Type Parameters that know how to respond to each Entity.
At the same time, we have the Mixins (source) module, with special extensions to some Entities.
Walkthrough
Let's use Python's API to create, update and delete a Table
Entity. Choosing the Table
is a nice starter, as its attributes define the following hierarchy:
DatabaseService -> Database -> Table
This will help us showcase how we can reuse the same syntax with the three different Entities.
1. Initialize OpenMetadata
OpenMetadata
is the class holding the connection to the API and handling the requests. We can instantiate this by passing the proper configuration to reach the server API:
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig
server_config = MetadataServerConfig(api_endpoint="http://localhost:8585/api")
metadata = OpenMetadata(server_config)
As this is just using a local development, the MetadataServerConfig
is rather simple. However, in there we would prepare settings such as auth_provider_type
or secret_key
.
From this point onwards, we will interact with the API by using OpenMetadata
methods.
An interesting validation we can already make at this point is verifying that the service is reachable and healthy. To do so, we can validate the Bool
output from:
metadata.health_check() # `True` means we are alright :)
2. Create the DatabaseService
Following the hierarchy, we need to start by defining a DatabaseService
. This will be system hosting our Database
, which will contain the Table
.
Recall how we have mainly two types of models:
- Entity definitions, such as
Table
,MlModel
orTopic
- API definitions, useful when running a
PUT
,POST
orPATCH
request:CreateTable
,CreateMlModel
orCreateTopic
.
As we are just creating Entities right now, we'll stick to the pydantic
models with the API definitions.
Let's imagine that we are defining a MySQL:
from metadata.generated.schema.api.services.createDatabaseService import (
CreateDatabaseServiceEntityRequest,
)
from metadata.generated.schema.entity.services.databaseService import (
DatabaseService,
DatabaseServiceType,
)
from metadata.generated.schema.type.jdbcConnection import JdbcInfo
create_service = CreateDatabaseServiceEntityRequest(
name="test-service-table",
serviceType=DatabaseServiceType.MySQL,
jdbc=JdbcInfo(driverClass="jdbc", connectionUrl="jdbc://localhost"),
)
Note how we can use both String
definitions for the attributes, as well as specific types when possible, such as serviceType=DatabaseServiceType.MySQL
. The less information we need to hardcode, the better.
We can review the information that will be passed to the API by visiting the JSON definition of the class we just instantiated. As all these models are powered by pydantic
, this conversion is transparent to us:
create_service.json()
# '{"name": "test-service-table", "description": null, "serviceType": "MySQL", "jdbc": {"driverClass": "jdbc", "connectionUrl": "jdbc://localhost"}, "ingestionSchedule": null}'
Executing the actual creation is easy! As our create_service
variable already holds the proper datatype, there is a single line to execute:
service_entity = metadata.create_or_update(data=create_service)
Moreover, running a create_or_update
will return us the Entity type, so we can explore its attributes easily:
type(service_entity)
# <class 'metadata.generated.schema.entity.services.databaseService.DatabaseService'
service_entity.json()
# '{"id": "6cfdfed2-66af-44e9-aea8-8add3912270f", "name": "test-service-table", "displayName": null, "serviceType": "MySQL", "description": null, "version": 0.1, "updatedAt": "2021-12-05T16:00:07.621000+00:00", "updatedBy": "anonymous", "href": "http://localhost:8585/api/v1/services/databaseServices/6cfdfed2-66af-44e9-aea8-8add3912270f", "jdbc": {"driverClass": "jdbc", "connectionUrl": "jdbc://localhost"}, "ingestionSchedule": null, "changeDescription": null}'
3. Create the Database
We can now repeat the process to create a Database
Entity. However, if we review the definition of the CreateDatabaseEntityRequest
model...
class CreateDatabaseEntityRequest(BaseModel):
name: database.DatabaseName = Field(
..., description='Name that identifies this database instance uniquely.'
)
description: Optional[str] = Field(
None,
description='Description of the database instance. What it has and how to use it.',
)
owner: Optional[entityReference.EntityReference] = Field(
None, description='Owner of this database'
)
service: entityReference.EntityReference = Field(
..., description='Link to the database service where this database is hosted in'
)
Note how the only non-optional fields are name
and service
. The type of service,
however, is EntityReference
. This is expected, as in there we need to pass the information of an existing Entity. In our case, the DatabaseService
we just created.
Repeating the exercise and reviewing the required fields to instantiate an EntityReference
we notice how we need to pass an id: uuid.UUID
and type: str
. Here we need to specify the id
and type
of our DatabaseService
.
Querying by name
The id
we actually saw it by printing the service_entity
JSON. However, let's imagine that it did not happen, and the only information we have from the DatabaseService
is its name.
To retrieve the id
, we should then ask to the metadata
to find our Entity by its FQDN:
service_query = metadata.get_by_name(entity=DatabaseService, fqdn="test-service-table")
We have just used the get_by_name
method. This method is the same that we will use for any Entity. This is why as an argument, we need to provide the entity
field. Again, instead of relying on error-prone handwritten parameters, we can just pass the pydantic
model we expect to get back. In our case, a DatabaseService
.
Let's now pass the DatabaseService
id to instantiate the EntityReference
. We do not even need to cast it to str
, as the EntityReference
class expects an UUID
as well:
from metadata.generated.schema.api.data.createDatabase import (
CreateDatabaseEntityRequest,
)
from metadata.generated.schema.type.entityReference import EntityReference
create_db = CreateDatabaseEntityRequest(
name="test-db",
service=EntityReference(id=service_entity.id, type="databaseService"),
)
db_entity = metadata.create_or_update(create_db)
4. Create the Table
Now that we have all the preparations ready, we can just reuse the same steps to create the Table
:
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.entity.data.table import (
Column,
DataType,
Table,
)
create_table = CreateTableEntityRequest(
name="test",
database=db_entity.id,
columns=[Column(name="id", dataType=DataType.BIGINT)],
)
table_entity = metadata.create_or_update(create_table)
5. Update the Table
Let's now update the Table
by adding an owner. This will require us to create a User
, and then update the Table
with it. Afterwards, we will validate that the information has been properly stored.
First, make sure that no owner has been set during the creation:
print(table_entity.owner)
# None
Now, create a User
:
from metadata.generated.schema.api.teams.createUser import CreateUserEntityRequest
user = metadata.create_or_update(
data=CreateUserEntityRequest(name="random-user", email="random@user.com"),
)
Update our instace of create_table
to add the owner
field (we need to use the Create
class as we'll run a PUT
), and update the Entity:
create_table.owner = EntityReference(id=user.id, type="user")
updated_table_entity = metadata.create_or_update(create_table)
print(updated_table_entity.owner)
# EntityReference(id=Uuid(__root__=UUID('48793f0c-5308-45c1-9bf4-06a82c8d7bf9')), type='user', name='random-user', description=None, displayName=None, href=Href(__root__=AnyUrl('http://localhost:8585/api/v1/users/48793f0c-5308-45c1-9bf4-06a82c8d7bf9', scheme='http', host='localhost', host_type='int_domain', port='8585', path='/api/v1/users/48793f0c-5308-45c1-9bf4-06a82c8d7bf9')))
If we did not save the updated_table_entity
variable and we should need to query it to review the owner
field, we can run the get_by_name
using the proper FQDN definition for Table
s:
my_table = metadata.get_by_name(entity=Table, fqdn="test-service-table.test_db.test")
6. Delete the Table
Finally, we can clean up by running the delete
method:
metadata.delete(entity=Table, entity_id=my_table.id)