mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-07 21:16:45 +00:00
Added source url to entities (#12901)
* Added source url to entities * added support to create and update sourceUrl * fixed pytests --------- Co-authored-by: 07Himank <himank07mehta@gmail.com>
This commit is contained in:
parent
f9e3e1801f
commit
5b47fd4acf
@ -66,7 +66,7 @@ class DomodashboardSource(DashboardServiceSource):
|
||||
connection: DomoDashboardConnection = config.serviceConnection.__root__.config
|
||||
if not isinstance(connection, DomoDashboardConnection):
|
||||
raise InvalidSourceException(
|
||||
f"Expected MetabaseConnection, but got {connection}"
|
||||
f"Expected DomoDashboardConnection, but got {connection}"
|
||||
)
|
||||
return cls(config, metadata_config)
|
||||
|
||||
@ -191,7 +191,7 @@ class DomodashboardSource(DashboardServiceSource):
|
||||
|
||||
def yield_dashboard_chart(
|
||||
self, dashboard_details: DomoDashboardDetails
|
||||
) -> Optional[Iterable[CreateChartRequest]]:
|
||||
) -> Iterable[Optional[CreateChartRequest]]:
|
||||
chart_ids = dashboard_details.cardIds
|
||||
chart_id_from_collection = self.get_chart_ids(dashboard_details.collectionIds)
|
||||
chart_ids.extend(chart_id_from_collection)
|
||||
|
||||
@ -303,6 +303,10 @@ class BigquerySource(CommonDbSourceService):
|
||||
name=schema_name,
|
||||
database=self.context.database.fullyQualifiedName,
|
||||
description=self.get_schema_description(schema_name),
|
||||
sourceUrl=self.get_source_url(
|
||||
database_name=self.context.database.name.__root__,
|
||||
schema_name=schema_name,
|
||||
),
|
||||
)
|
||||
|
||||
dataset_obj = self.client.get_dataset(schema_name)
|
||||
@ -467,15 +471,30 @@ class BigquerySource(CommonDbSourceService):
|
||||
|
||||
def get_source_url(
|
||||
self,
|
||||
database_name: str,
|
||||
schema_name: str,
|
||||
table_name: str,
|
||||
table_type: TableType,
|
||||
database_name: Optional[str] = None,
|
||||
schema_name: Optional[str] = None,
|
||||
table_name: Optional[str] = None,
|
||||
table_type: Optional[TableType] = None,
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Method to get the source url for bigquery
|
||||
"""
|
||||
return (
|
||||
f"https://console.cloud.google.com/bigquery?project={database_name}"
|
||||
f"&ws=!1m5!1m4!4m3!1s{database_name}!2s{schema_name}!3s{table_name}"
|
||||
)
|
||||
try:
|
||||
bigquery_host = "https://console.cloud.google.com/"
|
||||
database_url = f"{bigquery_host}bigquery?project={database_name}"
|
||||
|
||||
schema_table_url = None
|
||||
if schema_name:
|
||||
schema_table_url = f"&ws=!1m4!1m3!3m2!1s{database_name}!2s{schema_name}"
|
||||
if table_name:
|
||||
schema_table_url = (
|
||||
f"&ws=!1m5!1m4!4m3!1s{database_name}"
|
||||
f"!2s{schema_name}!3s{table_name}"
|
||||
)
|
||||
if schema_table_url:
|
||||
return f"{database_url}{schema_table_url}"
|
||||
return database_url
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Unable to get source url: {exc}")
|
||||
return None
|
||||
|
||||
@ -163,6 +163,7 @@ class CommonDbSourceService(
|
||||
name=database_name,
|
||||
service=self.context.database_service.fullyQualifiedName,
|
||||
description=self.get_database_description(database_name),
|
||||
sourceUrl=self.get_source_url(database_name=database_name),
|
||||
)
|
||||
|
||||
def get_raw_database_schema_names(self) -> Iterable[str]:
|
||||
@ -190,6 +191,10 @@ class CommonDbSourceService(
|
||||
name=schema_name,
|
||||
database=self.context.database.fullyQualifiedName,
|
||||
description=self.get_schema_description(schema_name),
|
||||
sourceUrl=self.get_source_url(
|
||||
database_name=self.context.database.name.__root__,
|
||||
schema_name=schema_name,
|
||||
),
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
@ -523,10 +528,10 @@ class CommonDbSourceService(
|
||||
|
||||
def get_source_url(
|
||||
self,
|
||||
database_name: str,
|
||||
schema_name: str,
|
||||
table_name: str,
|
||||
table_type: TableType,
|
||||
database_name: Optional[str] = None,
|
||||
schema_name: Optional[str] = None,
|
||||
table_name: Optional[str] = None,
|
||||
table_type: Optional[TableType] = None,
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
By default the source url is not supported for
|
||||
|
||||
@ -94,6 +94,7 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
|
||||
yield CreateDatabaseRequest(
|
||||
name=database_name,
|
||||
service=self.context.database_service.fullyQualifiedName.__root__,
|
||||
sourceUrl=self.get_source_url(database_name=database_name),
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
@ -133,6 +134,10 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
|
||||
yield CreateDatabaseSchemaRequest(
|
||||
name=schema_name,
|
||||
database=self.context.database.fullyQualifiedName.__root__,
|
||||
sourceUrl=self.get_source_url(
|
||||
database_name=self.context.database.name.__root__,
|
||||
schema_name=schema_name,
|
||||
),
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
@ -202,6 +207,12 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
|
||||
columns=columns,
|
||||
tableConstraints=None,
|
||||
databaseSchema=self.context.database_schema.fullyQualifiedName.__root__,
|
||||
sourceUrl=self.get_source_url(
|
||||
database_name=self.context.database.name.__root__,
|
||||
schema_name=schema_name,
|
||||
table_name=table_name,
|
||||
table_type=table_type,
|
||||
),
|
||||
)
|
||||
|
||||
yield table_request
|
||||
@ -223,6 +234,17 @@ class CommonNoSQLSource(DatabaseServiceSource, ABC):
|
||||
tags are not supported with NoSQL
|
||||
"""
|
||||
|
||||
def get_source_url(
    self,
    database_name: Optional[str] = None,
    schema_name: Optional[str] = None,
    table_name: Optional[str] = None,
    table_type: Optional[TableType] = None,
) -> Optional[str]:
    """
    By default the source url is not supported for this base source.

    The arguments are accepted so that every caller can use the same
    keyword signature, but they are ignored here. Sources that can link
    to a console (DynamodbSource, for instance) override this method.

    Returns:
        Always None.
    """
    # Explicit so the "no url" contract does not rely on an implicit return.
    return None
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
By default there is nothing to close
|
||||
|
||||
@ -51,6 +51,7 @@ from metadata.ingestion.source.database.domodatabase.models import (
|
||||
from metadata.utils import fqn
|
||||
from metadata.utils.constants import DEFAULT_DATABASE
|
||||
from metadata.utils.filters import filter_by_table
|
||||
from metadata.utils.helpers import clean_uri
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
@ -174,6 +175,9 @@ class DomodatabaseSource(DatabaseServiceSource):
|
||||
owner=self.get_owners(owner=table_object.owner),
|
||||
tableConstraints=table_constraints,
|
||||
databaseSchema=self.context.database_schema.fullyQualifiedName,
|
||||
sourceUrl=self.get_source_url(
|
||||
table_name=table_id,
|
||||
),
|
||||
)
|
||||
yield table_request
|
||||
self.register_record(table_request=table_request)
|
||||
@ -204,6 +208,20 @@ class DomodatabaseSource(DatabaseServiceSource):
|
||||
def yield_view_lineage(self) -> Optional[Iterable[AddLineageRequest]]:
|
||||
yield from []
|
||||
|
||||
def get_source_url(
    self,
    table_name: Optional[str] = None,
) -> Optional[str]:
    """
    Method to get the source url for domodatabase

    Args:
        table_name: identifier placed in the ``/datasources/<id>/`` path
            segment of the url (the caller in this file passes the table id)

    Returns:
        The dataset overview url built on the connection's sandbox domain,
        or None when no identifier is given or the url cannot be built.
    """
    if not table_name:
        # Without an identifier the f-string would render ".../datasources/None/..."
        return None
    try:
        domain = clean_uri(self.service_connection.sandboxDomain)
        return f"{domain}/datasources/{table_name}/details/overview"
    except Exception as exc:
        # Best effort: a missing url must never break the ingestion.
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to get source url for {table_name}: {exc}")
    return None
|
||||
|
||||
def standardize_table_name( # pylint: disable=unused-argument
|
||||
self, schema: str, table: str
|
||||
) -> str:
|
||||
|
||||
@ -13,8 +13,9 @@ Dynamo source methods.
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from typing import Dict, List, Union
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from metadata.generated.schema.entity.data.table import TableType
|
||||
from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
|
||||
DynamoDBConnection,
|
||||
)
|
||||
@ -103,3 +104,25 @@ class DynamodbSource(CommonNoSQLSource):
|
||||
f"Failed to read DynamoDB attributes for [{table_name}]: {err}"
|
||||
)
|
||||
return []
|
||||
|
||||
def get_source_url(
    self,
    database_name: Optional[str] = None,
    schema_name: Optional[str] = None,
    table_name: Optional[str] = None,
    table_type: Optional[TableType] = None,
) -> Optional[str]:
    """
    Method to get the source url for dynamodb

    Only tables get a url: it points to the DynamoDB page of the AWS
    console for the region set in the service connection. The database,
    schema and table type arguments are not used.
    """
    try:
        if not table_name:
            return None
        aws_region = self.service_connection.awsConfig.awsRegion
        console_home = (
            f"https://{aws_region}.console.aws.amazon.com/dynamodbv2/home"
        )
        return f"{console_home}?region={aws_region}#table?name={table_name}"
    except Exception as exc:
        # Best effort: log and fall through to the "no url" result.
        logger.debug(traceback.format_exc())
        logger.error(f"Unable to get source url: {exc}")
    return None
