mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-08 05:26:19 +00:00
This commit is contained in:
parent
824d7cbcdf
commit
37796c655b
@ -120,6 +120,8 @@ public class SearchResource {
|
||||
searchSourceBuilder = buildTopicSearchBuilder(query, from, size);
|
||||
} else if (index.equals("dashboard_search_index")) {
|
||||
searchSourceBuilder = buildDashboardSearchBuilder(query, from, size);
|
||||
} else if (index.equals("pipeline_search_index")) {
|
||||
searchSourceBuilder = buildPipelineSearchBuilder(query, from, size);
|
||||
} else {
|
||||
searchSourceBuilder = buildTableSearchBuilder(query, from, size);
|
||||
}
|
||||
@ -251,4 +253,32 @@ public class SearchResource {
|
||||
|
||||
return searchSourceBuilder;
|
||||
}
|
||||
|
||||
private SearchSourceBuilder buildPipelineSearchBuilder(String query, int from, int size) {
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
HighlightBuilder.Field highlightTableName =
|
||||
new HighlightBuilder.Field("pipeline_name");
|
||||
highlightTableName.highlighterType("unified");
|
||||
HighlightBuilder.Field highlightDescription =
|
||||
new HighlightBuilder.Field("description");
|
||||
highlightDescription.highlighterType("unified");
|
||||
HighlightBuilder hb = new HighlightBuilder();
|
||||
hb.field(highlightDescription);
|
||||
hb.field(highlightTableName);
|
||||
hb.preTags("<span class=\"text-highlighter\">");
|
||||
hb.postTags("</span>");
|
||||
searchSourceBuilder.query(QueryBuilders.queryStringQuery(query)
|
||||
.field("pipeline_name", 5.0f)
|
||||
.field("description")
|
||||
.field("task_names")
|
||||
.field("task_descriptions")
|
||||
.lenient(true))
|
||||
.aggregation(AggregationBuilders.terms("Service").field("service_type"))
|
||||
.aggregation(AggregationBuilders.terms("Tier").field("tier"))
|
||||
.aggregation(AggregationBuilders.terms("Tags").field("tags"))
|
||||
.highlighter(hb)
|
||||
.from(from).size(size);
|
||||
|
||||
return searchSourceBuilder;
|
||||
}
|
||||
}
|
||||
|
||||
24
ingestion/examples/sample_data/pipelines/pipelines.json
Normal file
24
ingestion/examples/sample_data/pipelines/pipelines.json
Normal file
@ -0,0 +1,24 @@
|
||||
{
|
||||
"pipelines": [{
|
||||
"name": "presto_etl",
|
||||
"displayName": "Presto ETL",
|
||||
"description": "Presto ETL pipeline",
|
||||
"pipelineUrl": "http://localhost:8080/tree?dag_id=presto_etl",
|
||||
"tasks": ["presto_task", "assert_table_exists"]
|
||||
},
|
||||
{
|
||||
"name": "hive_etl",
|
||||
"displayName": "Hive ETL",
|
||||
"description": "Hive ETL pipeline",
|
||||
"pipelineUrl": "http://localhost:8080/tree?dag_id=hive_etl",
|
||||
"tasks": ["hive_create_table", "assert_table_exists"]
|
||||
},
|
||||
{
|
||||
"name": "snowflake_etl",
|
||||
"displayName": "Snowflake ETL",
|
||||
"description": "Snowflake ETL pipeline",
|
||||
"pipelineUrl": "http://localhost:8080/tree?dag_id=snowflake_etl",
|
||||
"tasks": ["snowflake_task", "assert_table_exists"]
|
||||
}
|
||||
]
|
||||
}
|
||||
8
ingestion/examples/sample_data/pipelines/service.json
Normal file
8
ingestion/examples/sample_data/pipelines/service.json
Normal file
@ -0,0 +1,8 @@
|
||||
{
|
||||
"id": "a6fb4f54-ba3d-4a16-97f0-766713199189",
|
||||
"name": "sample_airflow",
|
||||
"serviceType": "Airflow",
|
||||
"description": "Airflow service",
|
||||
"href": "null",
|
||||
"pipelineUrl": "http://localhost:8080"
|
||||
}
|
||||
35
ingestion/examples/sample_data/pipelines/tasks.json
Normal file
35
ingestion/examples/sample_data/pipelines/tasks.json
Normal file
@ -0,0 +1,35 @@
|
||||
{
|
||||
"tasks": [{
|
||||
"name": "hive_create_table",
|
||||
"displayName": "Hive Create Table",
|
||||
"description": "Hive Create Table Task",
|
||||
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=hive_create_table",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "HiveOperator"
|
||||
},
|
||||
{
|
||||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
},
|
||||
{
|
||||
"name": "snowflake_task",
|
||||
"displayName": "Snowflake Task",
|
||||
"description": "Airflow operator to perform ETL on snowflake tables",
|
||||
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=snowflake_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "SnowflakeOperator"
|
||||
},
|
||||
{
|
||||
"name": "presto_task",
|
||||
"displayName": "Presto Task",
|
||||
"description": "Airflow operator to perform ETL on presto tables",
|
||||
"taskUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=presto_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
}
|
||||
]
|
||||
}
|
||||
@ -233,6 +233,24 @@ class DashboardESDocument(BaseModel):
|
||||
daily_percentile_rank: int
|
||||
|
||||
|
||||
class PipelineESDocument(BaseModel):
    """Document model for a pipeline entry written to the Elasticsearch index."""

    # Pipeline identity and the service it belongs to.
    pipeline_id: str
    service: str
    service_type: str
    pipeline_name: str
    # Suggestion entries; each one is a plain dict.
    suggest: List[dict]
    description: Optional[str] = None
    last_updated_timestamp: Optional[int]
    # Names and descriptions collected from the pipeline's tasks.
    task_names: List[str]
    task_descriptions: List[str]
    tags: List[str]
    fqdn: str
    tier: Optional[str] = None
    owner: str
    followers: List[str]
