Redshift Usage Fixed (#3983)

* Redshift Usage Fixed

* Renamed source connection
This commit is contained in:
Mayur Singal 2022-04-09 18:50:30 +05:30 committed by GitHub
parent c6bd06944b
commit aa6a3fa8c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 27 deletions

View File

@ -1,14 +1,20 @@
{ {
"source": { "source": {
"type": "redshift-usage", "type": "redshift-usage",
"serviceName": "aws_redshift_demo_2",
"serviceConnection": {
"config": { "config": {
"host_port": "cluster.name.region.redshift.amazonaws.com:5439", "hostPort": "cluster.name.region.redshift.amazonaws.com:5439",
"username": "username", "username": "username",
"password": "strong_password", "password": "strong_password",
"database": "warehouse", "database": "dev",
"where_clause": "and q.label != 'metrics' and q.label != 'health' and q.label != 'cmstats'", "type": "Redshift"
"service_name": "aws_redshift", }
"duration": 2 },
"sourceConfig": {
"config": {
"queryLogDuration": "1"
}
} }
}, },
"processor": { "processor": {
@ -23,17 +29,16 @@
"filename": "/tmp/redshift_usage" "filename": "/tmp/redshift_usage"
} }
}, },
"bulk_sink": { "bulkSink": {
"type": "metadata-usage", "type": "metadata-usage",
"config": { "config": {
"filename": "/tmp/redshift_usage" "filename": "/tmp/redshift_usage"
} }
}, },
"metadata_server": { "workflowConfig": {
"type": "metadata-server", "openMetadataServerConfig": {
"config": { "hostPort": "http://localhost:8585/api",
"api_endpoint": "http://localhost:8585/api", "authProvider": "no-auth"
"auth_provider_type": "no-auth"
} }
} }
} }

View File

@ -456,10 +456,7 @@ class RedshiftSource(SQLSource):
raise InvalidSourceException( raise InvalidSourceException(
f"Expected RedshiftConnection, but got {connection}" f"Expected RedshiftConnection, but got {connection}"
) )
if ( if config.sourceConfig.config.sampleDataQuery == "select * from {}.{} limit 50":
config.sourceConfig.config.sampleDataQuery
== WorkflowSource.sourceConfig.config.sampleDataQuery
):
config.sourceConfig.config.sampleDataQuery = 'select * from "{}"."{}"' config.sourceConfig.config.sampleDataQuery = 'select * from "{}"."{}"'
return cls(config, metadata_config) return cls(config, metadata_config)

View File

@ -13,16 +13,22 @@
import logging import logging
from typing import Any, Dict, Iterable, Iterator, Union from typing import Any, Dict, Iterable, Iterator, Union
# pylint: disable=useless-super-delegation
from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
RedshiftConnection,
)
from metadata.generated.schema.entity.services.databaseService import ( from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType, DatabaseServiceType,
) )
from metadata.generated.schema.metadataIngestion.workflow import ( from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig, OpenMetadataServerConfig,
) )
from metadata.ingestion.api.source import Source, SourceStatus from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.table_queries import TableQuery from metadata.ingestion.models.table_queries import TableQuery
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.redshift import RedshiftConfig
from metadata.ingestion.source.sql_alchemy_helper import ( from metadata.ingestion.source.sql_alchemy_helper import (
SQLAlchemyHelper, SQLAlchemyHelper,
SQLSourceStatus, SQLSourceStatus,
@ -45,18 +51,21 @@ class RedshiftUsageSource(Source[TableQuery]):
SERVICE_TYPE = DatabaseServiceType.Redshift.value SERVICE_TYPE = DatabaseServiceType.Redshift.value
DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()" DEFAULT_CLUSTER_SOURCE = "CURRENT_DATABASE()"
def __init__(self, config, metadata_config): def __init__(
self, config: WorkflowSource, metadata_config: OpenMetadataServerConfig
):
super().__init__() super().__init__()
self.config = config self.config = config
self.service_connection = config.serviceConnection.__root__.config
self.metadata_config = metadata_config self.metadata_config = metadata_config
self.metadata = OpenMetadata(metadata_config) self.metadata = OpenMetadata(metadata_config)
start, end = get_start_and_end(config.duration) start, end = get_start_and_end(self.config.sourceConfig.config.queryLogDuration)
self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format( self.sql_stmt = RedshiftUsageSource.SQL_STATEMENT.format(
start_time=start, end_time=end start_time=start, end_time=end
) )
self.analysis_date = start self.analysis_date = start
self.alchemy_helper = SQLAlchemyHelper( self.alchemy_helper = SQLAlchemyHelper(
config, metadata_config, "Redshift", self.sql_stmt self.service_connection, metadata_config, "Redshift", self.sql_stmt
) )
self._extract_iter: Union[None, Iterator] = None self._extract_iter: Union[None, Iterator] = None
self._database = "redshift" self._database = "redshift"
@ -64,7 +73,12 @@ class RedshiftUsageSource(Source[TableQuery]):
@classmethod @classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = RedshiftConfig.parse_obj(config_dict) config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: RedshiftConnection = config.serviceConnection.__root__.config
if not isinstance(connection, RedshiftConnection):
raise InvalidSourceException(
f"Expected RedshiftConnection, but got {connection}"
)
return cls(config, metadata_config) return cls(config, metadata_config)
def prepare(self): def prepare(self):
@ -91,10 +105,10 @@ class RedshiftUsageSource(Source[TableQuery]):
starttime=str(row["starttime"]), starttime=str(row["starttime"]),
endtime=str(row["endtime"]), endtime=str(row["endtime"]),
analysis_date=str(self.analysis_date), analysis_date=str(self.analysis_date),
database=row["schema"], database=self.service_connection.database,
aborted=row["aborted"], aborted=row["aborted"],
sql=row["querytxt"], sql=row["querytxt"],
service_name=self.config.service_name, service_name=self.config.serviceName,
) )
yield tq yield tq
@ -102,7 +116,7 @@ class RedshiftUsageSource(Source[TableQuery]):
"sql": tq.sql, "sql": tq.sql,
"from_type": "table", "from_type": "table",
"to_type": "table", "to_type": "table",
"service_name": self.config.service_name, "service_name": self.config.serviceName,
} }
ingest_lineage(query_info, self.metadata_config) ingest_lineage(query_info, self.metadata_config)