|
||||
|
||||
@ -194,6 +194,10 @@ class GlueSource(DatabaseServiceSource):
|
||||
yield CreateDatabaseSchemaRequest(
|
||||
name=schema_name,
|
||||
database=self.context.database.fullyQualifiedName,
|
||||
sourceUrl=self.get_source_url(
|
||||
database_name=self.context.database.name.__root__,
|
||||
schema_name=schema_name,
|
||||
),
|
||||
)
|
||||
|
||||
def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
|
||||
@ -272,6 +276,11 @@ class GlueSource(DatabaseServiceSource):
|
||||
columns=columns,
|
||||
tableConstraints=table_constraints,
|
||||
databaseSchema=self.context.database_schema.fullyQualifiedName,
|
||||
sourceUrl=self.get_source_url(
|
||||
table_name=table_name,
|
||||
schema_name=self.context.database_schema.name.__root__,
|
||||
database_name=self.context.database.name.__root__,
|
||||
),
|
||||
)
|
||||
yield table_request
|
||||
self.register_record(table_request=table_request)
|
||||
@ -320,5 +329,37 @@ class GlueSource(DatabaseServiceSource):
|
||||
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndClassification]:
|
||||
pass
|
||||
|
||||
def get_source_url(
    self,
    database_name: Optional[str],
    schema_name: Optional[str] = None,
    table_name: Optional[str] = None,
) -> Optional[str]:
    """
    Method to get the source url for glue

    Builds a link into the data catalog section of the AWS Glue console
    for the region set in the service connection.

    Args:
        database_name: rendered as the ``catalogId`` query parameter
        schema_name: rendered in the ``databases/view/`` path (schema url)
            or as the ``database`` query parameter (table url); when it is
            empty no url is produced
        table_name: when given, the table view url is returned instead of
            the schema one

    Returns:
        The table url, the schema url, or None when there is no schema
        name or the url cannot be built.
    """
    try:
        if schema_name:
            region = self.service_connection.awsConfig.awsRegion
            base_url = (
                f"https://{region}.console.aws.amazon.com/"
                f"glue/home?region={region}#/v2/data-catalog/"
            )
            if not table_name:
                return (
                    f"{base_url}databases/view"
                    f"/{schema_name}?catalogId={database_name}"
                )
            return (
                f"{base_url}tables/view/{table_name}"
                f"?database={schema_name}&catalogId={database_name}&versionId=latest"
            )
    except Exception as exc:
        # Best effort: log and fall through to the "no url" result.
        logger.debug(traceback.format_exc())
        logger.error(f"Unable to get source url: {exc}")
    return None
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
@ -193,6 +193,9 @@ class SalesforceSource(DatabaseServiceSource):
|
||||
columns=columns,
|
||||
tableConstraints=table_constraints,
|
||||
databaseSchema=self.context.database_schema.fullyQualifiedName,
|
||||
sourceUrl=self.get_source_url(
|
||||
table_name=table_name,
|
||||
),
|
||||
)
|
||||
yield table_request
|
||||
self.register_record(table_request=table_request)
|
||||
@ -261,6 +264,22 @@ class SalesforceSource(DatabaseServiceSource):
|
||||
def prepare(self):
|
||||
pass
|
||||
|
||||
def get_source_url(
    self,
    table_name: Optional[str] = None,
) -> Optional[str]:
    """
    Method to get the source url for salesforce

    Args:
        table_name: name rendered in the ``/lightning/o/<name>/list`` path

    Returns:
        The list-view url on the client's ``sf_instance`` host, or None
        when the table name or the instance is missing, or the url cannot
        be built.
    """
    if not table_name:
        # Without a name the f-string would render ".../lightning/o/None/list"
        return None
    try:
        instance_url = self.client.sf_instance
        if instance_url:
            return f"https://{instance_url}/lightning/o/{table_name}/list"
    except Exception as exc:
        # Best effort: a missing url must never break the ingestion.
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to get source url for {table_name}: {exc}")
    return None
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
@ -393,22 +393,30 @@ class SnowflakeSource(CommonDbSourceService):
|
||||
|
||||
def get_source_url(
|
||||
self,
|
||||
database_name: str,
|
||||
schema_name: str,
|
||||
table_name: str,
|
||||
table_type: TableType,
|
||||
database_name: Optional[str] = None,
|
||||
schema_name: Optional[str] = None,
|
||||
table_name: Optional[str] = None,
|
||||
table_type: Optional[TableType] = None,
|
||||
) -> Optional[str]:
|
||||
"""
|
||||
Method to get the source url for snowflake
|
||||
"""
|
||||
account = self._get_current_account()
|
||||
region_id = self._get_current_region()
|
||||
region_name = self._clean_region_name(region_id)
|
||||
if account and region_name:
|
||||
tab_type = "view" if table_type == TableType.View else "table"
|
||||
return (
|
||||
f"https://app.snowflake.com/{region_name.lower()}/{account.lower()}/#/"
|
||||
f"data/databases/{database_name}/schemas"
|
||||
f"/{schema_name}/{tab_type}/{table_name}"
|
||||
)
|
||||
try:
|
||||
account = self._get_current_account()
|
||||
region_id = self._get_current_region()
|
||||
region_name = self._clean_region_name(region_id)
|
||||
if account and region_name:
|
||||
tab_type = "view" if table_type == TableType.View else "table"
|
||||
url = (
|
||||
f"https://app.snowflake.com/{region_name.lower()}"
|
||||
f"/{account.lower()}/#/data/databases/{database_name}"
|
||||
)
|
||||
if schema_name:
|
||||
url = f"{url}/schemas/{schema_name}"
|
||||
if table_name:
|
||||
url = f"{url}/{tab_type}/{table_name}"
|
||||
return url
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Unable to get source url: {exc}")
|
||||
return None
|
||||
|
||||
@ -117,6 +117,12 @@ class KinesisSource(MessagingServiceSource):
|
||||
try:
|
||||
logger.info(f"Fetching topic details {topic_details.topic_name}")
|
||||
|
||||
source_url = (
|
||||
f"https://{self.service_connection.awsConfig.awsRegion}.console.aws.amazon.com/kinesis/home"
|
||||
f"?region={self.service_connection.awsConfig.awsRegion}#/streams/details/"
|
||||
f"{topic_details.topic_name}/monitoring"
|
||||
)
|
||||
|
||||
topic = CreateTopicRequest(
|
||||
name=topic_details.topic_name,
|
||||
service=self.context.messaging_service.fullyQualifiedName.__root__,
|
||||
@ -125,6 +131,7 @@ class KinesisSource(MessagingServiceSource):
|
||||
topic_details.topic_metadata.summary
|
||||
),
|
||||
maximumMessageSize=self._get_max_message_size(),
|
||||
sourceUrl=source_url,
|
||||
)
|
||||
self.register_record(topic_request=topic)
|
||||
yield topic
|
||||
|
||||
@ -38,6 +38,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
from metadata.ingestion.api.source import InvalidSourceException
|
||||
from metadata.ingestion.source.mlmodel.mlmodel_service import MlModelServiceSource
|
||||
from metadata.utils.filters import filter_by_mlmodel
|
||||
from metadata.utils.helpers import clean_uri
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
@ -107,6 +108,11 @@ class MlflowSource(MlModelServiceSource):
|
||||
|
||||
run = self.client.get_run(latest_version.run_id)
|
||||
|
||||
source_url = (
|
||||
f"{clean_uri(self.service_connection.trackingUri)}/"
|
||||
f"#/models/{model.name}"
|
||||
)
|
||||
|
||||
mlmodel_request = CreateMlModelRequest(
|
||||
name=model.name,
|
||||
description=model.description,
|
||||
@ -117,6 +123,7 @@ class MlflowSource(MlModelServiceSource):
|
||||
),
|
||||
mlStore=self._get_ml_store(latest_version),
|
||||
service=self.context.mlmodel_service.fullyQualifiedName,
|
||||
sourceUrl=source_url,
|
||||
)
|
||||
yield mlmodel_request
|
||||
self.register_record(mlmodel_request=mlmodel_request)
|
||||
|
||||
@ -40,6 +40,7 @@ from metadata.ingestion.source.pipeline.dagster.models import (
|
||||
SolidHandle,
|
||||
)
|
||||
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
||||
from metadata.utils.helpers import clean_uri
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
|
||||
|
||||
@ -99,6 +100,9 @@ class DagsterSource(PipelineServiceSource):
|
||||
name=job.handleID,
|
||||
displayName=job.handleID,
|
||||
downstreamTasks=self._get_downstream_tasks(job=job),
|
||||
sourceUrl=self.get_source_url(
|
||||
pipeline_name=pipeline_name, task_name=job.handleID
|
||||
),
|
||||
)
|
||||
task_list.append(task)
|
||||
except Exception as exc:
|
||||
@ -131,6 +135,9 @@ class DagsterSource(PipelineServiceSource):
|
||||
classification_name=DAGSTER_TAG_CATEGORY,
|
||||
include_tags=self.source_config.includeTags,
|
||||
),
|
||||
sourceUrl=self.get_source_url(
|
||||
pipeline_name=pipeline_details.name, task_name=None
|
||||
),
|
||||
)
|
||||
yield pipeline_request
|
||||
self.register_record(pipeline_request=pipeline_request)
|
||||
@ -138,7 +145,7 @@ class DagsterSource(PipelineServiceSource):
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(f"Error to yield pipeline for {pipeline_details}: {exc}")
|
||||
|
||||
def yield_tag(self, *_, **__) -> OMetaTagAndClassification:
|
||||
def yield_tag(self, *_, **__) -> Iterable[OMetaTagAndClassification]:
|
||||
yield from get_ometa_tag_and_classification(
|
||||
tags=[self.context.repository_name],
|
||||
classification_name=DAGSTER_TAG_CATEGORY,
|
||||
@ -181,7 +188,7 @@ class DagsterSource(PipelineServiceSource):
|
||||
|
||||
def yield_pipeline_status(
|
||||
self, pipeline_details: DagsterPipeline
|
||||
) -> OMetaPipelineStatus:
|
||||
) -> Iterable[OMetaPipelineStatus]:
|
||||
"""
|
||||
Yield the pipeline and task status
|
||||
"""
|
||||
@ -233,5 +240,24 @@ class DagsterSource(PipelineServiceSource):
|
||||
|
||||
return pipeline_details.name
|
||||
|
||||
def get_source_url(
    self, pipeline_name: str, task_name: Optional[str]
) -> Optional[str]:
    """
    Method to get source url for pipelines and tasks for dagster

    The job url is built from the cleaned connection host, the repository
    location held in the context and the pipeline name, and always ends
    with a slash. When a task name is given it is appended to the job url.
    Returns None if anything fails while building the url.
    """
    try:
        host = clean_uri(self.service_connection.host)
        location = self.context.repository_location
        job_url = f"{host}/locations/{location}/jobs/{pipeline_name}/"
        return f"{job_url}{task_name}" if task_name else job_url
    except Exception as exc:
        # Best effort: log and fall through to the "no url" result.
        logger.debug(traceback.format_exc())
        logger.warning(f"Error to get pipeline url: {exc}")
    return None
