Fix #1004: Update Pipeline Ingestion and Indexing to accommodate new API changes (#1005)

This commit is contained in:
Sriharsha Chintalapani 2021-11-01 07:22:01 -07:00 committed by GitHub
parent d393b90bc4
commit 9b7d3e313f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
77 changed files with 289 additions and 98 deletions

View File

@ -4,56 +4,184 @@
"displayName": "Presto ETL",
"description": "Presto ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=presto_etl",
"tasks": ["presto_task", "assert_table_exists"]
"tasks": [
{
"name": "presto_task",
"displayName": "Presto Task",
"description": "Airflow operator to perform ETL on presto tables",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}
]
},
{
"name": "dim_address_etl",
"displayName": "dim_address etl",
"description": "dim_address ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_address_etl",
"tasks": ["dim_address_task", "assert_table_exists"]
"tasks": [{
"name": "dim_address_task",
"displayName": "dim_address Task",
"description": "Airflow operator to perform ETL and generate dim_address table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}
]
},
{
"name": "dim_user_etl",
"displayName": "dim_user etl",
"description": "dim_user ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_user_etl",
"tasks": ["dim_user_task", "assert_table_exists"]
"tasks": [{
"name": "dim_user_task",
"displayName": "dim_user Task",
"description": "Airflow operator to perform ETL and generate dim_user table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_user_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}
]
},
{
"name": "dim_location_etl",
"displayName": "dim_location etl",
"description": "diim_location ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_address_etl",
"tasks": ["dim_location_task", "assert_table_exists"]
"tasks": [{
"name": "dim_location_task",
"displayName": "dim_location Task",
"description": "Airflow operator to perform ETL and generate dim_location table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_location_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}
]
},
{
"name": "dim_product_etl",
"displayName": "dim_product etl",
"description": "diim_product ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=dim_address_etl",
"tasks": ["dim_product_task", "assert_table_exists"]
"tasks": [{
"name": "dim_product_task",
"displayName": "dim_product Task",
"description": "Airflow operator to perform ETL and generate dim_product table",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_product_task",
"downstreamTasks": ["assert_table_exists"],
"taskType": "PrestoOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}
]
},
{
"name": "trino_etl",
"displayName": "Trino ETL",
"description": "Trino ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=trino_etl",
"tasks": ["trino_task", "assert_table_exists"]
"tasks": [{
"name": "trino_task",
"displayName": "Trino Task",
"description": "Airflow operator to perform ETL on trino tables",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": ["assert_table_exists"],
"taskType": "TrinoOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}
]
},
{
"name": "hive_etl",
"displayName": "Hive ETL",
"description": "Hive ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=hive_etl",
"tasks": ["hive_create_table", "assert_table_exists"]
"tasks": [{
"name": "hive_create_table",
"displayName": "Hive Create Table",
"description": "Hive Create Table Task",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=hive_create_table",
"downstreamTasks": ["assert_table_exits"],
"taskType": "HiveOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}
]
},
{
"name": "snowflake_etl",
"displayName": "Snowflake ETL",
"description": "Snowflake ETL pipeline",
"pipelineUrl": "http://localhost:8080/tree?dag_id=snowflake_etl",
"tasks": ["snowflake_etl", "assert_table_exists"]
"tasks": [{
"name": "snowflake_task",
"displayName": "Snowflake Task",
"description": "Airflow operator to perform ETL on snowflake tables",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": ["assert_table_exists"],
"taskType": "SnowflakeOperator"
},
{
"name": "assert_table_exists",
"displayName": "Assert Table Exists",
"description": "Assert if a table exists",
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
"downstreamTasks": [],
"taskType": "HiveOperator"
}]
}
]
}

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/personalDataTags.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/piiTags.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/tierTags.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: data/tags/userTags.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/catalogVersion.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createChart.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDashboard.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createDatabase.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createModel.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createPipeline.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations
@ -8,6 +8,7 @@ from typing import List, Optional
from pydantic import AnyUrl, BaseModel, Field, constr
from ...entity.data import pipeline
from ...type import basic, entityReference, tagLabel
@ -32,7 +33,7 @@ class CreatePipelineEntityRequest(BaseModel):
startDate: Optional[basic.DateTime] = Field(
None, description='Start date of the workflow'
)
tasks: Optional[List[entityReference.EntityReference]] = Field(
tasks: Optional[List[pipeline.Task]] = Field(
None, description='All the tasks that are part of pipeline.'
)
tags: Optional[List[tagLabel.TagLabel]] = Field(

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTable.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/data/createTopic.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/feed/createThread.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/lineage/addLineage.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDashboardService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createDatabaseService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createMessagingService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/createPipelineService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDashboardService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateDatabaseService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updateMessagingService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/services/updatePipelineService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/setOwner.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTag.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/tags/createTagCategory.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createTeam.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/api/teams/createUser.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/bots.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/chart.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/dashboard.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/database.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/metrics.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/model.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/pipeline.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations
@ -11,6 +11,37 @@ from pydantic import AnyUrl, BaseModel, Field, constr
from ...type import basic, entityHistory, entityReference, tagLabel
class Task(BaseModel):
    """A single task belonging to a Pipeline entity.

    Generated by datamodel-codegen from schema/entity/data/pipeline.json;
    introduced so pipelines embed full task objects instead of
    EntityReference lists (see the `tasks: Optional[List[Task]]` field on
    Pipeline / CreatePipelineEntityRequest in this commit).
    """

    # Unique task name within the pipeline; 1-64 characters, required.
    name: constr(min_length=1, max_length=64) = Field(
        ..., description='Name that identifies this task instance uniquely.'
    )
    displayName: Optional[str] = Field(
        None,
        description='Display Name that identifies this Task. It could be title or label from the pipeline services.',
    )
    # FQN format: 'ServiceName.PipelineName.TaskName'.
    fullyQualifiedName: Optional[constr(min_length=1, max_length=64)] = Field(
        None,
        description="A unique name that identifies a pipeline in the format 'ServiceName.PipelineName.TaskName'.",
    )
    description: Optional[str] = Field(None, description='Description of this Task.')
    # Link into the pipeline service's own UI (e.g. Airflow task-instance page).
    taskUrl: Optional[AnyUrl] = Field(
        None,
        description='Task URL to visit/manage. This URL points to respective pipeline service UI.',
    )
    # Downstream tasks are referenced by task *name*, not by EntityReference.
    downstreamTasks: Optional[List[constr(min_length=1, max_length=64)]] = Field(
        None, description='All the tasks that are downstream of this task.'
    )
    # Free-form operator/class name, e.g. 'PrestoOperator', 'HiveOperator'.
    taskType: Optional[str] = Field(
        None, description='Type of the Task. Usually refers to the class it implements.'
    )
    taskSQL: Optional[basic.SqlQuery] = Field(
        None, description='SQL used in the task. Can be used to determine the lineage.'
    )
    tags: Optional[List[tagLabel.TagLabel]] = Field(
        None, description='Tags for this task.'
    )
class Pipeline(BaseModel):
id: basic.Uuid = Field(
..., description='Unique identifier that identifies a pipeline instance.'
@ -46,7 +77,7 @@ class Pipeline(BaseModel):
startDate: Optional[basic.DateTime] = Field(
None, description='Start date of the workflow.'
)
tasks: Optional[List[entityReference.EntityReference]] = Field(
tasks: Optional[List[Task]] = Field(
None, description='All the tasks that are part of pipeline.'
)
followers: Optional[entityReference.EntityReferenceList] = Field(

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/report.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/table.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/data/topic.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/feed/thread.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/dashboardService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/databaseService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/messagingService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/services/pipelineService.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/tags/tagCategory.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/team.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/entity/teams/user.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,3 +1,3 @@
# generated by datamodel-codegen:
# filename: json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/auditLog.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/basic.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/collectionDescriptor.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/dailyCount.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -0,0 +1,43 @@
# generated by datamodel-codegen:
# filename: schema/type/entityHistory.json
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations
from typing import List, Optional
from pydantic import BaseModel, Extra, Field, confloat
class EntityVersionHistory(BaseModel):
    """Version history of a single entity, generated from
    schema/type/entityHistory.json by datamodel-codegen."""

    class Config:
        # Reject payload fields not declared in the schema.
        extra = Extra.forbid

    entityType: str = Field(
        ...,
        description='Entity type, such as `database`, `table`, `dashboard`, for which this version history is produced.',
    )
    # NOTE(review): generated as a bare `List` (untyped elements); the schema
    # presumably intends a list of versioned entity snapshots — confirm against
    # schema/type/entityHistory.json before tightening the annotation.
    versions: List
class EntityVersion(BaseModel):
    """Entity metadata version number, constrained to `Major.Minor`
    increments of 0.1 starting at 0.1 (pydantic custom-root-type model)."""

    # Root value: a float >= 0.1 in steps of 0.1 (e.g. 0.1, 1.1, 2.0).
    __root__: confloat(ge=0.1, multiple_of=0.1) = Field(
        ...,
        description='Metadata version of the entity in the form `Major.Minor`. First version always starts from `0.1` when the entity is created. When the backward compatible changes are made to the entity, only the `Minor` version is incremented - example `1.0` is changed to `1.1`. When backward incompatible changes are made the `Major` version is incremented - example `1.1` to `2.0`.',
    )
class ChangeDescription(BaseModel):
    """Summary of field-level changes between an entity version and its
    predecessor (added / updated / deleted field names)."""

    class Config:
        # Reject payload fields not declared in the schema.
        extra = Extra.forbid

    fieldsAdded: Optional[List[str]] = Field(
        None, description='Fields added during the version changes.'
    )
    fieldsUpdated: Optional[List[str]] = Field(
        None, description='Fields modified during the version changes.'
    )
    fieldsDeleted: Optional[List[str]] = Field(
        None, description='Fields deleted during the version changes.'
    )
    # Version this change description is relative to; None for the first version.
    previousVersion: Optional[EntityVersion] = None

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityLineage.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityReference.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/entityUsage.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/jdbcConnection.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/paging.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/profile.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/schedule.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/tagLabel.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -1,6 +1,6 @@
# generated by datamodel-codegen:
# filename: schema/type/usageDetails.json
# timestamp: 2021-10-31T08:53:25+00:00
# timestamp: 2021-10-31T21:55:34+00:00
from __future__ import annotations

View File

@ -359,14 +359,14 @@ class ElasticsearchSink(Sink):
tier = pipeline_tag.tagFQN
else:
tags.add(pipeline_tag.tagFQN)
tasks: List[Task] = self._get_tasks(pipeline.tasks)
tasks: List[Task] = pipeline.tasks
task_names = []
task_descriptions = []
for task in tasks:
task_names.append(task.displayName)
if task.description is not None:
task_descriptions.append(task.description)
if len(task.tags) > 0:
if tags in task and len(task.tags) > 0:
for col_tag in task.tags:
tags.add(col_tag.tagFQN)

View File

@ -352,7 +352,6 @@ class SampleDataSource(Source):
yield from self.ingest_topics()
yield from self.ingest_charts()
yield from self.ingest_dashboards()
yield from self.ingest_tasks()
yield from self.ingest_pipelines()
yield from self.ingest_lineage()
yield from self.ingest_users()
@ -436,25 +435,14 @@ class SampleDataSource(Source):
yield task_ev
def ingest_pipelines(self) -> Iterable[Dashboard]:
tasks = self.client.list_tasks("service")
task_dict = {}
for task in tasks:
task_dict[task.name] = task
for pipeline in self.pipelines["pipelines"]:
task_refs = []
for task in pipeline["tasks"]:
if task in task_dict:
task_refs.append(
EntityReference(id=task_dict[task].id, type="task")
)
pipeline_ev = Pipeline(
id=uuid.uuid4(),
name=pipeline["name"],
displayName=pipeline["displayName"],
description=pipeline["description"],
pipelineUrl=pipeline["pipelineUrl"],
tasks=task_refs,
tasks=pipeline["tasks"],
service=EntityReference(
id=self.pipeline_service.id, type="pipelineService"
),