Ingestion: Airflow integration to ingest metadata about pipelines and tasks (#609)

* [WIP] Airflow integration

* [WIP] Airflow integration

* [WIP] airflow integration

* [WIP] Airflow

* [WIP] Airflow

* Fix #608: Ingestion: Airflow integration to ingest metadata about pipelines and tasks

* Fix #608: Ingestion: Airflow integration to ingest metadata about pipelines and tasks

* Update DashboardServiceResource.java

Co-authored-by: Ayush Shah <ayush@getcollate.io>
This commit is contained in:
Sriharsha Chintalapani 2021-09-29 11:32:09 -07:00 committed by GitHub
parent a275b1fee3
commit bfec0bfbed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
84 changed files with 410 additions and 119 deletions

View File

@ -46,6 +46,8 @@ import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.openmetadata.catalog.exception.CatalogExceptionMessage.entityNotFound;
@ -162,10 +164,19 @@ public abstract class PipelineRepository {
// Service can't be changed in update since service name is part of FQN and
// change to a different service will result in a different FQN and creation of a new pipeline under the new service
storedPipeline.setService(service);
storedPipeline.setTasks(updatedPipeline.getTasks());
updateTaskRelationships(storedPipeline);
// Airflow lineage backend gets executed per task in a DAG. This means we will not get a full picture of the pipeline
// in each call. Hence we may create a pipeline and add a single task when one task finishes in a pipeline;
// in the next task run we may have to update it. To take care of this we merge the stored and updated tasks.
List<EntityReference> storedTasks = storedPipeline.getTasks();
if (updatedPipeline.getTasks() != null) {
List<EntityReference> updatedTasks = Stream.concat(storedPipeline.getTasks().stream(),
updatedPipeline.getTasks().stream()).collect(Collectors.toList());
storedPipeline.setTasks(updatedTasks);
}
storedPipeline.setService(service);
updateTaskRelationships(storedPipeline);
return new PutResponse<>(Response.Status.OK, storedPipeline);
}

View File

@ -274,6 +274,7 @@ public class TaskResource {
.withDescription(create.getDescription())
.withService(create.getService())
.withTaskUrl(create.getTaskUrl())
.withDownstreamTasks(create.getDownstreamTasks())
.withTaskConfig(create.getTaskConfig())
.withTags(create.getTags())
.withOwner(create.getOwner());

View File

@ -1,5 +1,5 @@
{
"$id": "https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/api/data/createPipeline.json",
"$id": "https://open-metadata.org/schema/api/data/createPipeline.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Create Pipeline entity request",
"description": "Create Pipeline entity request",

View File

@ -24,19 +24,13 @@
"type": "string",
"format": "uri"
},
"upstreamTasks": {
"description": "All the tasks that are upstream of this task.",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
},
"default": null
},
"downstreamTasks": {
"description": "All the tasks that are downstream of this task.",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
"type": "string",
"minLength": 1,
"maxLength": 64
},
"default": null
},

View File

@ -56,19 +56,13 @@
"type": "string",
"format": "uri"
},
"upstreamTasks": {
"description": "All the tasks that are upstream of this task.",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
},
"default": null
},
"downstreamTasks": {
"description": "All the tasks that are downstream of this task.",
"type": "array",
"items": {
"$ref": "../../type/entityReference.json"
"type": "string",
"minLength": 1,
"maxLength": 64
},
"default": null
},

View File