|
||||
|
||||
|
||||
class DashboardOwner(BaseModel):
|
||||
"""Dashboard owner"""
|
||||
username: str
|
||||
@ -309,5 +327,3 @@ class DatasetProfile(BaseModel):
|
||||
row_count: int = None
|
||||
col_count: int = None
|
||||
col_profiles: List[DatasetColumnProfile] = None
|
||||
|
||||
|
||||
|
||||
@ -57,12 +57,12 @@ import json
|
||||
from jose import jwt
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DatabaseServiceEntities = List[DatabaseService]
|
||||
DatabaseEntities = List[Database]
|
||||
Tags = List[Tag]
|
||||
TableProfiles = List[TableProfile]
|
||||
Pipelines = List[Pipeline]
|
||||
|
||||
Tasks = List[Task]
|
||||
|
||||
|
||||
class TableEntities(BaseModel):
|
||||
@ -88,6 +88,7 @@ class PipelineEntities(BaseModel):
|
||||
total: int
|
||||
after: str = None
|
||||
|
||||
|
||||
class MetadataServerConfig(ConfigModel):
|
||||
api_endpoint: str
|
||||
api_version: str = 'v1'
|
||||
@ -263,7 +264,7 @@ class OpenMetadataAPIClient(object):
|
||||
""" Delete Database using ID """
|
||||
self.client.delete('/databases/{}'.format(database_id))
|
||||
|
||||
def list_tables(self, fields: str = None, after: str = None, limit: int = 1000000) -> TableEntities:
|
||||
def list_tables(self, fields: str = None, after: str = None, limit: int = 1000) -> TableEntities:
|
||||
""" List all tables"""
|
||||
|
||||
if fields is None:
|
||||
@ -353,7 +354,7 @@ class OpenMetadataAPIClient(object):
|
||||
resp = self.client.put('/topics', data=create_topic_request.json())
|
||||
return Topic(**resp)
|
||||
|
||||
def list_topics(self, fields: str = None, after: str = None, limit: int = 1000000) -> TopicEntities:
|
||||
def list_topics(self, fields: str = None, after: str = None, limit: int = 1000) -> TopicEntities:
|
||||
""" List all topics"""
|
||||
if fields is None:
|
||||
resp = self.client.get('/tables')
|
||||
@ -409,7 +410,7 @@ class OpenMetadataAPIClient(object):
|
||||
resp = self.client.put('/dashboards', data=create_dashboard_request.json())
|
||||
return Dashboard(**resp)
|
||||
|
||||
def list_dashboards(self, fields: str = None, after: str = None, limit: int = 1000000) -> DashboardEntities:
|
||||
def list_dashboards(self, fields: str = None, after: str = None, limit: int = 1000) -> DashboardEntities:
|
||||
""" List all dashboards"""
|
||||
|
||||
if fields is None:
|
||||
@ -442,40 +443,58 @@ class OpenMetadataAPIClient(object):
|
||||
return PipelineService(**resp)
|
||||
|
||||
def create_pipeline_service(self,
|
||||
pipeline_service: CreatePipelineServiceEntityRequest) -> PipelineService:
|
||||
pipeline_service: CreatePipelineServiceEntityRequest) -> PipelineService:
|
||||
"""Create a new Pipeline Service"""
|
||||
try:
|
||||
resp = self.client.post('/services/pipelineServices', data=pipeline_service.json())
|
||||
return PipelineService(**resp)
|
||||
except APIError as err:
|
||||
return None
|
||||
return None
|
||||
|
||||
def create_or_update_task(self, create_task_request: CreateTaskEntityRequest) -> Task:
    """Create or update a task.

    The request is serialised to JSON and sent with PUT to ``/tasks``; the
    response body is returned wrapped in a ``Task``.
    """
    payload = create_task_request.json()
    response = self.client.put('/tasks', data=payload)
    return Task(**response)
