MINOR: Restructure dbServiceName field in dashboard and pipeline (#15548)

This commit is contained in:
Mayur Singal 2024-03-15 12:42:47 +05:30 committed by GitHub
parent ed41f25f18
commit 88ab7475e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 171 additions and 45 deletions

View File

@ -7,6 +7,22 @@ ALTER TABLE query_entity ADD COLUMN checksum VARCHAR(32) GENERATED ALWAYS AS (js
UPDATE query_entity SET json = JSON_INSERT(json, '$.checksum', MD5(JSON_UNQUOTE(JSON_EXTRACT(json, '$.checksum'))));
-- Restructure dbServiceNames in ingestion_pipeline_entity
update ingestion_pipeline_entity set json =
JSON_INSERT(
JSON_REMOVE(json, '$.sourceConfig.config.dbServiceNames'),
'$.sourceConfig.config.lineageInformation',
JSON_OBJECT(
'dbServiceNames',
JSON_EXTRACT(json, '$.sourceConfig.config.dbServiceNames')
)
)
where
JSON_EXTRACT(json, '$.sourceConfig.config.type') in ('DashboardMetadata', 'PipelineMetadata')
AND JSON_EXTRACT(json, '$.sourceConfig.config.dbServiceNames') is not null;
ALTER TABLE chart_entity ADD INDEX index_chart_entity_deleted(fqnHash, deleted);
ALTER TABLE dashboard_data_model_entity ADD INDEX index_dashboard_data_model_entity_deleted(fqnHash, deleted);
ALTER TABLE dashboard_entity ADD INDEX index_dashboard_entity_deleted(fqnHash, deleted);

View File

@ -8,6 +8,19 @@ ALTER TABLE query_entity ADD COLUMN checksum varchar(32) GENERATED ALWAYS AS (js
UPDATE query_entity SET json = jsonb_set(json::jsonb, '{checksum}', MD5(json->'connection'));
-- Restructure dbServiceNames in ingestion_pipeline_entity
update ingestion_pipeline_entity ipe set json = JSONB_SET(
json::jsonb #- '{sourceConfig,config,dbServiceNames}',
'{sourceConfig,config,lineageInformation}',
jsonb_build_object(
'dbServiceNames',
json#>'{sourceConfig,config,dbServiceNames}'
)
)
WHERE (json#>'{sourceConfig,config,type}')::varchar(255) IN ('"DashboardMetadata"', '"PipelineMetadata"')
and json#>'{sourceConfig,config,dbServiceNames}' is not null;
CREATE INDEX index_chart_entity_deleted ON chart_entity (fqnHash, deleted);
CREATE INDEX index_dashboard_data_model_entity_deleted ON dashboard_data_model_entity (fqnHash, deleted);
CREATE INDEX index_dashboard_entity_deleted ON dashboard_entity (fqnHash, deleted);
@ -53,3 +66,4 @@ CREATE INDEX index_user_entity_deleted ON user_entity (nameHash, deleted);
CREATE INDEX apps_extension_time_series_index ON apps_extension_time_series (appId);
CREATE INDEX index_suggestions_type ON suggestions (suggestionType);
CREATE INDEX index_suggestions_status ON suggestions (status);

View File

@ -21,7 +21,8 @@ source:
includes:
- Supplier Quality Analysis Sample
- "Customer"
dbServiceNames: [local_redshift]
lineageInformation:
dbServiceNames: [local_redshift]
sink:
type: metadata-rest
config: {}

View File

@ -23,9 +23,10 @@ source:
sourceConfig:
config:
type: DashboardMetadata
dbServiceNames:
- mysql
- postgres
lineageInformation:
dbServiceNames:
- mysql
- postgres
chartFilterPattern: {}
# dashboardFilterPattern:
# includes:

View File

@ -9,9 +9,10 @@ source:
sourceConfig:
config:
type: PipelineMetadata
dbServiceNames:
- local_databricks
- local_postgres_empty1
lineageInformation:
dbServiceNames:
- local_databricks
- local_postgres_empty1
pipelineFilterPattern:
includes:

View File

@ -319,6 +319,16 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
f"Error to yield dashboard lineage details for data model name [{datamodel.name}]: {err}"
)
def get_db_service_names(self) -> List[str]:
"""
Get the list of db service names
"""
return (
self.source_config.lineageInformation.dbServiceNames or []
if self.source_config.lineageInformation
else []
)
def yield_dashboard_lineage(
self, dashboard_details: Any
) -> Iterable[Either[AddLineageRequest]]:
@ -330,7 +340,9 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
"""
yield from self.yield_datamodel_dashboard_lineage() or []
for db_service_name in self.source_config.dbServiceNames or []:
db_service_names = self.get_db_service_names()
for db_service_name in db_service_names or []:
yield from self.yield_dashboard_lineage_details(
dashboard_details, db_service_name
) or []

View File

@ -541,11 +541,13 @@ class LookerSource(DashboardServiceSource):
" while processing view lineage."
)
db_service_names = self.get_db_service_names()
if view.sql_table_name:
source_table_name = self._clean_table_name(view.sql_table_name)
# View to the source is only there if we are informing the dbServiceNames
for db_service_name in self.source_config.dbServiceNames or []:
for db_service_name in db_service_names or []:
yield self.build_lineage_request(
source=source_table_name,
db_service_name=db_service_name,
@ -556,7 +558,7 @@ class LookerSource(DashboardServiceSource):
sql_query = view.derived_table.sql
if not sql_query:
return
for db_service_name in self.source_config.dbServiceNames or []:
for db_service_name in db_service_names or []:
db_service = self.metadata.get_by_name(
DatabaseService, db_service_name
)

View File

@ -152,7 +152,7 @@ class OpenlineageSource(PipelineServiceSource):
:return: fully qualified name of a Table in Open Metadata
"""
result = None
services = self.source_config.dbServiceNames
services = self.get_db_service_names()
for db_service in services:
result = fqn.build(
metadata=self.metadata,
@ -176,7 +176,7 @@ class OpenlineageSource(PipelineServiceSource):
:return: fully qualified name of a DatabaseSchema in Open Metadata
"""
result = None
services = self.source_config.dbServiceNames
services = self.get_db_service_names()
for db_service in services:
result = fqn.build(

View File

@ -231,6 +231,16 @@ class PipelineServiceSource(TopologyRunnerMixin, Source, ABC):
params={"service": self.context.pipeline_service},
)
def get_db_service_names(self) -> List[str]:
"""
Get the list of db service names
"""
return (
self.source_config.lineageInformation.dbServiceNames or []
if self.source_config.lineageInformation
else []
)
def prepare(self):
"""
Method to implement any required logic before starting the ingestion process

View File

@ -113,7 +113,7 @@ class SplineSource(PipelineServiceSource):
) -> Optional[Table]:
if not table_name:
return None
for service_name in self.source_config.dbServiceNames:
for service_name in self.get_db_service_names():
table_fqn = fqn.build(
metadata=self.metadata,
entity_type=Table,
@ -163,7 +163,7 @@ class SplineSource(PipelineServiceSource):
"""
Parse all the executions available and create lineage
"""
if not self.source_config.dbServiceNames:
if not self.get_db_service_names():
return
lineage_details = self.client.get_lineage_details(
pipeline_details.executionPlanId

View File

@ -12,8 +12,9 @@ source:
type: DashboardMetadata
dashboardFilterPattern: {}
chartFilterPattern: {}
dbServiceNames:
- local_redshift_metabase
lineageInformation:
dbServiceNames:
- local_redshift_metabase
sink:
type: metadata-rest
config: {}

View File

@ -16,8 +16,9 @@ source:
type: DashboardMetadata
dashboardFilterPattern: {}
chartFilterPattern: {}
dbServiceNames:
- local_redshift_powerbi
lineageInformation:
dbServiceNames:
- local_redshift_powerbi
sink:
type: metadata-rest
config: {}

View File

@ -15,8 +15,9 @@ source:
sourceConfig:
config:
type: DashboardMetadata
dbServiceNames:
- local_redshift_tableau
lineageInformation:
dbServiceNames:
- local_redshift_tableau
sink:
type: metadata-rest
config: {}

View File

@ -417,7 +417,7 @@ class TestWorkflowParse(TestCase):
"type": "PipelineMetadata",
"includeTags": True,
"includeOwners": True,
"dbServiceNames": ["dev"],
"lineageInformation": {"dbServiceNames": ["dev"]},
"includeLineage": True,
"markDeletedPipelines": True,
"pipelineFilterPattern": {
@ -454,7 +454,7 @@ class TestWorkflowParse(TestCase):
"type": "PipelineMetadata",
"includeTags": True,
"includeOwners": True,
"dbServiceNames": ["dev"],
"lineageInformation": {"dbServiceNames": ["dev"]},
"includeViewLineage": True,
"markDeletedDbs": True,
"pipelineFilterPatterns": {

View File

@ -142,7 +142,9 @@ class OpenLineageUnitTest(unittest.TestCase):
self.open_lineage_source.context.__dict__[
"pipeline_service"
] = MOCK_PIPELINE_SERVICE.name.__root__
self.open_lineage_source.source_config.dbServiceNames = ["skun"]
self.open_lineage_source.source_config.lineageInformation = {
"dbServiceNames": ["skun"]
}
@patch(
"metadata.ingestion.source.pipeline.pipeline_service.PipelineServiceSource.test_connection"

View File

@ -3,9 +3,10 @@
config:
type: DashboardMetadata
overrideOwner: True
# dbServiceNames:
# - service1
# - service2
# lineageInformation:
# dbServiceNames:
# - service1
# - service2
# dashboardFilterPattern:
# includes:
# - dashboard1

View File

@ -225,9 +225,10 @@ source:
markDeletedDashboards: True
includeTags: True
includeDataModels: True
# dbServiceNames:
# - service1
# - service2
# lineageInformation:
# dbServiceNames:
# - service1
# - service2
# dashboardFilterPattern:
# includes:
# - dashboard1

View File

@ -0,0 +1,44 @@
---
title: Upgrade 1.3.x to 1.4.x
slug: /deployment/upgrade/versions/130-to-140
---
# Upgrade from 1.3.x to 1.4.x
Upgrading from 1.3.x to 1.4.x can be done directly on your instances. This page will list a few general details you should take into consideration when running the upgrade.
## Deprecation Notice
## Breaking Changes for 1.4.x Stable Release
### Dashboard & Pipeline Source Config Changes
We have restructured how the dbServiceNames field is provided. This field is used for creating lineage between pipelines (e.g. Spline) and tables, and between dashboard data models and tables. This change was made in order to highlight the field in the UI and improve the user experience.
Please take note of these changes in your YAML configuration.
Before:
```
sourceConfig:
config:
type: DashboardMetadata # or PipelineMetadata
.....
dbServiceNames:
- redshift_prod
.....
```
After 1.4.0 Upgrade:
```
sourceConfig:
config:
type: DashboardMetadata # or PipelineMetadata
.....
lineageInformation:
dbServiceNames:
- redshift_prod
.....
```

View File

@ -18,6 +18,21 @@
"$ref": "#/definitions/dashboardMetadataConfigType",
"default": "DashboardMetadata"
},
"lineageInformation": {
"description": "Details required to generate Lineage",
"type": "object",
"title": "Lineage Information",
"properties": {
"dbServiceNames": {
"title": "Database Service Names List",
"description": "List of Database Service Names for creation of lineage",
"type": "array",
"items": {
"type": "string"
}
}
}
},
"dashboardFilterPattern": {
"description": "Regex to exclude or include dashboards that matches the pattern.",
"$ref": "../type/filterPattern.json#/definitions/filterPattern",
@ -38,14 +53,6 @@
"$ref": "../type/filterPattern.json#/definitions/filterPattern",
"title": "Project Filter Pattern"
},
"dbServiceNames": {
"title": "Database Service Names List",
"description": "List of Database Service Names for creation of lineage",
"type": "array",
"items": {
"type": "string"
}
},
"includeOwners": {
"title": "Include Current Owners",
"description": "Enabling a flag will replace the current owner with a new owner from the source during metadata ingestion, if the current owner is null. It is recommended to keep the flag enabled to obtain the owner information during the first metadata ingestion.",

View File

@ -35,12 +35,19 @@
"$ref": "../type/filterPattern.json#/definitions/filterPattern",
"title": "Pipeline Filter Pattern"
},
"dbServiceNames": {
"title": "Database Service Names List",
"description": "List of Database Service Names for creation of lineage",
"type": "array",
"items": {
"type": "string"
"lineageInformation": {
"description": "Details required to generate Lineage",
"type": "object",
"title": "Lineage Information",
"properties": {
"dbServiceNames": {
"title": "Database Service Names List",
"description": "List of Database Service Names for creation of lineage",
"type": "array",
"items": {
"type": "string"
}
}
}
},
"markDeletedPipelines": {

View File

@ -40,7 +40,9 @@ const MOCK_WORKFLOW_DATA = {
includes: [],
excludes: [],
},
dbServiceNames: [],
lineageInformation: {
dbServiceNames: [],
},
includeOwners: false,
markDeletedDashboards: true,
markDeletedDataModels: true,
@ -49,7 +51,9 @@ const MOCK_WORKFLOW_DATA = {
} as Pipeline;
const MOCK_CLEANED_WORKFLOW_DATA = {
dbServiceNames: [],
lineageInformation: {
dbServiceNames: [],
},
includeDataModels: true,
includeOwners: false,
includeTags: true,