Merge remote-tracking branch 'origin/1.5.7' into 1.5.7

sushi30 2024-10-14 11:39:48 +02:00
commit bf9a098aa6
12 changed files with 723 additions and 59 deletions

View File

@@ -22,6 +22,61 @@ REQUIRED_MANIFEST_KEYS = ["name", "schema", "resource_type"]
# Based on https://schemas.getdbt.com/dbt/catalog/v1.json
REQUIRED_CATALOG_KEYS = ["name", "type", "index"]
REQUIRED_RESULTS_KEYS = {
"status",
"timing",
"thread_id",
"execution_time",
"message",
"adapter_response",
"unique_id",
}
REQUIRED_NODE_KEYS = {
"schema_",
"schema",
"freshness",
"name",
"resource_type",
"path",
"unique_id",
"source_name",
"source_description",
"source_meta",
"loader",
"identifier",
"relation_name",
"fqn",
"alias",
"checksum",
"config",
"column_name",
"test_metadata",
"original_file_path",
"root_path",
"database",
"tags",
"description",
"columns",
"meta",
"owner",
"created_at",
"group",
"sources",
"compiled",
"docs",
"version",
"latest_version",
"package_name",
"depends_on",
"compiled_code",
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}
NONE_KEYWORDS_LIST = ["none", "null"]
DBT_CATALOG_FILE_NAME = "catalog.json"
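The new REQUIRED_NODE_KEYS and REQUIRED_RESULTS_KEYS sets centralize the allow-lists used to strip bulky dbt manifest and run-results payloads before parsing. A minimal, standalone sketch of the pruning pattern (the key set here is an abbreviated subset of the constant above, and the sample node is purely illustrative):

```python
# Abbreviated subset of REQUIRED_NODE_KEYS, just for this sketch.
required_node_keys = {"schema_", "schema", "name", "resource_type", "unique_id", "depends_on"}

# Illustrative manifest node; real dbt manifest nodes carry many more fields.
node = {
    "name": "customers",
    "resource_type": "model",
    "unique_id": "model.jaffle_shop.customers",
    "schema": "analytics",
    "build_path": "target/run/jaffle_shop/customers.sql",  # not in the allow-list
    "created_at": 1727716079.0,                            # kept by the full constant, dropped by this subset
}

# Same pattern used in the dbt service source: drop every key whose lowercase name is not required.
keys_to_delete = [key for key in node if key.lower() not in required_node_keys]
for key in keys_to_delete:
    del node[key]

assert set(node) == {"name", "resource_type", "unique_id", "schema"}
```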

View File

@@ -13,7 +13,7 @@ DBT service Topology.
"""
from abc import ABC, abstractmethod
from typing import Iterable
from typing import Iterable, List
from dbt_artifacts_parser.parser import parse_catalog, parse_manifest, parse_run_results
from pydantic import Field
@@ -37,6 +37,10 @@ from metadata.ingestion.models.topology import (
TopologyNode,
)
from metadata.ingestion.source.database.database_service import DataModelLink
from metadata.ingestion.source.database.dbt.constants import (
REQUIRED_NODE_KEYS,
REQUIRED_RESULTS_KEYS,
)
from metadata.ingestion.source.database.dbt.dbt_config import get_dbt_details
from metadata.ingestion.source.database.dbt.models import (
DbtFiles,
@@ -169,51 +173,27 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
}
)
required_nodes_keys = {
"schema_",
"schema",
"name",
"resource_type",
"path",
"unique_id",
"fqn",
"alias",
"checksum",
"config",
"column_name",
"test_metadata",
"original_file_path",
"root_path",
"database",
"tags",
"description",
"columns",
"meta",
"owner",
"created_at",
"group",
"sources",
"compiled",
"docs",
"version",
"latest_version",
"package_name",
"depends_on",
"compiled_code",
"compiled_sql",
"raw_code",
"raw_sql",
"language",
}
for field in ["nodes", "sources"]:
for node, value in manifest_dict.get( # pylint: disable=unused-variable
field
).items():
keys_to_delete = [
key for key in value if key.lower() not in REQUIRED_NODE_KEYS
]
for key in keys_to_delete:
del value[key]
for node, value in manifest_dict.get( # pylint: disable=unused-variable
"nodes"
).items():
keys_to_delete = [
key for key in value if key.lower() not in required_nodes_keys
]
for key in keys_to_delete:
del value[key]
def remove_run_result_non_required_keys(self, run_results: List[dict]):
"""
Method to remove the non-required keys from the run results file
"""
for run_result in run_results:
for result in run_result.get("results"):
keys_to_delete = [
key for key in result if key.lower() not in REQUIRED_RESULTS_KEYS
]
for key in keys_to_delete:
del result[key]
def get_dbt_files(self) -> Iterable[DbtFiles]:
dbt_files = get_dbt_details(self.source_config.dbtConfigSource)
@@ -225,6 +205,10 @@ class DbtServiceSource(TopologyRunnerMixin, Source, ABC):
self.remove_manifest_non_required_keys(
manifest_dict=self.context.get().dbt_file.dbt_manifest
)
if self.context.get().dbt_file.dbt_run_results:
self.remove_run_result_non_required_keys(
run_results=self.context.get().dbt_file.dbt_run_results
)
dbt_objects = DbtObjects(
dbt_catalog=parse_catalog(self.context.get().dbt_file.dbt_catalog)
if self.context.get().dbt_file.dbt_catalog
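remove_run_result_non_required_keys applies the same allow-list pattern to run_results.json, walking each run's nested results entries before they are handed to parse_run_results. A hedged, standalone sketch on a toy payload (field values are illustrative only):

```python
REQUIRED_RESULTS_KEYS = {
    "status", "timing", "thread_id", "execution_time",
    "message", "adapter_response", "unique_id",
}

# Toy run_results payload; a real run_results.json nests these entries under its "results" array.
run_results = [
    {
        "results": [
            {
                "status": "success",
                "unique_id": "model.jaffle_shop.customers",
                "execution_time": 1.42,
                "adapter_response": {"rows_affected": 100},
                "compiled_code": "select * from raw.customers",  # not required -> removed
            }
        ]
    }
]

for run_result in run_results:
    for result in run_result.get("results"):
        keys_to_delete = [key for key in result if key.lower() not in REQUIRED_RESULTS_KEYS]
        for key in keys_to_delete:
            del result[key]

assert "compiled_code" not in run_results[0]["results"][0]
```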

View File

@@ -25,6 +25,7 @@ from metadata.generated.schema.entity.data.pipeline import (
Task,
TaskStatus,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.pipeline.gluePipelineConnection import (
GluePipelineConnection,
)
@@ -40,14 +41,25 @@ from metadata.generated.schema.type.basic import (
SourceUrl,
Timestamp,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.gluepipeline.models import (
AmazonRedshift,
CatalogSource,
JDBCSource,
JobNodeResponse,
S3Source,
S3Target,
)
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger
from metadata.utils.time_utils import convert_timestamp_to_milliseconds
from metadata.utils.time_utils import datetime_to_timestamp
logger = ingestion_logger()
@@ -63,6 +75,28 @@ STATUS_MAP = {
"incomplete": StatusType.Failed,
"pending": StatusType.Pending,
}
TABLE_MODEL_MAP = {
"AmazonRedshiftSource": AmazonRedshift,
"AmazonRedshiftTarget": AmazonRedshift,
"AthenaConnectorSource": JDBCSource,
"JDBCConnectorSource": JDBCSource,
"JDBCConnectorTarget": JDBCSource,
"DirectJDBCSource": CatalogSource,
"RedshiftSource": CatalogSource,
"RedshiftTarget": CatalogSource,
"DirectJDBC": CatalogSource,
}
STORAGE_MODEL_MAP = {
"S3CsvSource": S3Source,
"S3JsonSource": S3Source,
"S3ParquetSource": S3Source,
"S3HudiSource": S3Source,
"S3DeltaSource": S3Source,
"S3DirectTarget": S3Target,
"S3DeltaDirectTarget": S3Target,
"S3GlueParquetTarget": S3Target,
"S3HudiDirectTarget": S3Target,
}
class GluepipelineSource(PipelineServiceSource):
@@ -145,9 +179,88 @@ class GluepipelineSource(PipelineServiceSource):
downstream_tasks.append(self.task_id_mapping[edges["DestinationId"]])
return downstream_tasks
def get_lineage_details(self, job) -> Optional[dict]:
"""
Get the Lineage Details of the pipeline
"""
lineage_details = {"sources": [], "targets": []}
try:
job_details = JobNodeResponse.model_validate(
self.glue.get_job(JobName=job)
).Job
if job_details and job_details.config_nodes:
nodes = job_details.config_nodes
for _, node in nodes.items():
for key, entity in node.items():
table_model, storage_model = None, None
if key in TABLE_MODEL_MAP:
table_model = TABLE_MODEL_MAP[key].model_validate(entity)
elif "Catalog" in key:
table_model = CatalogSource.model_validate(entity)
elif key in STORAGE_MODEL_MAP:
storage_model = STORAGE_MODEL_MAP[key].model_validate(
entity
)
if table_model:
for db_service_name in self.get_db_service_names():
table_entity = self.metadata.get_entity_reference(
entity=Table,
fqn=fqn.build(
metadata=self.metadata,
entity_type=Table,
table_name=table_model.table_name,
database_name=table_model.database_name,
schema_name=table_model.schema_name,
service_name=db_service_name,
),
)
if table_entity:
if key.endswith("Source"):
lineage_details["sources"].append(table_entity)
else:
lineage_details["targets"].append(table_entity)
break
if storage_model:
for path in storage_model.Paths or [storage_model.Path]:
container = self.metadata.es_search_container_by_path(
full_path=path
)
if container and container[0]:
storage_entity = EntityReference(
id=container[0].id,
type="container",
name=container[0].name.root,
fullyQualifiedName=container[
0
].fullyQualifiedName.root,
)
if storage_entity:
if key.endswith("Source"):
lineage_details["sources"].append(
storage_entity
)
else:
lineage_details["targets"].append(
storage_entity
)
break
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to get lineage details for job : {job} due to : {exc}"
)
return lineage_details
def yield_pipeline_status(
self, pipeline_details: Any
) -> Iterable[Either[OMetaPipelineStatus]]:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
for job in self.job_name_list:
try:
runs = self.glue.get_job_runs(JobName=job)
@@ -161,13 +274,13 @@ class GluepipelineSource(PipelineServiceSource):
attempt["JobRunState"].lower(), StatusType.Pending
).value,
startTime=Timestamp(
convert_timestamp_to_milliseconds(
attempt["StartedOn"].timestamp()
datetime_to_timestamp(
attempt["StartedOn"], milliseconds=True
)
),
endTime=Timestamp(
convert_timestamp_to_milliseconds(
attempt["CompletedOn"].timestamp()
datetime_to_timestamp(
attempt["CompletedOn"], milliseconds=True
)
),
)
@@ -175,20 +288,14 @@ class GluepipelineSource(PipelineServiceSource):
pipeline_status = PipelineStatus(
taskStatus=task_status,
timestamp=Timestamp(
convert_timestamp_to_milliseconds(
attempt["StartedOn"].timestamp()
datetime_to_timestamp(
attempt["StartedOn"], milliseconds=True
)
),
executionStatus=STATUS_MAP.get(
attempt["JobRunState"].lower(), StatusType.Pending
).value,
)
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
yield Either(
right=OMetaPipelineStatus(
pipeline_fqn=pipeline_fqn,
@@ -199,7 +306,7 @@ class GluepipelineSource(PipelineServiceSource):
yield Either(
left=StackTraceError(
name=pipeline_fqn,
error=f"Failed to yield pipeline status: {exc}",
error=f"Failed to yield pipeline status for job {job}: {exc}",
stackTrace=traceback.format_exc(),
)
)
@@ -210,3 +317,42 @@
"""
Get lineage between pipeline and data sources
"""
try:
pipeline_fqn = fqn.build(
metadata=self.metadata,
entity_type=Pipeline,
service_name=self.context.get().pipeline_service,
pipeline_name=self.context.get().pipeline,
)
pipeline_entity = self.metadata.get_by_name(
entity=Pipeline, fqn=pipeline_fqn
)
lineage_details = LineageDetails(
pipeline=EntityReference(id=pipeline_entity.id.root, type="pipeline"),
source=LineageSource.PipelineLineage,
)
for job in self.job_name_list:
lineage_entities = self.get_lineage_details(job)
for source in lineage_entities.get("sources"):
for target in lineage_entities.get("targets"):
yield Either(
right=AddLineageRequest(
edge=EntitiesEdge(
fromEntity=source,
toEntity=target,
lineageDetails=lineage_details,
)
)
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=pipeline_details.get(NAME),
error=f"Wild error ingesting pipeline lineage {pipeline_details} - {exc}",
stackTrace=traceback.format_exc(),
)
)
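yield_pipeline_status above now hands the raw datetime from the Glue API straight to datetime_to_timestamp(..., milliseconds=True) instead of calling .timestamp() itself. A hedged sketch of what that conversion amounts to (the real helper in metadata.utils.time_utils may differ in detail, for example around timezone or None handling):

```python
from datetime import datetime, timezone


def datetime_to_timestamp_sketch(dt: datetime, milliseconds: bool = False) -> int:
    """Rough equivalent of the helper's contract: datetime -> epoch seconds or milliseconds."""
    factor = 1000 if milliseconds else 1
    return int(dt.timestamp() * factor)


started_on = datetime(2024, 9, 30, 17, 7, 59, tzinfo=timezone.utc)  # mirrors the mocked job run's StartedOn
print(datetime_to_timestamp_sketch(started_on, milliseconds=True))  # 1727716079000
```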

View File

@@ -0,0 +1,78 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Glue Pipeline Source Model module
"""
from typing import List, Optional
from pydantic import BaseModel, Field
class EntityDetails(BaseModel):
Value: str
class SourceDetails(BaseModel):
schema_details: EntityDetails = Field(alias="Schema")
table_details: EntityDetails = Field(alias="Table")
class AmazonRedshift(BaseModel):
Name: str
Data: SourceDetails
database_name: Optional[str] = None
@property
def table_name(self):
if self.Data:
return self.Data.table_details.Value
return None
@property
def schema_name(self):
if self.Data:
return self.Data.schema_details.Value
return None
class CatalogSource(BaseModel):
Name: str
database_name: str = Field(alias="Database")
schema_name: Optional[str] = None
table_name: str = Field(alias="Table")
class JDBCSource(BaseModel):
Name: str
schema_name: Optional[str] = Field(default=None, alias="SchemaName")
database_name: Optional[str] = None
table_name: str = Field(alias="ConnectionTable")
class S3Source(BaseModel):
Name: str
Paths: List[str]
class S3Target(BaseModel):
Name: str
Path: str
Paths: Optional[str] = None
class JobNodes(BaseModel):
config_nodes: Optional[dict] = Field(alias="CodeGenConfigurationNodes")
class JobNodeResponse(BaseModel):
Job: Optional[JobNodes] = None
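These models normalize the differently shaped node payloads found in Glue's CodeGenConfigurationNodes so the pipeline source above can always read table_name / schema_name / database_name, or Paths for storage nodes. A hedged usage sketch; the payload shapes are inferred from the field aliases above and trimmed down, not full Glue API output:

```python
from metadata.ingestion.source.pipeline.gluepipeline.models import (
    AmazonRedshift,
    JDBCSource,
    S3Source,
)

# Trimmed, illustrative node entities keyed the way the field aliases expect.
redshift_node = {
    "Name": "Amazon Redshift",
    "Data": {"Schema": {"Value": "public"}, "Table": {"Value": "customers"}},
}
jdbc_node = {"Name": "PostgreSQL table", "SchemaName": "sales", "ConnectionTable": "orders"}
s3_node = {"Name": "S3 bucket", "Paths": ["s3://my-bucket/raw/orders/"]}

redshift = AmazonRedshift.model_validate(redshift_node)
jdbc = JDBCSource.model_validate(jdbc_node)
s3 = S3Source.model_validate(s3_node)

print(redshift.schema_name, redshift.table_name)  # public customers
print(jdbc.schema_name, jdbc.table_name)          # sales orders
print(s3.Paths)                                   # ['s3://my-bucket/raw/orders/']
```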

View File

@@ -285,6 +285,16 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
else []
)
def get_storage_service_names(self) -> List[str]:
"""
Get the list of storage service names
"""
return (
self.source_config.lineageInformation.storageServiceNames or []
if self.source_config.lineageInformation
else []
)
def prepare(self):
"""
Method to implement any required logic before starting the ingestion process
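get_storage_service_names mirrors the existing get_db_service_names helper and reads from the same lineageInformation block of the pipeline source config. A hedged sketch of the config shape these helpers expect (storageServiceNames is taken from the code above; the surrounding field names are assumptions based on the pipeline metadata source config, and the values are illustrative):

```python
# Illustrative sourceConfig payload; only the lineageInformation part matters here.
source_config = {
    "type": "PipelineMetadata",
    "lineageInformation": {
        "dbServiceNames": ["prod_redshift"],   # consumed by get_db_service_names()
        "storageServiceNames": ["prod_s3"],    # consumed by the new get_storage_service_names()
    },
}
```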

View File

@@ -0,0 +1,341 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Glue Pipeline using the topology
"""
import json
from unittest import TestCase
from unittest.mock import patch
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.entity.data.pipeline import Pipeline, Task
from metadata.generated.schema.entity.services.pipelineService import (
PipelineConnection,
PipelineService,
PipelineServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import (
EntityName,
FullyQualifiedEntityName,
Markdown,
SourceUrl,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.source.pipeline.gluepipeline.metadata import GluepipelineSource
mock_glue_config = {
"source": {
"type": "gluepipeline",
"serviceName": "local_gluepipeline",
"serviceConnection": {
"config": {
"type": "GluePipeline",
"awsConfig": {
"awsAccessKeyId": "aws_access_key_id",
"awsSecretAccessKey": "aws_secret_access_key",
"awsRegion": "us-east-2",
"endPointURL": "https://endpoint.com/",
},
},
},
"sourceConfig": {"config": {"type": "PipelineMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
}
},
}
EXPECTED_JOB_DETAILS = json.loads(
"""
{
"Name": "redshift workflow",
"Description": "redshift workflow description",
"DefaultRunProperties": {},
"CreatedOn": "2024-09-20 15:46:36.668000",
"LastModifiedOn": "2024-09-20 15:46:36.668000",
"LastRun": {
"Name": "redshift workflow",
"WorkflowRunId": "wr_6db99d3ea932db0739f03ba5ae56e4b635b7878261f75af062e1223a7272c50e",
"WorkflowRunProperties": {},
"StartedOn": "2024-09-30 17:07:24.032000",
"CompletedOn": "2024-09-30 17:08:24.032000",
"Status": "COMPLETED",
"Statistics": {
"TotalActions": 1,
"TimeoutActions": 0,
"FailedActions": 1,
"StoppedActions": 0,
"SucceededActions": 0,
"RunningActions": 0,
"ErroredActions": 0,
"WaitingActions": 0
},
"Graph": {
"Nodes": [
{
"Type": "TRIGGER",
"Name": "redshift_event",
"UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"TriggerDetails": {
"Trigger": {
"Name": "redshift_event",
"WorkflowName": "redshift workflow",
"Type": "ON_DEMAND",
"State": "CREATED",
"Actions": [
{
"JobName": "Redshift DBT Job"
}
]
}
}
},
{
"Type": "JOB",
"Name": "Redshift DBT Job",
"UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123",
"JobDetails": {
"JobRuns": [
{
"Id": "jr_108804857dd29cb1857c92d3e8bf0b48f7685c246e56125b713eb6ea7ebfe4e2",
"Attempt": 0,
"TriggerName": "redshift_event",
"JobName": "Redshift DBT Job",
"JobMode": "VISUAL",
"JobRunQueuingEnabled": false,
"StartedOn": "2024-09-30 17:07:59.185000",
"LastModifiedOn": "2024-09-30 17:08:03.003000",
"CompletedOn": "2024-09-30 17:08:03.003000",
"JobRunState": "FAILED",
"ErrorMessage": "Error Message",
"PredecessorRuns": [],
"AllocatedCapacity": 10,
"ExecutionTime": 0,
"Timeout": 2880,
"MaxCapacity": 10.0,
"WorkerType": "G.1X",
"NumberOfWorkers": 10,
"LogGroupName": "/aws-glue/jobs",
"GlueVersion": "4.0",
"ExecutionClass": "STANDARD"
}
]
}
}
],
"Edges": [
{
"SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123"
}
]
}
},
"Graph": {
"Nodes": [
{
"Type": "TRIGGER",
"Name": "redshift_event",
"UniqueId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"TriggerDetails": {
"Trigger": {
"Name": "redshift_event",
"WorkflowName": "redshift workflow",
"Type": "ON_DEMAND",
"State": "CREATED",
"Actions": [
{
"JobName": "Redshift DBT Job"
}
]
}
}
},
{
"Type": "JOB",
"Name": "Redshift DBT Job",
"UniqueId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123",
"JobDetails": {}
}
],
"Edges": [
{
"SourceId": "wnode_98c85bc1e19d969e35e0687b2ec586822271463c72dd556f90cfe6421a2517ee",
"DestinationId": "wnode_0cbf9f52c41002015ebc46fe70a9b0ea64ff7dba891cf141d6dcbf5580fe7123"
}
]
}
}
"""
)
EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
name=EntityName(root="redshift workflow"),
displayName="redshift workflow",
description=None,
dataProducts=None,
sourceUrl=SourceUrl(
root="https://us-east-2.console.aws.amazon.com/glue/home?region=us-east-2#/v2/etl-configuration/workflows/view/redshift workflow"
),
concurrency=None,
pipelineLocation=None,
startDate=None,
tasks=[
Task(
name="redshift_event",
displayName="redshift_event",
fullyQualifiedName=None,
description=None,
sourceUrl=None,
downstreamTasks=["Redshift DBT Job"],
taskType="TRIGGER",
taskSQL=None,
startDate=None,
endDate=None,
tags=None,
owners=None,
),
Task(
name="Redshift DBT Job",
displayName="Redshift DBT Job",
fullyQualifiedName=None,
description=None,
sourceUrl=None,
downstreamTasks=[],
taskType="JOB",
taskSQL=None,
startDate=None,
endDate=None,
tags=None,
owners=None,
),
],
tags=None,
owners=None,
service=FullyQualifiedEntityName(root="gluepipeline_test"),
extension=None,
scheduleInterval=None,
domain=None,
lifeCycle=None,
sourceHash=None,
)
MOCK_PIPELINE_SERVICE = PipelineService(
id="85811038-099a-11ed-861d-0242ac120002",
name="gluepipeline_test",
fullyQualifiedName=FullyQualifiedEntityName("gluepipeline_test"),
connection=PipelineConnection(),
serviceType=PipelineServiceType.DBTCloud,
)
MOCK_PIPELINE = Pipeline(
id="2aaa012e-099a-11ed-861d-0242ac120002",
name=EntityName(root="redshift workflow"),
fullyQualifiedName="gluepipeline_test.redshift workflow",
displayName="OpenMetadata DBTCloud Workflow",
description=Markdown(root="Example Job Description"),
dataProducts=None,
sourceUrl=SourceUrl(
root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/jobs/70403103936332"
),
concurrency=None,
pipelineLocation=None,
startDate=None,
tasks=[
Task(
name="70403110257794",
displayName=None,
fullyQualifiedName=None,
description=None,
sourceUrl=SourceUrl(
root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403110257794/"
),
downstreamTasks=None,
taskType=None,
taskSQL=None,
startDate="2024-05-27 10:42:20.621788+00:00",
endDate="2024-05-28 10:42:52.622408+00:00",
tags=None,
owners=None,
),
Task(
name="70403111615088",
displayName=None,
fullyQualifiedName=None,
description=None,
sourceUrl=SourceUrl(
root="https://abc12.us1.dbt.com/deploy/70403103922125/projects/70403103926818/runs/70403111615088/"
),
downstreamTasks=None,
taskType=None,
taskSQL=None,
startDate="None",
endDate="None",
tags=None,
owners=None,
),
],
tags=None,
owners=None,
service=EntityReference(
id="85811038-099a-11ed-861d-0242ac120002", type="pipelineService"
),
extension=None,
scheduleInterval="6 */12 * * 0,1,2,3,4,5,6",
domain=None,
lifeCycle=None,
sourceHash=None,
)
EXPECTED_PIPELINE_NAME = "redshift workflow"
class GluePipelineUnitTest(TestCase):
"""
Glue Pipeline unit tests
"""
@patch(
"metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"
)
def __init__(self, methodName, test_connection) -> None:
super().__init__(methodName)
test_connection.return_value = False
config = OpenMetadataWorkflowConfig.model_validate(mock_glue_config)
self.gluepipeline = GluepipelineSource.create(
mock_glue_config["source"],
config.workflowConfig.openMetadataServerConfig,
)
self.gluepipeline.context.get().__dict__["pipeline"] = MOCK_PIPELINE.name.root
self.gluepipeline.context.get().__dict__[
"pipeline_service"
] = MOCK_PIPELINE_SERVICE.name.root
def test_pipeline_name(self):
assert (
self.gluepipeline.get_pipeline_name(EXPECTED_JOB_DETAILS)
== EXPECTED_PIPELINE_NAME
)
def test_pipelines(self):
pipeline = list(self.gluepipeline.yield_pipeline(EXPECTED_JOB_DETAILS))[0].right
assert pipeline == EXPECTED_CREATED_PIPELINES

View File

@@ -175,6 +175,8 @@ public interface SearchClient {
void deleteEntityByFields(List<String> indexName, List<Pair<String, String>> fieldAndValue);
void deleteEntityByFQNPrefix(String indexName, String fqnPrefix);
void softDeleteOrRestoreEntity(String indexName, String docId, String scriptTxt);
void softDeleteOrRestoreChildren(

View File

@@ -554,6 +554,25 @@ public class SearchRepository {
}
}
public void deleteEntityByFQNPrefix(EntityInterface entity) {
if (entity != null) {
String entityType = entity.getEntityReference().getType();
String fqn = entity.getFullyQualifiedName();
IndexMapping indexMapping = entityIndexMap.get(entityType);
try {
searchClient.deleteEntityByFQNPrefix(indexMapping.getIndexName(clusterAlias), fqn);
} catch (Exception ie) {
LOG.error(
"Issue in Deleting the search document for entityFQN [{}] and entityType [{}]. Reason[{}], Cause[{}], Stack [{}]",
fqn,
entityType,
ie.getMessage(),
ie.getCause(),
ExceptionUtils.getStackTrace(ie));
}
}
}
public void deleteTimeSeriesEntityById(EntityTimeSeriesInterface entity) {
if (entity != null) {
String entityId = entity.getId().toString();

View File

@@ -71,6 +71,7 @@ import es.org.elasticsearch.index.query.BoolQueryBuilder;
import es.org.elasticsearch.index.query.MatchQueryBuilder;
import es.org.elasticsearch.index.query.MultiMatchQueryBuilder;
import es.org.elasticsearch.index.query.Operator;
import es.org.elasticsearch.index.query.PrefixQueryBuilder;
import es.org.elasticsearch.index.query.QueryBuilder;
import es.org.elasticsearch.index.query.QueryBuilders;
import es.org.elasticsearch.index.query.QueryStringQueryBuilder;
@@ -1619,6 +1620,16 @@ public class ElasticSearchClient implements SearchClient {
}
}
@Override
public void deleteEntityByFQNPrefix(String indexName, String fqnPrefix) {
if (isClientAvailable) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
deleteByQueryRequest.setQuery(
new PrefixQueryBuilder("fullyQualifiedName.keyword", fqnPrefix.toLowerCase()));
deleteEntityFromElasticSearchByQuery(deleteByQueryRequest);
}
}
@Override
public void softDeleteOrRestoreEntity(String indexName, String docId, String scriptTxt) {
if (isClientAvailable) {

View File

@@ -155,6 +155,7 @@ import os.org.opensearch.index.query.BoolQueryBuilder;
import os.org.opensearch.index.query.MatchQueryBuilder;
import os.org.opensearch.index.query.MultiMatchQueryBuilder;
import os.org.opensearch.index.query.Operator;
import os.org.opensearch.index.query.PrefixQueryBuilder;
import os.org.opensearch.index.query.QueryBuilder;
import os.org.opensearch.index.query.QueryBuilders;
import os.org.opensearch.index.query.QueryStringQueryBuilder;
@@ -1589,6 +1590,16 @@ public class OpenSearchClient implements SearchClient {
}
}
@Override
public void deleteEntityByFQNPrefix(String indexName, String fqnPrefix) {
if (isClientAvailable) {
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
deleteByQueryRequest.setQuery(
new PrefixQueryBuilder("fullyQualifiedName.keyword", fqnPrefix.toLowerCase()));
deleteEntityFromOpenSearchByQuery(deleteByQueryRequest);
}
}
@Override
public void deleteEntityByFields(
List<String> indexName, List<Pair<String, String>> fieldAndValue) {

View File

@@ -24,6 +24,7 @@
background-color: white;
.ant-layout-header {
line-height: inherit;
background: @white;
}
.navbar-container {
border-bottom: 1px solid @border-color;

View File

@@ -261,6 +261,12 @@
color: initial;
}
}
.left-sidebar-menu.ant-menu > .ant-menu-item {
&:first-child {
margin-top: 0;
}
}
}
.left-panel-item.active,