|
||||
|
||||
def get_task_by_id(self, chart_id: str, fields: [] = ['tags, service']) -> Task:
|
||||
def get_task_by_id(self, task_id: str, fields: [] = ['tags, service']) -> Task:
|
||||
"""Get Task By ID"""
|
||||
params = {'fields': ",".join(fields)}
|
||||
resp = self.client.get('/tasks/{}'.format(chart_id), data=params)
|
||||
resp = self.client.get('/tasks/{}'.format(task_id), data=params)
|
||||
return Task(**resp)
|
||||
|
||||
def list_tasks(self, fields: str = None, offset: int = 0, limit: int = 1000) -> Tasks:
    """List tasks from the ``/tasks`` endpoint.

    Args:
        fields: comma-separated field names to request; omitted from the
            query string when ``None``.
        offset: value sent as the ``offset`` query parameter.
        limit: value sent as the ``limit`` query parameter.

    Returns:
        The raw response when the client is configured with
        ``_use_raw_data``; otherwise a list of ``Task`` objects built from
        the response's ``data`` entries.
    """
    if fields is None:
        endpoint = '/tasks?offset={}&limit={}'.format(offset, limit)
    else:
        endpoint = '/tasks?fields={}&offset={}&limit={}'.format(fields, offset, limit)
    response = self.client.get(endpoint)
    if self._use_raw_data:
        return response
    return [Task(**entry) for entry in response['data']]
|
||||
|
||||
def create_or_update_pipeline(self, create_pipeline_request: CreatePipelineEntityRequest) -> Pipeline:
    """Create or update a pipeline.

    The request is serialised to JSON and sent with PUT to ``/pipelines``;
    the response body is returned wrapped in a ``Pipeline``.
    """
    payload = create_pipeline_request.json()
    response = self.client.put('/pipelines', data=payload)
    return Pipeline(**response)