|
||||
|
||||
def test_connection(self) -> None:
|
||||
pass
|
||||
|
||||
@ -37,6 +37,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
from metadata.ingestion.api.source import InvalidSourceException
|
||||
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
|
||||
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
|
||||
from metadata.utils.helpers import clean_uri
|
||||
from metadata.utils.logger import ingestion_logger
|
||||
|
||||
logger = ingestion_logger()
|
||||
@ -75,10 +76,12 @@ class DomopipelineSource(PipelineServiceSource):
|
||||
def yield_pipeline(self, pipeline_details) -> Iterable[CreatePipelineRequest]:
|
||||
try:
|
||||
pipeline_name = pipeline_details["id"]
|
||||
source_url = self.get_source_url(pipeline_id=pipeline_name)
|
||||
task = Task(
|
||||
name=pipeline_name,
|
||||
displayName=pipeline_details.get("name"),
|
||||
description=pipeline_details.get("description", ""),
|
||||
sourceUrl=source_url,
|
||||
)
|
||||
|
||||
pipeline_request = CreatePipelineRequest(
|
||||
@ -88,6 +91,7 @@ class DomopipelineSource(PipelineServiceSource):
|
||||
tasks=[task],
|
||||
service=self.context.pipeline_service.fullyQualifiedName.__root__,
|
||||
startDate=pipeline_details.get("created"),
|
||||
sourceUrl=source_url,
|
||||
)
|
||||
yield pipeline_request
|
||||
self.register_record(pipeline_request=pipeline_request)
|
||||
@ -113,7 +117,7 @@ class DomopipelineSource(PipelineServiceSource):
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
return
|
||||
|
||||
def yield_pipeline_status(self, pipeline_details) -> OMetaPipelineStatus:
|
||||
def yield_pipeline_status(self, pipeline_details) -> Iterable[OMetaPipelineStatus]:
|
||||
pipeline_id = pipeline_details.get("id")
|
||||
if not pipeline_id:
|
||||
logger.debug(
|
||||
@ -158,3 +162,17 @@ class DomopipelineSource(PipelineServiceSource):
|
||||
logger.debug(traceback.format_exc())
|
||||
|
||||
return None
|
||||
|
||||
def get_source_url(
    self,
    pipeline_id: str,
) -> Optional[str]:
    """
    Method to get the source url for a domo pipeline

    The url is the dataflow details page (history anchor) under the
    cleaned sandbox domain of the service connection. Returns None if
    anything fails while building the url.
    """
    try:
        domain = clean_uri(self.service_connection.sandboxDomain)
        dataflow_url = f"{domain}/datacenter/dataflows/{pipeline_id}"
        return f"{dataflow_url}/details#history"
    except Exception as exc:
        # Best effort: log and fall through to the "no url" result.
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to get source url for {pipeline_id}: {exc}")
    return None
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
Fivetran source to extract metadata
|
||||
"""
|
||||
|
||||
import traceback
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
@ -71,7 +72,7 @@ class FivetranSource(PipelineServiceSource):
|
||||
connection: FivetranConnection = config.serviceConnection.__root__.config
|
||||
if not isinstance(connection, FivetranConnection):
|
||||
raise InvalidSourceException(
|
||||
f"Expected AirbyteConnection, but got {connection}"
|
||||
f"Expected FivetranConnection, but got {connection}"
|
||||
)
|
||||
return cls(config, metadata_config)
|
||||
|
||||
@ -99,6 +100,11 @@ class FivetranSource(PipelineServiceSource):
|
||||
displayName=pipeline_details.pipeline_display_name,
|
||||
tasks=self.get_connections_jobs(pipeline_details),
|
||||
service=self.context.pipeline_service.fullyQualifiedName.__root__,
|
||||
sourceUrl=self.get_source_url(
|
||||
connector_id=pipeline_details.source.get("id"),
|
||||
group_id=pipeline_details.group.get("id"),
|
||||
source_name=pipeline_details.source.get("service"),
|
||||
),
|
||||
)
|
||||
yield pipeline_request
|
||||
self.register_record(pipeline_request=pipeline_request)
|
||||
@ -112,7 +118,7 @@ class FivetranSource(PipelineServiceSource):
|
||||
|
||||
def yield_pipeline_lineage_details(
|
||||
self, pipeline_details: FivetranPipelineDetails
|
||||
) -> Optional[Iterable[AddLineageRequest]]:
|
||||
) -> Iterable[AddLineageRequest]:
|
||||
"""
|
||||
Parse all the stream available in the connection and create a lineage between them
|
||||
:param pipeline_details: pipeline_details object from airbyte
|
||||
@ -191,3 +197,20 @@ class FivetranSource(PipelineServiceSource):
|
||||
Get Pipeline Name
|
||||
"""
|
||||
return pipeline_details.pipeline_name
|
||||
|
||||
def get_source_url(
    self,
    connector_id: Optional[str],
    group_id: Optional[str],
    source_name: Optional[str],
) -> Optional[str]:
    """
    Method to get the source url for a fivetran connector

    The connector status page of the Fivetran dashboard is returned only
    when the connector id, the group id and the source name are all
    present; otherwise the result is None.
    """
    try:
        if not all((connector_id, group_id, source_name)):
            return None
        status_page = (
            f"https://fivetran.com/dashboard/connectors/{connector_id}/status"
        )
        return f"{status_page}?groupId={group_id}&service={source_name}"
    except Exception as exc:
        # Best effort: log and fall through to the "no url" result.
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to get source url: {exc}")
    return None
|
||||
|
||||
@ -94,12 +94,18 @@ class GluepipelineSource(PipelineServiceSource):
|
||||
"""
|
||||
Method to Get Pipeline Entity
|
||||
"""
|
||||
source_url = (
|
||||
f"https://{self.service_connection.awsConfig.awsRegion}.console.aws.amazon.com/glue/home?"
|
||||
f"region={self.service_connection.awsConfig.awsRegion}#/v2/etl-configuration/"
|
||||
f"workflows/view/{pipeline_details[NAME]}"
|
||||
)
|
||||
self.job_name_list = set()
|
||||
pipeline_request = CreatePipelineRequest(
|
||||
name=pipeline_details[NAME],
|
||||
displayName=pipeline_details[NAME],
|
||||
tasks=self.get_tasks(pipeline_details),
|
||||
service=self.context.pipeline_service.fullyQualifiedName.__root__,
|
||||
sourceUrl=source_url,
|
||||
)
|
||||
yield pipeline_request
|
||||
self.register_record(pipeline_request=pipeline_request)
|
||||
|
||||
@ -144,6 +144,7 @@ class S3Source(StorageServiceSource):
|
||||
dataModel=container_details.data_model,
|
||||
service=self.context.objectstore_service.fullyQualifiedName,
|
||||
parent=container_details.parent,
|
||||
sourceUrl=container_details.sourceUrl,
|
||||
)
|
||||
|
||||
def _generate_container_details(
|
||||
@ -184,6 +185,10 @@ class S3Source(StorageServiceSource):
|
||||
isPartitioned=metadata_entry.isPartitioned, columns=columns
|
||||
),
|
||||
parent=parent,
|
||||
sourceUrl=self._get_object_source_url(
|
||||
bucket_name=bucket_name,
|
||||
prefix=metadata_entry.dataPath.strip(KEY_SEPARATOR),
|
||||
),
|
||||
)
|
||||
return None
|
||||
|
||||
@ -292,6 +297,7 @@ class S3Source(StorageServiceSource):
|
||||
),
|
||||
file_formats=[],
|
||||
data_model=None,
|
||||
sourceUrl=self._get_bucket_source_url(bucket_name=bucket_response.name),
|
||||
)
|
||||
|
||||
def _get_sample_file_path(
|
||||
@ -350,3 +356,47 @@ class S3Source(StorageServiceSource):
|
||||
f"Failed when trying to check if S3 prefix {prefix} exists in bucket {bucket_name}"
|
||||
)
|
||||
return False
|
||||
|
||||
def get_aws_bucket_region(self, bucket_name: str) -> str:
    """
    Method to fetch the bucket region

    Args:
        bucket_name: bucket passed to the S3 ``get_bucket_location`` call

    Returns:
        The ``LocationConstraint`` reported for the bucket, "us-east-1"
        when the lookup succeeds with an empty constraint, or the region
        set in the service connection when the lookup fails.
    """
    try:
        region_resp = self.s3_client.get_bucket_location(Bucket=bucket_name)
        # S3 answers with a null LocationConstraint for buckets that live in
        # us-east-1, so an empty value on a successful call is not "unknown":
        # falling back to the configured region here would mislabel them.
        return region_resp.get("LocationConstraint") or "us-east-1"
    except Exception:
        logger.debug(traceback.format_exc())
        logger.warning(f"Unable to get the region for bucket: {bucket_name}")
    # The lookup itself failed; the configured region is the best guess left.
    return self.service_connection.awsConfig.awsRegion
|
||||
|
||||
def _get_bucket_source_url(self, bucket_name: str) -> Optional[str]:
|
||||
"""
|
||||
Method to get the source url of s3 bucket
|
||||
"""
|
||||
try:
|
||||
region = self.get_aws_bucket_region(bucket_name=bucket_name)
|
||||
return (
|
||||
f"https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}"
|
||||
f"?region={region}&tab=objects"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Unable to get source url: {exc}")
|
||||
return None
|
||||
|
||||
def _get_object_source_url(self, bucket_name: str, prefix: str) -> Optional[str]:
|
||||
"""
|
||||
Method to get the source url of s3 bucket
|
||||
"""
|
||||
try:
|
||||
region = self.get_aws_bucket_region(bucket_name=bucket_name)
|
||||
return (
|
||||
f"https://s3.console.aws.amazon.com/s3/buckets/{bucket_name}"
|
||||
f"?region={region}&prefix={prefix}/"
|
||||
f"&showversions=false"
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(f"Unable to get source url: {exc}")
|
||||
return None
|
||||
|
||||
@ -20,6 +20,7 @@ from metadata.generated.schema.entity.data.container import (
|
||||
ContainerDataModel,
|
||||
FileFormat,
|
||||
)
|
||||
from metadata.generated.schema.type import basic
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
|
||||
|
||||
@ -74,3 +75,6 @@ class S3ContainerDetails(BaseModel):
|
||||
None,
|
||||
description="Reference to the parent container",
|
||||
)
|
||||
sourceUrl: Optional[basic.SourceUrl] = Field(
|
||||
None, description="Source URL of the container."
|
||||
)
|
||||
|
||||
@ -32,7 +32,7 @@ from metadata.generated.schema.entity.services.pipelineService import (
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
OpenMetadataWorkflowConfig,
|
||||
)
|
||||
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
|
||||
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SourceUrl
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.generated.schema.type.tagLabel import (
|
||||
LabelType,
|
||||
@ -77,7 +77,6 @@ EXPECTED_CREATED_PIPELINES = [
|
||||
name="graph5164c131c3524a271e7ecce49766d50a479b5ff4",
|
||||
displayName="story_recommender_job",
|
||||
description=None,
|
||||
sourceUrl=None,
|
||||
concurrency=None,
|
||||
pipelineLocation=None,
|
||||
startDate=None,
|
||||
@ -87,46 +86,51 @@ EXPECTED_CREATED_PIPELINES = [
|
||||
displayName="s3__recommender__recommender_model",
|
||||
fullyQualifiedName=None,
|
||||
description=None,
|
||||
sourceUrl=None,
|
||||
downstreamTasks=["s3__recommender__user_story_matrix"],
|
||||
taskType=None,
|
||||
taskSQL=None,
|
||||
startDate=None,
|
||||
endDate=None,
|
||||
tags=None,
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="http://lolhost:3000/locations/project_fully_featured/jobs/story_recommender_job/s3__recommender__recommender_model"
|
||||
),
|
||||
),
|
||||
Task(
|
||||
name="s3__recommender__user_story_matrix",
|
||||
displayName="s3__recommender__user_story_matrix",
|
||||
fullyQualifiedName=None,
|
||||
description=None,
|
||||
sourceUrl=None,
|
||||
downstreamTasks=["snowflake__recommender__comment_stories"],
|
||||
taskType=None,
|
||||
taskSQL=None,
|
||||
startDate=None,
|
||||
endDate=None,
|
||||
tags=None,
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="http://lolhost:3000/locations/project_fully_featured/jobs/story_recommender_job/s3__recommender__user_story_matrix"
|
||||
),
|
||||
),
|
||||
Task(
|
||||
name="snowflake__recommender__comment_stories",
|
||||
displayName="snowflake__recommender__comment_stories",
|
||||
fullyQualifiedName=None,
|
||||
description=None,
|
||||
sourceUrl=None,
|
||||
downstreamTasks=None,
|
||||
taskType=None,
|
||||
taskSQL=None,
|
||||
startDate=None,
|
||||
endDate=None,
|
||||
tags=None,
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="http://lolhost:3000/locations/project_fully_featured/jobs/story_recommender_job/snowflake__recommender__comment_stories"
|
||||
),
|
||||
),
|
||||
Task(
|
||||
name="snowflake__recommender__component_top_stories",
|
||||
displayName="snowflake__recommender__component_top_stories",
|
||||
fullyQualifiedName=None,
|
||||
description=None,
|
||||
sourceUrl=None,
|
||||
downstreamTasks=[
|
||||
"s3__recommender__recommender_model",
|
||||
"s3__recommender__user_story_matrix",
|
||||
@ -136,13 +140,15 @@ EXPECTED_CREATED_PIPELINES = [
|
||||
startDate=None,
|
||||
endDate=None,
|
||||
tags=None,
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="http://lolhost:3000/locations/project_fully_featured/jobs/story_recommender_job/snowflake__recommender__component_top_stories"
|
||||
),
|
||||
),
|
||||
Task(
|
||||
name="snowflake__recommender__user_top_recommended_stories",
|
||||
displayName="snowflake__recommender__user_top_recommended_stories",
|
||||
fullyQualifiedName=None,
|
||||
description=None,
|
||||
sourceUrl=None,
|
||||
downstreamTasks=[
|
||||
"s3__recommender__recommender_model",
|
||||
"s3__recommender__user_story_matrix",
|
||||
@ -152,6 +158,9 @@ EXPECTED_CREATED_PIPELINES = [
|
||||
startDate=None,
|
||||
endDate=None,
|
||||
tags=None,
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="http://lolhost:3000/locations/project_fully_featured/jobs/story_recommender_job/snowflake__recommender__user_top_recommended_stories"
|
||||
),
|
||||
),
|
||||
],
|
||||
tags=[
|
||||
@ -167,6 +176,9 @@ EXPECTED_CREATED_PIPELINES = [
|
||||
owner=None,
|
||||
service="dagster_source_test",
|
||||
extension=None,
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="http://lolhost:3000/locations/project_fully_featured/jobs/story_recommender_job/"
|
||||
),
|
||||
),
|
||||
]
|
||||
MOCK_CONNECTION_URI_PATH = (
|
||||
|
||||
@ -26,7 +26,7 @@ from metadata.generated.schema.entity.services.pipelineService import (
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
OpenMetadataWorkflowConfig,
|
||||
)
|
||||
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
|
||||
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SourceUrl
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.source.pipeline.fivetran.metadata import (
|
||||
FivetranPipelineDetails,
|
||||
@ -82,6 +82,9 @@ EXPECTED_CREATED_PIPELINES = CreatePipelineRequest(
|
||||
)
|
||||
],
|
||||
service=FullyQualifiedEntityName(__root__="fivetran_source"),
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="https://fivetran.com/dashboard/connectors/aiding_pointless/status?groupId=wackiness_remote&service=postgres_rds"
|
||||
),
|
||||
)
|
||||
|
||||
MOCK_PIPELINE_SERVICE = PipelineService(
|
||||
|
||||
@ -34,6 +34,7 @@ from metadata.generated.schema.metadataIngestion.storage.containerMetadataConfig
|
||||
from metadata.generated.schema.metadataIngestion.workflow import (
|
||||
OpenMetadataWorkflowConfig,
|
||||
)
|
||||
from metadata.generated.schema.type.basic import SourceUrl
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.ingestion.api.source import InvalidSourceException
|
||||
from metadata.ingestion.source.storage.s3.metadata import (
|
||||
@ -211,6 +212,9 @@ class StorageUnitTest(TestCase):
|
||||
file_formats=[],
|
||||
data_model=None,
|
||||
creation_date=bucket_response.creation_date.isoformat(),
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="https://s3.console.aws.amazon.com/s3/buckets/test_bucket?region=us-east-1&tab=objects"
|
||||
),
|
||||
),
|
||||
self.object_store_source._generate_unstructured_container(
|
||||
bucket_response=bucket_response
|
||||
@ -252,6 +256,9 @@ class StorageUnitTest(TestCase):
|
||||
data_model=ContainerDataModel(isPartitioned=False, columns=columns),
|
||||
creation_date=datetime.datetime(2000, 1, 1).isoformat(),
|
||||
parent=entity_ref,
|
||||
sourceUrl=SourceUrl(
|
||||
__root__="https://s3.console.aws.amazon.com/s3/buckets/test_bucket?region=us-east-1&prefix=transactions/&showversions=false"
|
||||
),
|
||||
),
|
||||
self.object_store_source._generate_container_details(
|
||||
S3BucketResponse(
|
||||
|
||||
@ -282,6 +282,7 @@ public class ContainerRepository extends EntityRepository<Container> {
|
||||
EntityUtil.objectMatch,
|
||||
false);
|
||||
recordChange("size", original.getSize(), updated.getSize(), false, EntityUtil.objectMatch, false);
|
||||
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
|
||||
}
|
||||
|
||||
private void updateDataModel(Container original, Container updated) {
|
||||
|
||||
@ -23,7 +23,6 @@ import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository.EntityUpdater;
|
||||
import org.openmetadata.service.resources.databases.DatabaseResource;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.EntityUtil.Fields;
|
||||
@ -118,6 +117,7 @@ public class DatabaseRepository extends EntityRepository<Database> {
|
||||
@Override
|
||||
public void entitySpecificUpdate() {
|
||||
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
|
||||
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,6 +135,7 @@ public class DatabaseSchemaRepository extends EntityRepository<DatabaseSchema> {
|
||||
@Override
|
||||
public void entitySpecificUpdate() {
|
||||
recordChange("retentionPeriod", original.getRetentionPeriod(), updated.getRetentionPeriod());
|
||||
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -40,7 +40,6 @@ import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskDetails;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository.EntityUpdater;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.resources.mlmodels.MlModelResource;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
@ -298,6 +297,7 @@ public class MlModelRepository extends EntityRepository<MlModel> {
|
||||
updateMlStore(original, updated);
|
||||
updateServer(original, updated);
|
||||
updateTarget(original, updated);
|
||||
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
|
||||
}
|
||||
|
||||
private void updateAlgorithm(MlModel origModel, MlModel updatedModel) {
|
||||
|
||||
@ -980,6 +980,7 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
recordChange("tableType", origTable.getTableType(), updatedTable.getTableType());
|
||||
updateConstraints(origTable, updatedTable);
|
||||
updateColumns("columns", origTable.getColumns(), updated.getColumns(), EntityUtil.columnMatch);
|
||||
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
|
||||
}
|
||||
|
||||
private void updateConstraints(Table origTable, Table updatedTable) {
|
||||
|
||||
@ -45,7 +45,6 @@ import org.openmetadata.schema.type.topic.CleanupPolicy;
|
||||
import org.openmetadata.schema.type.topic.TopicSampleData;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository.EntityUpdater;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser;
|
||||
import org.openmetadata.service.resources.topics.TopicResource;
|
||||
import org.openmetadata.service.security.mask.PIIMasker;
|
||||
@ -374,6 +373,7 @@ public class TopicRepository extends EntityRepository<Topic> {
|
||||
}
|
||||
recordChange("topicConfig", original.getTopicConfig(), updated.getTopicConfig());
|
||||
updateCleanupPolicies(original, updated);
|
||||
recordChange("sourceUrl", original.getSourceUrl(), updated.getSourceUrl());
|
||||
}
|
||||
|
||||
private void updateCleanupPolicies(Topic original, Topic updated) {
|
||||
|
||||
@ -382,6 +382,7 @@ public class DatabaseResource extends EntityResource<Database, DatabaseRepositor
|
||||
private Database getDatabase(CreateDatabase create, String user) {
|
||||
return copy(new Database(), create, user)
|
||||
.withService(getEntityReference(Entity.DATABASE_SERVICE, create.getService()))
|
||||
.withSourceUrl(create.getSourceUrl())
|
||||
.withRetentionPeriod(create.getRetentionPeriod());
|
||||
}
|
||||
}
|
||||
|
||||
@ -387,6 +387,7 @@ public class DatabaseSchemaResource extends EntityResource<DatabaseSchema, Datab
|
||||
return copy(new DatabaseSchema(), create, user)
|
||||
.withDatabase(getEntityReference(Entity.DATABASE, create.getDatabase()))
|
||||
.withTags(create.getTags())
|
||||
.withSourceUrl(create.getSourceUrl())
|
||||
.withRetentionPeriod(create.getRetentionPeriod());
|
||||
}
|
||||
}
|
||||
|
||||
@ -428,6 +428,7 @@ public class MlModelResource extends EntityResource<MlModel, MlModelRepository>
|
||||
.withMlStore(create.getMlStore())
|
||||
.withServer(create.getServer())
|
||||
.withTarget(create.getTarget())
|
||||
.withSourceUrl(create.getSourceUrl())
|
||||
.withTags(create.getTags());
|
||||
}
|
||||
}
|
||||
|
||||
@ -427,6 +427,7 @@ public class ContainerResource extends EntityResource<Container, ContainerReposi
|
||||
.withNumberOfObjects(create.getNumberOfObjects())
|
||||
.withSize(create.getSize())
|
||||
.withFileFormats(create.getFileFormats())
|
||||
.withSourceUrl(create.getSourceUrl())
|
||||
.withTags(create.getTags());
|
||||
}
|
||||
}
|
||||
|
||||
@ -479,6 +479,7 @@ public class TopicResource extends EntityResource<Topic, TopicRepository> {
|
||||
.withRetentionTime(create.getRetentionTime())
|
||||
.withReplicationFactor(create.getReplicationFactor())
|
||||
.withTopicConfig(create.getTopicConfig())
|
||||
.withSourceUrl(create.getSourceUrl())
|
||||
.withTags(create.getTags());
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user