@ -1,7 +1,7 @@
{
"$id": "https://open-metadata.org/schema/entity/services/pipelineService.json",
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Messaging Service",
"title": "Pipeline Service",
"description": "This schema defines the Pipeline Service entity, such as Airflow and Prefect.",
"type": "object",
"definitions": {

View File

@ -14,16 +14,16 @@
# limitations under the License.
import pathlib
from datetime import timedelta
from airflow import DAG
from airflow_provider_openmetadata import DAG
try:
from airflow.operators.python import PythonOperator
from airflow_provider_openmetadata.operators.python import PythonOperator
except ModuleNotFoundError:
from airflow.operators.python_operator import PythonOperator
from airflow_provider_openmetadata.operators.python_operator import PythonOperator
from metadata.config.common import load_config_file
from metadata.ingestion.api.workflow import Workflow
from airflow.utils.dates import days_ago
from airflow_provider_openmetadata.utils.dates import days_ago
default_args = {
"owner": "user_name",

View File

@ -6,15 +6,12 @@ python-dateutil~=2.8.1
SQLAlchemy~=1.4.5
pandas~=1.2.4
Faker~=8.1.1
elasticsearch~=7.12.0
spacy~=3.0.5
commonregex~=1.5.4
setuptools~=57.0.0
PyHive~=0.6.4
ldap3~=2.9.1
confluent_kafka>=1.5.0
fastavro>=1.2.0
google~=3.0.0
okta~=2.0.0
PyMySQL~=1.0.2
great-expectations>=0.13.31

View File

@ -59,10 +59,10 @@ base_requirements = {
"email-validator>=1.0.3",
"wheel~=0.36.2",
"python-jose==3.3.0",
"okta>=1.7.0",
"sqlalchemy>=1.3.24",
"sql-metadata~=2.0.0",
"requests~=2.25.1"
"requests~=2.26",
"PyYAML"
}
pii_requirements = {
"en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web",
@ -110,7 +110,8 @@ plugins: Dict[str, Set[str]] = {
"superset": {},
"tableau": {"tableau-api-lib==0.1.22"},
"vertica": {"sqlalchemy-vertica[vertica-python]>=0.0.5"},
"report-server": report_requirements
"report-server": report_requirements,
"airflow": {"apache-airflow >= 1.10.2"}
}
build_options = {"includes": ["_cffi_backend"]}
@ -137,6 +138,7 @@ setup(
packages=find_namespace_packages(where='./src', exclude=['tests*']),
entry_points={
"console_scripts": ["metadata = metadata.cmd:metadata"],
"apache_airflow_provider": ["provider_info = airflow_provider_openmetadata:get_provider_config"],
},
install_requires=list(base_requirements),
extras_require={

View File

@ -0,0 +1,10 @@
import metadata
def get_provider_config():
    """Return the provider metadata exposed through the ``apache_airflow_provider`` entry point.

    Returns:
        dict: Provider name, description, package name and released versions.
    """
    version = "0.0.1"
    return {
        "name": "OpenMetadata",
        "description": "OpenMetadata <https://open-metadata.org/>",
        "package-name": "openmetadata-ingestion",
        # Kept for backward compatibility with readers of the singular key.
        "version": version,
        # Airflow's provider-info metadata describes releases as a list under "versions".
        "versions": [version],
    }

View File

@ -0,0 +1,194 @@
import json
import traceback
from typing import TYPE_CHECKING, Dict, List, Optional
import dateutil
from airflow.configuration import conf
from airflow.lineage.backend import LineageBackend
from metadata.generated.schema.api.data.createPipeline import CreatePipelineEntityRequest
from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceEntityRequest
from metadata.generated.schema.entity.services.pipelineService import PipelineServiceType
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig, OpenMetadataAPIClient
if TYPE_CHECKING:
from airflow import DAG
from airflow.models.baseoperator import BaseOperator
from metadata.config.common import ConfigModel
class OpenMetadataLineageConfig(ConfigModel):
    """Settings the lineage backend uses to talk to OpenMetadata."""

    # Name of the pipeline service that DAGs and tasks are registered under.
    airflow_service_name: str = "airflow"
    # Endpoint handed to the OpenMetadata client configuration.
    api_endpoint: str = "http://localhost:8585"
    # Authentication provider type handed to the OpenMetadata client configuration.
    auth_provider_type: str = "no-auth"
    # Secret for the authentication provider; None when it is not configured.
    secret_key: Optional[str] = None
def get_lineage_config() -> OpenMetadataLineageConfig:
    """Build the lineage settings from the ``[lineage]`` section of the Airflow configuration.

    Every option falls back to the default listed below when it is not set.
    """
    # (config-model field, option name under [lineage], fallback value)
    options = (
        ('airflow_service_name', "airflow_service_name", "airflow"),
        ('api_endpoint', "openmetadata_api_endpoint", "http://localhost:8585"),
        ('auth_provider_type', "auth_provider_type", "no-auth"),
        ('secret_key', "secret_key", None),
    )
    settings = {
        field: conf.get("lineage", option, fallback=default)
        for field, option, default in options
    }
    return OpenMetadataLineageConfig.parse_obj(settings)
# Task properties that are kept after serializing an operator; every other
# serialized field is dropped before the task is reported.
allowed_task_keys = [
    # relationships and operator identity
    "_downstream_task_ids", "_inlets", "_outlets", "_task_type", "_task_module",
    # scheduling and notification settings
    "depends_on_past", "email", "label", "execution_timeout",
    "end_date", "start_date", "sla",
    # task definition
    "sql", "task_id", "trigger_rule", "wait_for_downstream",
]
# DAG properties that are kept after serializing a DAG; every other
# serialized field is dropped.
allowed_flow_keys = [
    "_access_control", "_concurrency", "_default_view",
    "catchup", "fileloc", "is_paused_upon_creation",
    "start_date", "tags", "timezone",
]
def _collect_properties(entity, serialized: Dict, allowed_keys: List[str]) -> Dict[str, str]:
    """Return the ``repr()`` of each property of ``entity`` whose key is in ``allowed_keys``.

    ``serialized`` is the already-serialized form of ``entity``; serialized
    fields that are missing from it are read from ``entity`` directly.
    """
    properties: Dict[str, str] = {
        key: repr(value) for (key, value) in serialized.items()
    }
    for key in entity.get_serialized_fields():
        if key not in properties:
            properties[key] = repr(getattr(entity, key))
    return {k: v for (k, v) in properties.items() if k in allowed_keys}


def parse_lineage_to_openmetadata(config: OpenMetadataLineageConfig,
                                  context: Dict,
                                  operator: "BaseOperator",
                                  inlets: List,
                                  outlets: List,
                                  client: OpenMetadataAPIClient) -> None:
    """Register the task and DAG found in ``context`` with OpenMetadata.

    The pipeline service named ``config.airflow_service_name`` is looked up and
    created when the lookup returns None. The task is then created or updated,
    followed by a create-or-update of the pipeline that references only this
    task.

    Args:
        config: Lineage settings (service name is read from here).
        context: Airflow task context; ``context["dag"]`` and
            ``context["task"]`` are read.
        operator: Operator whose logger is used for progress messages.
        inlets: Currently unused.
        outlets: Currently unused.
        client: OpenMetadata API client used for all requests.
    """
    from airflow.serialization.serialized_objects import (
        SerializedBaseOperator,
        SerializedDAG,
    )
    import ast

    operator.log.info("Parsing Lineage for OpenMetadata")
    dag: "DAG" = context["dag"]
    task: "BaseOperator" = context["task"]

    pipeline_service_url = conf.get("webserver", "base_url")
    dag_url = f"{pipeline_service_url}/tree?dag_id={dag.dag_id}"
    task_url = f"{pipeline_service_url}/taskinstance/list/?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={task.task_id}"

    dag_properties = _collect_properties(
        dag, SerializedDAG.serialize_dag(dag), allowed_flow_keys
    )
    task_properties = _collect_properties(
        task, SerializedBaseOperator.serialize_operator(task), allowed_task_keys
    )

    operator.log.info("Get Airflow Service ID")
    airflow_service_entity = client.get_pipeline_service(config.airflow_service_name)
    if airflow_service_entity is None:
        pipeline_service = CreatePipelineServiceEntityRequest(
            name=config.airflow_service_name,
            serviceType=PipelineServiceType.Airflow,
            pipelineUrl=pipeline_service_url
        )
        airflow_service_entity = client.create_pipeline_service(pipeline_service)
        # Logging interpolates arguments %-style; a "{}" placeholder with an
        # argument makes the logging call fail to format the record.
        operator.log.info("airflow service is created %s", airflow_service_entity)
    operator.log.info(task_properties)
    operator.log.info(dag_properties)

    downstream_tasks = []
    if '_downstream_task_ids' in task_properties:
        # Property values are repr() strings, so parse the literal back.
        downstream_tasks = ast.literal_eval(task_properties['_downstream_task_ids'])
    operator.log.info("downstream tasks {}".format(downstream_tasks))

    # NOTE(review): task_properties values are repr() strings, so 'task_id' and
    # 'label' carry their surrounding quote characters here - confirm whether
    # the raw task.task_id is what should be sent.
    create_task = CreateTaskEntityRequest(
        name=task_properties['task_id'],
        displayName=task_properties['label'],
        taskUrl=task_url,
        upstreamTasks=None,
        downstreamTasks=downstream_tasks,
        service=EntityReference(id=airflow_service_entity.id, type='pipelineService')
    )
    # Use a separate name so the Airflow operator in ``task`` is not shadowed.
    task_entity = client.create_or_update_task(create_task)
    operator.log.info("Created Task {}".format(task_entity))
    operator.log.info("Dag {}".format(dag))

    create_pipeline = CreatePipelineEntityRequest(
        name=dag.dag_id,
        displayName=dag.dag_id,
        description=dag.description,
        pipelineUrl=dag_url,
        tasks=[EntityReference(id=task_entity.id, type='task')],
        service=EntityReference(id=airflow_service_entity.id, type='pipelineService')
    )
    pipeline = client.create_or_update_pipeline(create_pipeline)
    operator.log.info("Create Pipeline {}".format(pipeline))
class OpenMetadataLineageBackend(LineageBackend):
    """
    Sends lineage data from tasks to OpenMetadata.

    Configurable via the ``[lineage]`` section of the Airflow configuration
    (``airflow.cfg``) as follows: ::

        [lineage]
        backend = airflow_provider_openmetadata.lineage.OpenMetadataLineageBackend
        airflow_service_name = airflow #make sure this service_name matches the one configured in openMetadata
        openmetadata_api_endpoint = http://localhost:8585
        auth_provider_type = no-auth # use google here if you are configuring google as SSO
        secret_key = google-client-secret-key # it needs to be configured only if you are using google as SSO
    """

    def __init__(self) -> None:
        super().__init__()
        # Parse the settings once up front so a malformed [lineage] section
        # surfaces when the backend is instantiated.
        _ = get_lineage_config()

    @staticmethod
    def send_lineage(
            operator: "BaseOperator",
            inlets: Optional[List] = None,
            outlets: Optional[List] = None,
            context: Dict = None,
    ) -> None:
        """Push the operator's task and DAG metadata to OpenMetadata.

        This is best effort: any exception raised while loading the
        configuration, building the client or sending the metadata is logged
        on the operator's logger and not re-raised.

        Args:
            operator: Operator that ran; its ``inlets``/``outlets`` attributes
                are forwarded and its logger is used for errors.
            inlets: Not read; the operator's own inlets are used.
            outlets: Not read; the operator's own outlets are used.
            context: Airflow task context forwarded to the parser.
        """
        try:
            config = get_lineage_config()
            metadata_config = MetadataServerConfig.parse_obj(
                {
                    'api_endpoint': config.api_endpoint,
                    'auth_provider_type': config.auth_provider_type,
                    'secret_key': config.secret_key
                }
            )
            client = OpenMetadataAPIClient(metadata_config)
            parse_lineage_to_openmetadata(
                config, context, operator, operator.inlets, operator.outlets, client
            )
        except Exception as e:
            operator.log.error(e)
            # format_exc() returns the traceback text; print_exc() returns None,
            # which would be logged as the string "None".
            operator.log.error(traceback.format_exc())

View File

@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
---
package-name: apache-airflow-providers-openmetadata
name: OpenMetadata
description: |
`OpenMetadata <https://open-metadata.org/>`__
versions:
- 0.0.1
additional-dependencies:
- apache-airflow>=1.10.2
integrations:
- integration-name: OpenMetadata
external-doc-url: https://open-metadata.org
tags: [service]

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/personalDataTags.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/piiTags.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/tierTags.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/userTags.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/catalogVersion.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createChart.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDashboard.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDatabase.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createPipeline.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTable.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTask.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations
@ -28,10 +28,7 @@ class CreateTaskEntityRequest(BaseModel):
None,
description='Task URL to visit/manage. This URL points to respective pipeline service UI',
)
upstreamTasks: Optional[List[entityReference.EntityReference]] = Field(
None, description='All the tasks that are upstream of this task.'
)
downstreamTasks: Optional[List[entityReference.EntityReference]] = Field(
downstreamTasks: Optional[List[constr(min_length=1, max_length=64)]] = Field(
None, description='All the tasks that are downstream of this task.'
)
taskConfig: Optional[task.TaskConfig] = Field(

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTopic.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/feed/createThread.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDashboardService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDatabaseService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createMessagingService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createPipelineService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDashboardService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDatabaseService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateMessagingService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updatePipelineService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/setOwner.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTag.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTagCategory.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createTeam.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createUser.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/bots.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/chart.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/dashboard.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/database.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/metrics.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/pipeline.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/report.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/table.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/task.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations
@ -37,10 +37,7 @@ class Task(BaseModel):
None,
description='Task URL to visit/manage. This URL points to respective pipeline service UI',
)
upstreamTasks: Optional[List[entityReference.EntityReference]] = Field(
None, description='All the tasks that are upstream of this task.'
)
downstreamTasks: Optional[List[entityReference.EntityReference]] = Field(
downstreamTasks: Optional[List[constr(min_length=1, max_length=64)]] = Field(
None, description='All the tasks that are downstream of this task.'
)
taskConfig: Optional[TaskConfig] = Field(None, description='Task Configuration.')

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/topic.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/feed/thread.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/dashboardService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/databaseService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/messagingService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/pipelineService.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations
@ -17,7 +17,7 @@ class PipelineServiceType(Enum):
Prefect = 'Prefect'
class MessagingService(BaseModel):
class PipelineService(BaseModel):
id: basic.Uuid = Field(
..., description='Unique identifier of this pipeline service instance.'
)

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/tags/tagCategory.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/team.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/user.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/auditLog.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/basic.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/collectionDescriptor.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/dailyCount.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityReference.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityUsage.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/jdbcConnection.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/paging.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/profile.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/schedule.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/tagLabel.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/usageDetails.json
# timestamp: 2021-09-27T15:46:37+00:00
# timestamp: 2021-09-28T21:56:25+00:00
from __future__ import annotations

View File

@ -22,24 +22,29 @@ from metadata.config.common import ConfigModel
from metadata.generated.schema.api.data.createChart import CreateChartEntityRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardEntityRequest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest
from metadata.generated.schema.api.data.createPipeline import CreatePipelineEntityRequest
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest
from metadata.generated.schema.api.data.createTopic import CreateTopic
from metadata.generated.schema.api.services.createDashboardService import CreateDashboardServiceEntityRequest
from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceEntityRequest
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.table import Table, TableData, TableJoins, TableProfile
from metadata.generated.schema.entity.data.task import Task
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.tags.tagCategory import Tag
from metadata.ingestion.models.table_queries import TableUsageRequest
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
from metadata.ingestion.ometa.client import REST, ClientConfig
from metadata.ingestion.ometa.client import REST, ClientConfig, APIError
import google.auth
import google.auth.transport.requests
@ -57,6 +62,8 @@ DatabaseServiceEntities = List[DatabaseService]
# Aliases for lists of generated entity models, used as type annotations
# (for example, `Pipelines` is the annotated return type of `list_pipelines`).
DatabaseEntities = List[Database]
Tags = List[Tag]
TableProfiles = List[TableProfile]
Pipelines = List[Pipeline]
class TableEntities(BaseModel):
@ -197,8 +204,11 @@ class OpenMetadataAPIClient(object):
def get_database_service(self, service_name: str) -> DatabaseService:
    """Get the Database service by name.

    Args:
        service_name: Name of the database service to look up.

    Returns:
        The matching DatabaseService, or None when the API call raises
        APIError.
    """
    try:
        # Use the by-name endpoint so the response is a single entity that can
        # be unpacked directly into the model. The `?name=` query form wraps
        # its results in a 'data' list and cannot be unpacked this way.
        resp = self.client.get('/services/databaseServices/name/{}'.format(service_name))
        return DatabaseService(**resp)
    except APIError:
        # Best-effort lookup: any API error is reported to the caller as
        # "no such service".
        return None
def get_database_service_by_id(self, service_id: str) -> DatabaseService:
"""Get the Database Service by ID"""
@ -322,8 +332,11 @@ class OpenMetadataAPIClient(object):
def get_messaging_service(self, service_name: str) -> MessagingService:
    """Get the Messaging service by name.

    Args:
        service_name: Name of the messaging service to look up.

    Returns:
        The matching MessagingService, or None when the API call raises
        APIError.
    """
    try:
        resp = self.client.get('/services/messagingServices/name/{}'.format(service_name))
        # Build the messaging model, matching the declared return type
        # (the response was previously unpacked into DashboardService).
        return MessagingService(**resp)
    except APIError:
        # Best-effort lookup: any API error is reported to the caller as
        # "no such service".
        return None
def get_messaging_service_by_id(self, service_id: str) -> MessagingService:
"""Get the Messaging Service by ID"""
@ -361,8 +374,11 @@ class OpenMetadataAPIClient(object):
def get_dashboard_service(self, service_name: str) -> DashboardService:
    """Get the Dashboard service by name.

    Args:
        service_name: Name of the dashboard service to look up.

    Returns:
        The matching DashboardService, or None when the API call raises
        APIError.
    """
    try:
        resp = self.client.get('/services/dashboardServices/name/{}'.format(service_name))
        return DashboardService(**resp)
    except APIError:
        # Best-effort lookup: any API error is reported to the caller as
        # "no such service".
        return None
def get_dashboard_service_by_id(self, service_id: str) -> DashboardService:
"""Get the Dashboard Service by ID"""
@ -410,5 +426,51 @@ class OpenMetadataAPIClient(object):
after = resp['paging']['after'] if 'after' in resp['paging'] else None
return DashboardEntities(dashboards=dashboards, total=total, after=after)
def get_pipeline_service(self, service_name: str) -> PipelineService:
    """Look up a Pipeline service by its name.

    Args:
        service_name: Name of the pipeline service to fetch.

    Returns:
        The PipelineService built from the response, or None when the
        request raises APIError.
    """
    endpoint = '/services/pipelineServices/name/{}'.format(service_name)
    try:
        payload = self.client.get(endpoint)
        return PipelineService(**payload)
    except APIError as err:
        return None
def get_pipeline_service_by_id(self, service_id: str) -> PipelineService:
    """Fetch a Pipeline service by its ID.

    Args:
        service_id: Identifier of the pipeline service.

    Returns:
        The PipelineService built from the response payload.
    """
    endpoint = '/services/pipelineServices/{}'.format(service_id)
    payload = self.client.get(endpoint)
    return PipelineService(**payload)
def create_pipeline_service(self,
                            pipeline_service: CreatePipelineServiceEntityRequest) -> PipelineService:
    """Register a new Pipeline service.

    Args:
        pipeline_service: Creation request; its JSON form is posted as the
            request body.

    Returns:
        The PipelineService built from the response payload.
    """
    body = pipeline_service.json()
    payload = self.client.post('/services/pipelineServices', data=body)
    return PipelineService(**payload)
def create_or_update_task(self, create_task_request: CreateTaskEntityRequest) -> Task:
    """Create a Task, or update it if it already exists, via PUT /tasks.

    Args:
        create_task_request: Request whose JSON form is sent as the body.

    Returns:
        The Task built from the response payload.
    """
    body = create_task_request.json()
    payload = self.client.put('/tasks', data=body)
    return Task(**payload)
def get_task_by_id(self, chart_id: str, fields: list = None) -> Task:
    """Get a Task by ID.

    Args:
        chart_id: Identifier of the task to fetch. The parameter name is
            kept as-is for backward compatibility with keyword callers.
        fields: Names of the additional fields to request. Defaults to
            ``['tags', 'service']`` when omitted.

    Returns:
        The Task built from the response payload.
    """
    # Resolve the default per call instead of sharing one mutable list across
    # calls, and request the two fields as separate names.
    if fields is None:
        fields = ['tags', 'service']
    params = {'fields': ",".join(fields)}
    # NOTE(review): presumably the client sends `data` as query parameters
    # for GET requests -- confirm against REST.get.
    resp = self.client.get('/tasks/{}'.format(chart_id), data=params)
    return Task(**resp)
def create_or_update_pipeline(self, create_pipeline_request: CreatePipelineEntityRequest) -> Pipeline:
    """Create a Pipeline, or update it if it already exists, via PUT /pipelines.

    Args:
        create_pipeline_request: Request whose JSON form is sent as the body.

    Returns:
        The Pipeline built from the response payload.
    """
    body = create_pipeline_request.json()
    payload = self.client.put('/pipelines', data=body)
    return Pipeline(**payload)
def list_pipelines(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Pipelines:
    """List pipelines.

    Args:
        fields: Comma-separated field names to request. When None, a plain
            ``/pipelines`` request is made and ``offset``/``limit`` are not
            sent.
        offset: Offset placed in the query string (only when ``fields`` is
            given).
        limit: Limit placed in the query string (only when ``fields`` is
            given).

    Returns:
        The raw response when ``self._use_raw_data`` is truthy; otherwise a
        list of Pipeline objects built from the response's ``'data'`` entries.
    """
    path = '/pipelines'
    if fields is not None:
        path = '/pipelines?fields={}&offset={}&limit={}'.format(fields, offset, limit)
    payload = self.client.get(path)
    if self._use_raw_data:
        return payload
    return [Pipeline(**item) for item in payload['data']]
def close(self):
    """Close the underlying REST client by delegating to its ``close()``."""
    rest_client = self.client
    rest_client.close()