|
||||
|
||||
def list_pipelines(self, fields: str = None, offset: int = 0, limit: int = 1000000) -> Pipelines:
|
||||
def list_pipelines(self, fields: str = None, after: str = None, limit: int = 1000) -> PipelineEntities:
|
||||
""" List all pipelines"""
|
||||
if fields is None:
|
||||
resp = self.client.get('/pipelines')
|
||||
else:
|
||||
resp = self.client.get('/pipelines?fields={}&offset={}&limit={}'.format(fields, offset, limit))
|
||||
if after is not None:
|
||||
resp = self.client.get('/pipelines?fields={}&after={}&limit={}'.format(fields, after, limit))
|
||||
else:
|
||||
resp = self.client.get('/pipelines?fields={}&limit={}'.format(fields, limit))
|
||||
|
||||
if self._use_raw_data:
|
||||
return resp
|
||||
else:
|
||||
return [Pipeline(**t) for t in resp['data']]
|
||||
pipelines = [Pipeline(**t) for t in resp['data']]
|
||||
total = resp['paging']['total']
|
||||
after = resp['paging']['after'] if 'after' in resp['paging'] else None
|
||||
return PipelineEntities(pipelines=pipelines, total=total, after=after)
|
||||
|
||||
def close(self):
|
||||
self.client.close()
|
||||
|
||||
@ -20,18 +20,21 @@ from typing import Optional, List
|
||||
from elasticsearch import Elasticsearch
|
||||
|
||||
from metadata.generated.schema.entity.data.dashboard import Dashboard
|
||||
from metadata.generated.schema.entity.data.pipeline import Pipeline
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.data.task import Task
|
||||
from metadata.generated.schema.entity.data.topic import Topic
|
||||
from metadata.generated.schema.entity.data.chart import Chart
|
||||
from metadata.generated.schema.type import entityReference
|
||||
from metadata.ingestion.api.sink import Sink, SinkStatus
|
||||
from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig
|
||||
from metadata.ingestion.sink.elasticsearch_constants import TABLE_ELASTICSEARCH_INDEX_MAPPING, \
|
||||
TOPIC_ELASTICSEARCH_INDEX_MAPPING, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING
|
||||
TOPIC_ELASTICSEARCH_INDEX_MAPPING, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING, PIPELINE_ELASTICSEARCH_INDEX_MAPPING
|
||||
|
||||
from metadata.config.common import ConfigModel
|
||||
from metadata.ingestion.api.common import WorkflowContext, Record
|
||||
from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument, DashboardESDocument
|
||||
from metadata.ingestion.models.table_metadata import TableESDocument, TopicESDocument, DashboardESDocument, \
|
||||
PipelineESDocument
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -40,11 +43,13 @@ class ElasticSearchConfig(ConfigModel):
|
||||
es_host: str
|
||||
es_port: int = 9200
|
||||
index_tables: Optional[bool] = True
|
||||
index_topics: Optional[bool] = False
|
||||
index_dashboards: Optional[bool] = False
|
||||
index_topics: Optional[bool] = True
|
||||
index_dashboards: Optional[bool] = True
|
||||
index_pipelines: Optional[bool] = True
|
||||
table_index_name: str = "table_search_index"
|
||||
topic_index_name: str = "topic_search_index"
|
||||
dashboard_index_name: str = "dashboard_search_index"
|
||||
pipeline_index_name: str = "pipeline_search_index"
|
||||
|
||||
|
||||
class ElasticsearchSink(Sink):
|
||||
@ -78,6 +83,8 @@ class ElasticsearchSink(Sink):
|
||||
self._check_or_create_index(self.config.topic_index_name, TOPIC_ELASTICSEARCH_INDEX_MAPPING)
|
||||
if self.config.index_dashboards:
|
||||
self._check_or_create_index(self.config.dashboard_index_name, DASHBOARD_ELASTICSEARCH_INDEX_MAPPING)
|
||||
if self.config.index_pipelines:
|
||||
self._check_or_create_index(self.config.pipeline_index_name, PIPELINE_ELASTICSEARCH_INDEX_MAPPING)
|
||||
|
||||
def _check_or_create_index(self, index_name: str, es_mapping: str):
|
||||
"""
|
||||
@ -89,11 +96,11 @@ class ElasticsearchSink(Sink):
|
||||
if not mapping[index_name]['mappings']:
|
||||
logger.debug(f'There are no mappings for index {index_name}. Updating the mapping')
|
||||
es_mapping_dict = json.loads(es_mapping)
|
||||
es_mapping_update_dict = {'properties': es_mapping_dict['mappings']['properties']}
|
||||
es_mapping_update_dict = {'properties': es_mapping_dict['mappings']['properties']}
|
||||
self.elasticsearch_client.indices.put_mapping(index=index_name, body=json.dumps(es_mapping_update_dict))
|
||||
else:
|
||||
logger.warning("Received index not found error from Elasticsearch. "
|
||||
+ "The index doesn't exist for a newly created ES. It's OK on first run.")
|
||||
+ "The index doesn't exist for a newly created ES. It's OK on first run.")
|
||||
# create new index with mapping
|
||||
self.elasticsearch_client.indices.create(index=index_name, body=es_mapping)
|
||||
|
||||
@ -110,7 +117,12 @@ class ElasticsearchSink(Sink):
|
||||
dashboard_doc = self._create_dashboard_es_doc(record)
|
||||
self.elasticsearch_client.index(index=self.config.dashboard_index_name, id=str(dashboard_doc.dashboard_id),
|
||||
body=dashboard_doc.json())
|
||||
if (hasattr(record.name,'__root__')):
|
||||
if isinstance(record, Pipeline):
|
||||
pipeline_doc = self._create_pipeline_es_doc(record)
|
||||
self.elasticsearch_client.index(index=self.config.pipeline_index_name, id=str(pipeline_doc.pipeline_id),
|
||||
body=pipeline_doc.json())
|
||||
|
||||
if (hasattr(record.name, '__root__')):
|
||||
self.status.records_written(record.name.__root__)
|
||||
else:
|
||||
self.status.records_written(record.name)
|
||||
@ -257,6 +269,52 @@ class ElasticsearchSink(Sink):
|
||||
|
||||
return dashboard_doc
|
||||
|
||||
def _create_pipeline_es_doc(self, pipeline: Pipeline):
    """Build the Elasticsearch document for a pipeline.

    The pipeline's service is looked up through the REST client and every
    referenced task is fetched so that task names, descriptions and tags can
    be folded into the document.

    Args:
        pipeline: the pipeline entity to index.

    Returns:
        A ``PipelineESDocument`` for the pipeline.
    """
    fqdn = pipeline.fullyQualifiedName
    suggest = [{'input': [pipeline.displayName], 'weight': 10}]
    # The document field is declared as an int, so drop the fractional part
    # explicitly rather than relying on model coercion.
    timestamp = int(time.time())
    service_entity = self.rest.get_pipeline_service_by_id(str(pipeline.service.id.__root__))
    pipeline_owner = str(pipeline.owner.id.__root__) if pipeline.owner is not None else ""

    pipeline_followers = []
    if pipeline.followers:
        pipeline_followers = [str(follower.id.__root__) for follower in pipeline.followers.__root__]

    tags = set()
    tier = None
    # Tags may be absent; treat None as "no tags" instead of raising.
    for pipeline_tag in pipeline.tags or []:
        if "Tier" in pipeline_tag.tagFQN:
            # Tier labels are stored in their own field, not among the tags.
            tier = pipeline_tag.tagFQN
        else:
            tags.add(pipeline_tag.tagFQN)

    task_names = []
    task_descriptions = []
    for task in self._get_tasks(pipeline.tasks):
        task_names.append(task.displayName)
        if task.description is not None:
            task_descriptions.append(task.description)
        # Task tags are merged into the pipeline's tag set; None means none.
        for task_tag in task.tags or []:
            tags.add(task_tag.tagFQN)

    pipeline_doc = PipelineESDocument(pipeline_id=str(pipeline.id.__root__),
                                      service=service_entity.name,
                                      service_type=service_entity.serviceType.name,
                                      pipeline_name=pipeline.displayName,
                                      task_names=task_names,
                                      task_descriptions=task_descriptions,
                                      suggest=suggest,
                                      description=pipeline.description,
                                      last_updated_timestamp=timestamp,
                                      tier=tier,
                                      tags=list(tags),
                                      fqdn=fqdn,
                                      owner=pipeline_owner,
                                      followers=pipeline_followers
                                      )

    return pipeline_doc
|
||||
|
||||
def _get_charts(self, chart_refs: Optional[List[entityReference.EntityReference]]):
|
||||
charts = []
|
||||
if chart_refs is not None:
|
||||
@ -265,6 +323,14 @@ class ElasticsearchSink(Sink):
|
||||
charts.append(chart)
|
||||
return charts
|
||||
|
||||
def _get_tasks(self, task_refs: Optional[List[entityReference.EntityReference]]):
    """Fetch the task entity for every reference in ``task_refs``.

    Each reference's id is passed to the REST client's ``get_task_by_id``.
    Returns an empty list when ``task_refs`` is ``None``.
    """
    if task_refs is None:
        return []
    return [self.rest.get_task_by_id(str(ref.id.__root__)) for ref in task_refs]
|
||||
|
||||
def get_status(self):
|
||||
return self.status
|
||||
|
||||
|
||||
@ -220,4 +220,55 @@ DASHBOARD_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
|
||||
}
|
||||
}
|
||||
"""
|
||||
)
|
||||
)
|
||||
|
||||
PIPELINE_ELASTICSEARCH_INDEX_MAPPING = textwrap.dedent(
|
||||
"""
|
||||
{
|
||||
"mappings":{
|
||||
"properties": {
|
||||
"pipeline_name": {
|
||||
"type":"text"
|
||||
},
|
||||
"display_name": {
|
||||
"type": "text"
|
||||
},
|
||||
"owner": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"followers": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"last_updated_timestamp": {
|
||||
"type": "date",
|
||||
"format": "epoch_second"
|
||||
},
|
||||
"description": {
|
||||
"type": "text"
|
||||
},
|
||||
"task_names": {
|
||||
"type":"text"
|
||||
},
|
||||
"task_descriptions": {
|
||||
"type": "text"
|
||||
},
|
||||
"tier": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"tags": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"service": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"service_type": {
|
||||
"type": "keyword"
|
||||
},
|
||||
"suggest": {
|
||||
"type": "completion"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
)
|
||||
|
||||
@ -22,9 +22,13 @@ from metadata.config.common import ConfigModel
|
||||
from metadata.generated.schema.api.data.createChart import CreateChartEntityRequest
|
||||
from metadata.generated.schema.api.data.createDashboard import CreateDashboardEntityRequest
|
||||
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest
|
||||
from metadata.generated.schema.api.data.createPipeline import CreatePipelineEntityRequest
|
||||
from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest
|
||||
from metadata.generated.schema.api.data.createTask import CreateTaskEntityRequest
|
||||
from metadata.generated.schema.api.data.createTopic import CreateTopic
|
||||
from metadata.generated.schema.entity.data.chart import ChartType
|
||||
from metadata.generated.schema.entity.data.pipeline import Pipeline
|
||||
from metadata.generated.schema.entity.data.task import Task
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.common import WorkflowContext, Record
|
||||
from metadata.ingestion.api.sink import Sink, SinkStatus
|
||||
@ -82,6 +86,10 @@ class MetadataRestSink(Sink):
|
||||
self.write_charts(record)
|
||||
elif isinstance(record, Dashboard):
|
||||
self.write_dashboards(record)
|
||||
elif isinstance(record, Task):
|
||||
self.write_tasks(record)
|
||||
elif isinstance(record, Pipeline):
|
||||
self.write_pipelines(record)
|
||||
else:
|
||||
logging.info("Ignoring the record due to unknown Record type {}".format(type(record)))
|
||||
|
||||
@ -185,6 +193,42 @@ class MetadataRestSink(Sink):
|
||||
chart_references.append(self.charts_dict[chart_id])
|
||||
return chart_references
|
||||
|
||||
def write_tasks(self, task: Task):
    """Send a task to the metadata server and record the outcome.

    A create request is built from the task's fields and submitted through
    the client. On success the created task's display name is logged and
    recorded as written; ``APIError`` and ``ValidationError`` are logged and
    recorded as a failure instead of propagating.
    """
    try:
        created_task = self.client.create_or_update_task(
            CreateTaskEntityRequest(
                name=task.name,
                displayName=task.displayName,
                description=task.description,
                taskUrl=task.taskUrl,
                downstreamTasks=task.downstreamTasks,
                service=task.service
            )
        )
        ingested_name = created_task.displayName
        logger.info('Successfully ingested Task {}'.format(ingested_name))
        self.status.records_written('{}'.format(ingested_name))
    except (APIError, ValidationError) as err:
        logger.error("Failed to ingest task {}".format(task.name))
        logger.error(err)
        self.status.failure(task.name)
|
||||
|
||||
def write_pipelines(self, pipeline: Pipeline):
    """Send a pipeline to the metadata server and record the outcome.

    A create request is built from the pipeline's fields and submitted
    through the client. On success the created pipeline's display name is
    logged and recorded as written; ``APIError`` and ``ValidationError`` are
    logged and recorded as a failure instead of propagating.
    """
    try:
        pipeline_request = CreatePipelineEntityRequest(
            name=pipeline.name,
            displayName=pipeline.displayName,
            description=pipeline.description,
            pipelineUrl=pipeline.pipelineUrl,
            tasks=pipeline.tasks,
            service=pipeline.service
        )
        created_pipeline = self.client.create_or_update_pipeline(pipeline_request)
        # The messages name the entity as a pipeline so log output is not
        # confused with task ingestion.
        logger.info('Successfully ingested Pipeline {}'.format(created_pipeline.displayName))
        self.status.records_written('{}'.format(created_pipeline.displayName))
    except (APIError, ValidationError) as err:
        logger.error("Failed to ingest pipeline {}".format(pipeline.name))
        logger.error(err)
        self.status.failure(pipeline.name)
|
||||
|
||||
def get_status(self):
|
||||
return self.status
|
||||
|
||||
|
||||
@ -35,6 +35,7 @@ class MetadataTablesRestSourceConfig(ConfigModel):
|
||||
include_tables: Optional[bool] = True
|
||||
include_topics: Optional[bool] = True
|
||||
include_dashboards: Optional[bool] = True
|
||||
include_pipelines: Optional[bool] = True
|
||||
limit_records: int = 1000
|
||||
|
||||
|
||||
@ -89,6 +90,7 @@ class MetadataSource(Source):
|
||||
yield from self.fetch_table()
|
||||
yield from self.fetch_topic()
|
||||
yield from self.fetch_dashboard()
|
||||
yield from self.fetch_pipeline()
|
||||
|
||||
def fetch_table(self) -> Table:
|
||||
if self.config.include_tables:
|
||||
@ -132,6 +134,20 @@ class MetadataSource(Source):
|
||||
break
|
||||
after = dashboard_entities.after
|
||||
|
||||
def fetch_pipeline(self) -> Dashboard:
    """Yield every pipeline from the metadata server, one page at a time.

    Nothing is yielded when ``include_pipelines`` is disabled in the config.
    Pages are requested with the configured ``limit_records`` and followed
    through the ``after`` cursor until a page reports no further cursor.
    """
    # NOTE(review): the return annotation and the scanned_dashboard call
    # look carried over from the dashboard fetcher - confirm whether
    # pipeline-specific names exist.
    if not self.config.include_pipelines:
        return
    cursor = None
    while True:
        page = self.client.list_pipelines(
            fields="owner,service,tags,followers,tasks",
            after=cursor,
            limit=self.config.limit_records)
        for entity in page.pipelines:
            self.status.scanned_dashboard(entity.name)
            yield entity
        cursor = page.after
        if cursor is None:
            break
|
||||
|
||||
def get_status(self) -> SourceStatus:
|
||||
return self.status
|
||||
|
||||
|
||||
@ -29,10 +29,14 @@ from metadata.config.common import ConfigModel
|
||||
from metadata.generated.schema.api.data.createTopic import CreateTopic
|
||||
from metadata.generated.schema.api.services.createDashboardService import CreateDashboardServiceEntityRequest
|
||||
from metadata.generated.schema.api.services.createMessagingService import CreateMessagingServiceEntityRequest
|
||||
from metadata.generated.schema.api.services.createPipelineService import CreatePipelineServiceEntityRequest
|
||||
from metadata.generated.schema.entity.data.pipeline import Pipeline
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.data.database import Database
|
||||
from metadata.generated.schema.entity.data.task import Task
|
||||
from metadata.generated.schema.entity.services.dashboardService import DashboardService
|
||||
from metadata.generated.schema.entity.services.messagingService import MessagingService
|
||||
from metadata.generated.schema.entity.services.pipelineService import PipelineService
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.common import Record
|
||||
from metadata.ingestion.api.source import SourceStatus, Source
|
||||
@ -64,6 +68,7 @@ def get_database_service_or_create(service_json, metadata_config) -> DatabaseSer
|
||||
created_service = client.create_database_service(CreateDatabaseServiceEntityRequest(**service_json))
|
||||
return created_service
|
||||
|
||||
|
||||
def get_messaging_service_or_create(service_json, metadata_config) -> MessagingService:
|
||||
client = OpenMetadataAPIClient(metadata_config)
|
||||
service = client.get_messaging_service(service_json['name'])
|
||||
@ -73,6 +78,7 @@ def get_messaging_service_or_create(service_json, metadata_config) -> MessagingS
|
||||
created_service = client.create_messaging_service(CreateMessagingServiceEntityRequest(**service_json))
|
||||
return created_service
|
||||
|
||||
|
||||
def get_dashboard_service_or_create(service_json, metadata_config) -> DashboardService:
|
||||
client = OpenMetadataAPIClient(metadata_config)
|
||||
service = client.get_dashboard_service(service_json['name'])
|
||||
@ -83,6 +89,16 @@ def get_dashboard_service_or_create(service_json, metadata_config) -> DashboardS
|
||||
return created_service
|
||||
|
||||
|
||||
def get_pipeline_service_or_create(service_json, metadata_config) -> PipelineService:
    """Return the pipeline service named in ``service_json``, creating it if needed.

    A client is built from ``metadata_config`` and asked for the service by
    ``service_json['name']``. When that lookup returns ``None`` a create
    request is built from ``service_json`` and its result is returned.
    """
    client = OpenMetadataAPIClient(metadata_config)
    existing_service = client.get_pipeline_service(service_json['name'])
    if existing_service is not None:
        return existing_service
    create_request = CreatePipelineServiceEntityRequest(**service_json)
    return client.create_pipeline_service(create_request)
|
||||
|
||||
|
||||
def get_table_key(row: Dict[str, Any]) -> Union[TableKey, None]:
|
||||
"""
|
||||
Table key consists of schema and table name
|
||||
@ -281,6 +297,10 @@ class SampleDataSource(Source):
|
||||
self.charts = json.load(open(self.config.sample_data_folder + "/dashboards/charts.json", 'r'))
|
||||
self.dashboards = json.load(open(self.config.sample_data_folder + "/dashboards/dashboards.json", 'r'))
|
||||
self.dashboard_service = get_dashboard_service_or_create(self.dashboard_service_json, metadata_config)
|
||||
self.pipeline_service_json = json.load(open(self.config.sample_data_folder + "/pipelines/service.json", 'r'))
|
||||
self.tasks = json.load(open(self.config.sample_data_folder + "/pipelines/tasks.json", 'r'))
|
||||
self.pipelines = json.load(open(self.config.sample_data_folder + "/pipelines/pipelines.json", 'r'))
|
||||
self.pipeline_service = get_pipeline_service_or_create(self.pipeline_service_json, metadata_config)
|
||||
|
||||
@classmethod
|
||||
def create(cls, config_dict, metadata_config_dict, ctx):
|
||||
@ -296,6 +316,8 @@ class SampleDataSource(Source):
|
||||
yield from self.ingest_topics()
|
||||
yield from self.ingest_charts()
|
||||
yield from self.ingest_dashboards()
|
||||
yield from self.ingest_tasks()
|
||||
yield from self.ingest_pipelines()
|
||||
|
||||
def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
|
||||
db = Database(id=uuid.uuid4(),
|
||||
@ -342,6 +364,38 @@ class SampleDataSource(Source):
|
||||
self.status.scanned("dashboard", dashboard_ev.name)
|
||||
yield dashboard_ev
|
||||
|
||||
def ingest_tasks(self) -> Iterable[Task]:
    """Yield a ``Task`` for every entry in the sample ``tasks`` data.

    Each task gets a freshly generated id and a reference to the sample
    pipeline service.
    """
    for task_json in self.tasks['tasks']:
        service_ref = EntityReference(id=self.pipeline_service.id, type='pipelineService')
        yield Task(id=uuid.uuid4(),
                   name=task_json['name'],
                   displayName=task_json['displayName'],
                   description=task_json['description'],
                   taskUrl=task_json['taskUrl'],
                   taskType=task_json['taskType'],
                   downstreamTasks=task_json['downstreamTasks'],
                   service=service_ref)
|
||||
|
||||
def ingest_pipelines(self) -> Iterable[Pipeline]:
    """Yield a ``Pipeline`` for every entry in the sample ``pipelines`` data.

    Tasks are listed from the server first so that the task names in the
    sample data can be turned into entity references. Task names that the
    server did not return are left out of the pipeline's task list.
    """
    # Index the tasks by name for lookup.
    task_dict = {task.name: task for task in self.client.list_tasks("service")}

    for pipeline in self.pipelines['pipelines']:
        task_refs = [EntityReference(id=task_dict[task_name].id, type='task')
                     for task_name in pipeline['tasks']
                     if task_name in task_dict]
        pipeline_ev = Pipeline(id=uuid.uuid4(),
                               name=pipeline['name'],
                               displayName=pipeline['displayName'],
                               description=pipeline['description'],
                               pipelineUrl=pipeline['pipelineUrl'],
                               tasks=task_refs,
                               service=EntityReference(id=self.pipeline_service.id, type='pipelineService'))
        yield pipeline_ev
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user