diff --git a/ingestion/examples/workflows/redshift_usage.json b/ingestion/examples/workflows/redshift_usage.json index e2b33d915b3..28a58df54e1 100644 --- a/ingestion/examples/workflows/redshift_usage.json +++ b/ingestion/examples/workflows/redshift_usage.json @@ -1,14 +1,20 @@ { "source": { "type": "redshift-usage", - "config": { - "host_port": "cluster.name.region.redshift.amazonaws.com:5439", - "username": "username", - "password": "strong_password", - "database": "warehouse", - "where_clause": "and q.label != 'metrics' and q.label != 'health' and q.label != 'cmstats'", - "service_name": "aws_redshift", - "duration": 2 + "serviceName": "aws_redshift_demo_2", + "serviceConnection": { + "config": { + "hostPort": "cluster.name.region.redshift.amazonaws.com:5439", + "username": "username", + "password": "strong_password", + "database": "dev", + "type": "Redshift" + } + }, + "sourceConfig": { + "config": { + "queryLogDuration": "1" + } } }, "processor": { @@ -23,17 +29,16 @@ "filename": "/tmp/redshift_usage" } }, - "bulk_sink": { + "bulkSink": { "type": "metadata-usage", "config": { "filename": "/tmp/redshift_usage" } }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } } diff --git a/ingestion/src/metadata/ingestion/source/redshift.py b/ingestion/src/metadata/ingestion/source/redshift.py index 054262429b4..462816e57da 100644 --- a/ingestion/src/metadata/ingestion/source/redshift.py +++ b/ingestion/src/metadata/ingestion/source/redshift.py @@ -456,10 +456,7 @@ class RedshiftSource(SQLSource): raise InvalidSourceException( f"Expected RedshiftConnection, but got {connection}" ) - if ( - config.sourceConfig.config.sampleDataQuery - == WorkflowSource.sourceConfig.config.sampleDataQuery - ): + if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50": config.sourceConfig.config.sampleDataQuery = 'select * from "{}"."{}"' return cls(config, metadata_config) diff --git a/ingestion/src/metadata/ingestion/source/redshift_usage.py b/ingestion/src/metadata/ingestion/source/redshift_usage.py index 15447b8eb01..b2e8687344a 100644 --- a/ingestion/src/metadata/ingestion/source/redshift_usage.py +++ b/ingestion/src/metadata/ingestion/source/redshift_usage.py @@ -13,16 +13,22 @@ import logging from typing import Any, Dict, Iterable, Iterator, Union +# pylint: disable=useless-super-delegation +from metadata.generated.schema.entity.services.connections.database.redshiftConnection import ( + RedshiftConnection, +) from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.redshift import RedshiftConfig from metadata.ingestion.source.sql_alchemy_helper import ( SQLAlchemyHelper, SQLSourceStatus, @@ -45,18 +51,21 @@ class RedshiftUsageSource(Source[TableQuery]): SERVICE_TYPE = DatabaseServiceType.Redshift.value DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()" - def __init__(self, config, metadata_config): + def __init__( + self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig + ): super().__init__() self.config = config + self.service_connection = config.serviceConnection.__root__.config self.metadata_config = metadata_config self.metadata = OpenMetadata(metadata_config) - start, end = get_start_and_end(config.duration) + start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration) self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format( start_time=start, end_time=end ) self.analysis_date = start self.alchemy_helper = SQLAlchemyHelper( - config, metadata_config, "Redshift", self.sql_stmt + self.service_connection, metadata_config, "Redshift", self.sql_stmt ) self._extract_iter: Union[None, Iterator] = None self._database = "redshift" @@ -64,7 +73,12 @@ class RedshiftUsageSource(Source[TableQuery]): @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = RedshiftConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: RedshiftConnection = config.serviceConnection.__root__.config + if not isinstance(connection, RedshiftConnection): + raise InvalidSourceException( + f"Expected RedshiftConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -91,10 +105,10 @@ class RedshiftUsageSource(Source[TableQuery]): starttime=str(row["starttime"]), endtime=str(row["endtime"]), analysis_date=str(self.analysis_date), - database=row["schema"], + database=self.service_connection.database, aborted=row["aborted"], sql=row["querytxt"], - service_name=self.config.service_name, + service_name=self.config.serviceName, ) yield tq @@ -102,7 +116,7 @@ class RedshiftUsageSource(Source[TableQuery]): "sql": tq.sql, "from_type": "table", "to_type": "table", - "service_name": self.config.service_name, + "service_name": self.config.serviceName, } ingest_lineage(query_info, self.metadata_config)