Mirror of https://github.com/open-metadata/OpenMetadata.git
Redshift Mark Partitioned Tables (#7063)
commit 791245c0e8 (parent c366d39737)
@@ -15,7 +15,7 @@ Redshift source ingestion
 import re
 import traceback
 from collections import defaultdict
-from typing import Iterable, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple

 import sqlalchemy as sa
 from packaging.version import Version
@@ -23,11 +23,16 @@ from sqlalchemy import inspect, util
 from sqlalchemy.dialects.postgresql.base import ENUM, PGDialect
 from sqlalchemy.dialects.postgresql.base import ischema_names as pg_ischema_names
 from sqlalchemy.engine import reflection
 from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy.sql import sqltypes
 from sqlalchemy.types import CHAR, VARCHAR, NullType
 from sqlalchemy_redshift.dialect import RedshiftDialectMixin, RelationKey

-from metadata.generated.schema.entity.data.table import TableType
+from metadata.generated.schema.entity.data.table import (
+    IntervalType,
+    TablePartition,
+    TableType,
+)
 from metadata.generated.schema.entity.services.connections.database.redshiftConnection import (
     RedshiftConnection,
 )
@@ -44,6 +49,7 @@ from metadata.utils.logger import ingestion_logger
 from metadata.utils.sql_queries import (
     REDSHIFT_GET_ALL_RELATION_INFO,
     REDSHIFT_GET_SCHEMA_COLUMN_INFO,
+    REDSHIFT_PARTITION_DETAILS,
 )

 sa_version = Version(sa.__version__)
@@ -443,6 +449,7 @@ STANDARD_TABLE_TYPES = {
 # pylint: disable=useless-super-delegation
 class RedshiftSource(CommonDbSourceService):
     def __init__(self, config, metadata_config):
+        self.partition_details = {}
         super().__init__(config, metadata_config)

     @classmethod
@@ -455,6 +462,15 @@ class RedshiftSource(CommonDbSourceService):
         )
         return cls(config, metadata_config)

+    def get_partition_details(self) -> None:
+        """
+        Populate partition details
+        """
+        self.partition_details.clear()
+        results = self.engine.execute(REDSHIFT_PARTITION_DETAILS).fetchall()
+        for row in results:
+            self.partition_details[f"{row.schema}.{row.table}"] = row.diststyle
+
     def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
         """
         Handle table and views.
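For illustration, once `get_partition_details()` has run, the cache maps `schema.table` keys to the raw `DISTSTYLE` strings returned by Redshift. The table names below are hypothetical; the three style values are the ones exercised by the unit test added further down:

```python
# Hypothetical contents of self.partition_details after population:
partition_details = {
    "public.event": "KEY(eventid)",   # KEY distribution -> has a partition column
    "public.venue": "ALL",            # replicated to every node, no key column
    "analytics.sales": "EVEN",        # round-robin distribution, no key column
}
```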
@@ -493,6 +509,7 @@ class RedshiftSource(CommonDbSourceService):
     def get_database_names(self) -> Iterable[str]:
         if not self.config.serviceConnection.__root__.config.ingestAllDatabases:
             self.inspector = inspect(self.engine)
+            self.get_partition_details()
             yield self.config.serviceConnection.__root__.config.database
         else:
             results = self.connection.execute("SELECT datname FROM pg_database")
@@ -508,9 +525,31 @@ class RedshiftSource(CommonDbSourceService):

                 try:
                     self.set_inspector(database_name=new_database)
+                    self.get_partition_details()
                     yield new_database
                 except Exception as exc:
                     logger.debug(traceback.format_exc())
                     logger.error(
                         f"Error trying to connect to database {new_database}: {exc}"
                     )
+
+    def _get_partition_key(self, diststyle: str) -> Optional[List[str]]:
+        try:
+            regex = re.match(r"KEY\((\w+)\)", diststyle)
+            if regex:
+                return [regex.group(1)]
+        except Exception as err:
+            logger.debug(traceback.format_exc())
+            logger.warning(err)
+
+    def get_table_partition_details(
+        self, table_name: str, schema_name: str, inspector: Inspector
+    ) -> Tuple[bool, TablePartition]:
+        diststyle = self.partition_details.get(f"{schema_name}.{table_name}")
+        if diststyle:
+            partition_details = TablePartition(
+                columns=self._get_partition_key(diststyle),
+                intervalType=IntervalType.COLUMN_VALUE,
+            )
+            return True, partition_details
+        return False, None
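Taken on its own, the `_get_partition_key` logic reduces to the following standalone sketch; the three assertions mirror the `RAW_DIST_STYLE` cases in the unit test below:

```python
import re
from typing import List, Optional

def get_partition_key(diststyle: str) -> Optional[List[str]]:
    """Extract the distribution column from a raw DISTSTYLE string.

    Only KEY-distributed tables carry a concrete column; EVEN and ALL
    styles yield None, so no partition column is reported for them.
    """
    match = re.match(r"KEY\((\w+)\)", diststyle)
    if match:
        return [match.group(1)]
    return None

assert get_partition_key("KEY(eventid)") == ["eventid"]
assert get_partition_key("EVEN") is None
assert get_partition_key("ALL") is None
```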
@@ -534,3 +534,10 @@ SNOWFLAKE_GET_CLUSTER_KEY = """
 where TABLE_TYPE = 'BASE TABLE'
 and CLUSTERING_KEY is not null
 """
+
+
+REDSHIFT_PARTITION_DETAILS = """
+  select "schema", "table", diststyle
+  from SVV_TABLE_INFO
+  where diststyle not like 'AUTO%%'
+"""
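The doubled `%%` in the LIKE pattern is the conventional escape for DBAPI drivers that use `%`-style parameter markers; functionally it matches the same rows as `'AUTO%'`. A minimal standalone sketch of the same cache-building step, assuming `sqlalchemy-redshift` is installed and using a placeholder connection URL:

```python
import sqlalchemy as sa

# Placeholder URL; swap in real credentials, host, and database.
engine = sa.create_engine(
    "redshift+psycopg2://username:password@cluster.example.com:5439/database"
)

REDSHIFT_PARTITION_DETAILS = """
  select "schema", "table", diststyle
  from SVV_TABLE_INFO
  where diststyle not like 'AUTO%%'
"""

# One query up front, then O(1) lookups per table during ingestion.
partition_details = {
    f"{row.schema}.{row.table}": row.diststyle
    for row in engine.execute(REDSHIFT_PARTITION_DETAILS).fetchall()
}
```

The legacy `engine.execute()` call matches the SQLAlchemy 1.x style used in the source above; under SQLAlchemy 2.0 this would go through a `Connection` and `text()` instead.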
ingestion/tests/unit/topology/database/test_redshift.py (new file, 55 lines)
@@ -0,0 +1,55 @@
+from unittest import TestCase
+from unittest.mock import patch
+
+from metadata.generated.schema.metadataIngestion.workflow import (
+    OpenMetadataWorkflowConfig,
+)
+from metadata.ingestion.source.database.redshift import RedshiftSource
+
+mock_redshift_config = {
+    "source": {
+        "type": "redshift",
+        "serviceName": "local_redshift",
+        "serviceConnection": {
+            "config": {
+                "type": "Redshift",
+                "username": "username",
+                "password": "password",
+                "database": "database",
+                "hostPort": "cluster.name.region.redshift.amazonaws.com:5439",
+            }
+        },
+        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
+    },
+    "sink": {"type": "metadata-rest", "config": {}},
+    "workflowConfig": {
+        "openMetadataServerConfig": {
+            "hostPort": "http://localhost:8585/api",
+            "authProvider": "no-auth",
+        }
+    },
+}
+
+
+RAW_DIST_STYLE = ["KEY(eventid)", "EVEN", "ALL"]
+
+EXPECTED_PARTITION_COLUMNS = [["eventid"], None, None]
+
+
+class RedshiftUnitTest(TestCase):
+    @patch("metadata.ingestion.source.database.common_db_source.test_connection")
+    def __init__(self, methodName, test_connection) -> None:
+        super().__init__(methodName)
+        test_connection.return_value = False
+        self.config = OpenMetadataWorkflowConfig.parse_obj(mock_redshift_config)
+        self.redshift_source = RedshiftSource.create(
+            mock_redshift_config["source"],
+            self.config.workflowConfig.openMetadataServerConfig,
+        )
+
+    def test_partition_parse_columns(self):
+        # Each raw DISTSTYLE value maps to its expected partition columns
+        for i in range(len(RAW_DIST_STYLE)):
+            assert (
+                self.redshift_source._get_partition_key(RAW_DIST_STYLE[i])
+                == EXPECTED_PARTITION_COLUMNS[i]
+            )
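Assuming a working ingestion dev environment, the new test runs on its own with `pytest ingestion/tests/unit/topology/database/test_redshift.py`; since `test_connection` is patched out, no live Redshift cluster is needed.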
@@ -50,6 +50,12 @@ process the Entities if needed, and reach the OpenMetadata server.
 The workflow is modeled around the following
 [JSON Schema](https://github.com/open-metadata/OpenMetadata/blob/main/catalog-rest-service/src/main/resources/json/schema/metadataIngestion/workflow.json)

+<Note>
+
+During metadata ingestion for Redshift, tables whose distribution style (`DISTSTYLE`) is not `AUTO` will be marked as partitioned tables.
+
+</Note>
+
 ### 1. Define the YAML Config

 This is a sample config for Redshift:
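The sample itself sits outside this hunk. As a rough sketch of its shape, mirroring the mock config in the unit test above (all credentials and hosts are placeholders):

```yaml
source:
  type: redshift
  serviceName: local_redshift
  serviceConnection:
    config:
      type: Redshift
      username: username
      password: password
      database: database
      hostPort: cluster.name.region.redshift.amazonaws.com:5439
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: no-auth
```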
@@ -169,6 +169,12 @@ caption="Configure Metadata Ingestion Page"
 - **Mark Deleted Tables (toggle)**: Set the Mark Deleted Tables toggle to flag tables as soft-deleted if they are not present anymore in the source system.
 - **Mark Deleted Tables from Filter Only (toggle)**: Set the Mark Deleted Tables from Filter Only toggle to flag tables as soft-deleted if they are not present anymore within the filtered schema or database only. This flag is useful when you have more than one ingestion pipeline. For example, if you have a schema

+<Note>
+
+During metadata ingestion for Redshift, tables whose distribution style (`DISTSTYLE`) is not `AUTO` will be marked as partitioned tables.
+
+</Note>
+
 ### 7. Schedule the Ingestion and Deploy

 Scheduling can be set up at an hourly, daily, or weekly cadence. The