feat(ingest/teradata): Teradata source (#8977)

Tamas Nemeth 2023-10-13 00:14:45 +02:00 committed by GitHub
parent d04d25bf42
commit a8f0080c08
8 changed files with 365 additions and 1 deletion


@@ -0,0 +1,28 @@
### Prerequisites
1. Create a user with access to the database you want to ingest.
```sql
CREATE USER datahub FROM <database> AS PASSWORD = <password> PERM = 20000000;
```
2. Grant the user the following privileges:
```sql
GRANT SELECT ON dbc.columns TO datahub;
GRANT SELECT ON dbc.databases TO datahub;
GRANT SELECT ON dbc.tables TO datahub;
GRANT SELECT ON DBC.All_RI_ChildrenV TO datahub;
GRANT SELECT ON DBC.ColumnsV TO datahub;
GRANT SELECT ON DBC.IndicesV TO datahub;
GRANT SELECT ON dbc.TableTextV TO datahub;
GRANT SELECT ON dbc.TablesV TO datahub;
GRANT SELECT ON dbc.dbqlogtbl TO datahub; -- if lineage or usage extraction is enabled
```
If you want to run profiling, you need to grant select permission on all the tables you want to profile.
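For example, one way to allow profiling of every table in a database is to grant `SELECT` at the database level (substitute the database you want to profile for `<database>`):
```sql
GRANT SELECT ON <database> TO datahub;
```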
3. If lineage or usage extraction is enabled, check that query logging is enabled and that the captured query text
   size is large enough for your queries (by default, Teradata captures at most 200 characters of query text).
   An example of how to set it for all users:
```sql
REPLACE QUERY LOGGING LIMIT SQLTEXT=2000 ON ALL;
```
See the Teradata documentation for more details on query logging:
[Tracking Query Behavior with Database Query Logging](https://docs.teradata.com/r/Teradata-VantageCloud-Lake/Database-Reference/Database-Administration/Tracking-Query-Behavior-with-Database-Query-Logging-Operational-DBAs)


@@ -0,0 +1,17 @@
pipeline_name: my-teradata-ingestion-pipeline
source:
type: teradata
config:
host_port: "myteradatainstance.teradata.com:1025"
#platform_instance: "myteradatainstance"
username: myuser
password: mypassword
#database_pattern:
# allow:
# - "demo_user"
# ignoreCase: true
include_table_lineage: true
include_usage_statistics: true
stateful_ingestion:
enabled: true
sink:


@@ -373,6 +373,7 @@ plugins: Dict[str, Set[str]] = {
# FIXME: I don't think tableau uses sqllineage anymore so we should be able
# to remove that dependency.
"tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib,
"teradata": sql_common | {"teradatasqlalchemy>=17.20.0.0"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
@@ -499,6 +500,7 @@ base_dev_requirements = {
"s3",
"snowflake",
"tableau",
"teradata",
"trino",
"hive",
"starburst-trino-usage",
@@ -597,6 +599,7 @@ entry_points = {
"tableau = datahub.ingestion.source.tableau:TableauSource",
"openapi = datahub.ingestion.source.openapi:OpenApiSource",
"metabase = datahub.ingestion.source.metabase:MetabaseSource",
"teradata = datahub.ingestion.source.sql.teradata:TeradataSource",
"trino = datahub.ingestion.source.sql.trino:TrinoSource",
"starburst-trino-usage = datahub.ingestion.source.usage.starburst_trino_usage:TrinoUsageSource",
"nifi = datahub.ingestion.source.nifi:NifiSource",


@@ -0,0 +1,228 @@
import logging
from dataclasses import dataclass
from typing import Iterable, Optional, Set, Union
# This import verifies that the dependencies are available.
import teradatasqlalchemy # noqa: F401
import teradatasqlalchemy.types as custom_types
from pydantic.fields import Field
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
capability,
config_class,
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.graph.client import DataHubGraph
from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type
from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
)
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage

logger: logging.Logger = logging.getLogger(__name__)

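# Teradata-specific SQLAlchemy types (from teradatasqlalchemy) mapped to DataHub
# schema field type classes so schema extraction reports them correctly.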
register_custom_type(custom_types.JSON, BytesTypeClass)
register_custom_type(custom_types.INTERVAL_DAY, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_DAY_TO_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_DAY_TO_MINUTE, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_DAY_TO_HOUR, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_MINUTE, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_MINUTE_TO_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_HOUR, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_HOUR_TO_MINUTE, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_HOUR_TO_SECOND, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_MONTH, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_YEAR, TimeTypeClass)
register_custom_type(custom_types.INTERVAL_YEAR_TO_MONTH, TimeTypeClass)
register_custom_type(custom_types.MBB, BytesTypeClass)
register_custom_type(custom_types.MBR, BytesTypeClass)
register_custom_type(custom_types.GEOMETRY, BytesTypeClass)
register_custom_type(custom_types.TDUDT, BytesTypeClass)
register_custom_type(custom_types.XML, BytesTypeClass)


@dataclass
class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
num_queries_parsed: int = 0
num_table_parse_failures: int = 0


class BaseTeradataConfig(TwoTierSQLAlchemyConfig):
scheme = Field(default="teradatasql", description="database scheme")


class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
database_pattern = Field(
default=AllowDenyPattern(deny=["dbc"]),
description="Regex patterns for databases to filter in ingestion.",
)
include_table_lineage = Field(
default=False,
description="Whether to include table lineage in the ingestion. "
"This requires to have the table lineage feature enabled.",
)
usage: BaseUsageConfig = Field(
description="The usage config to use when generating usage statistics",
default=BaseUsageConfig(),
)
use_schema_resolver: bool = Field(
default=True,
description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
hidden_from_docs=True,
)
default_db: Optional[str] = Field(
default=None,
description="The default database to use for unqualified table names",
)
include_usage_statistics: bool = Field(
default=False,
description="Generate usage statistic.",
)
@platform_name("Teradata")
@config_class(TeradataConfig)
@support_status(SupportStatus.TESTING)
@capability(SourceCapability.DOMAINS, "Enabled by default")
@capability(SourceCapability.CONTAINERS, "Enabled by default")
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DELETION_DETECTION, "Optionally enabled via configuration")
@capability(SourceCapability.DATA_PROFILING, "Optionally enabled via configuration")
@capability(SourceCapability.LINEAGE_COARSE, "Optionally enabled via configuration")
@capability(SourceCapability.LINEAGE_FINE, "Optionally enabled via configuration")
@capability(SourceCapability.USAGE_STATS, "Optionally enabled via configuration")
class TeradataSource(TwoTierSQLAlchemySource):
"""
This plugin extracts the following:
- Metadata for databases, schemas, views, and tables
- Column types associated with each table
- Table, row, and column statistics via optional SQL profiling
"""
config: TeradataConfig
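
    # Audit-log query: pulls the user, start time, default database and SQL text of
    # successfully executed statements from DBC.DBQLogTbl within the configured time
    # window; each row is fed to the SQL parser for lineage/usage extraction.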
LINEAGE_QUERY: str = """SELECT ProcID, UserName as "user", StartTime AT TIME ZONE 'GMT' as "timestamp", DefaultDatabase as default_database, QueryText as query
FROM "DBC".DBQLogTbl
where ErrorCode = 0
and "timestamp" >= TIMESTAMP '{start_time}'
and "timestamp" < TIMESTAMP '{end_time}'
"""
urns: Optional[Set[str]]

    def __init__(self, config: TeradataConfig, ctx: PipelineContext):
super().__init__(config, ctx, "teradata")
self.report: TeradataReport = TeradataReport()
self.graph: Optional[DataHubGraph] = ctx.graph
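        # Set up the schema resolver used for SQL parsing; when a DataHub graph
        # connection is available it can be pre-populated with schema metadata that
        # is already in DataHub (controlled by use_schema_resolver).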
if self.graph:
if self.config.use_schema_resolver:
self.schema_resolver = (
self.graph.initialize_schema_resolver_from_datahub(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
)
self.urns = self.schema_resolver.get_urns()
else:
self.schema_resolver = self.graph._make_schema_resolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.urns = None
else:
self.schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
graph=None,
env=self.config.env,
)
self.urns = None
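        # Accumulates parsed SQL results; lineage, usage and operation work units
        # are emitted later from gen_workunits().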
self.builder: SqlParsingBuilder = SqlParsingBuilder(
usage_config=self.config.usage
if self.config.include_usage_statistics
else None,
generate_lineage=self.config.include_table_lineage,
generate_usage_statistics=self.config.include_usage_statistics,
generate_operations=self.config.usage.include_operational_stats,
)

    @classmethod
def create(cls, config_dict, ctx):
config = TeradataConfig.parse_obj(config_dict)
return cls(config, ctx)

    def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
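        # Stream queries from the audit log and feed each statement to the SQL
        # parser; successfully parsed statements become lineage/usage work units.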
engine = self.get_metadata_engine()
for entry in engine.execute(
self.LINEAGE_QUERY.format(
start_time=self.config.start_time, end_time=self.config.end_time
)
):
self.report.num_queries_parsed += 1
if self.report.num_queries_parsed % 1000 == 0:
logger.info(f"Parsed {self.report.num_queries_parsed} queries")
result = sqlglot_lineage(
sql=entry.query,
schema_resolver=self.schema_resolver,
default_db=None,
default_schema=entry.default_database
if entry.default_database
else self.config.default_db,
)
if result.debug_info.table_error:
logger.debug(
f"Error parsing table lineage, {result.debug_info.table_error}"
)
self.report.num_table_parse_failures += 1
continue
yield from self.builder.process_sql_parsing_result(
result,
query=entry.query,
query_timestamp=entry.timestamp,
user=f"urn:li:corpuser:{entry.user}",
include_urns=self.urns,
)

    def get_metadata_engine(self) -> Engine:
url = self.config.get_sql_alchemy_url()
logger.debug(f"sql_alchemy_url={url}")
return create_engine(url, **self.config.options)

    def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
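        # Regular schema/table extraction first, then an optional second pass over
        # the query log for lineage and usage statistics.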
yield from super().get_workunits_internal()
if self.config.include_table_lineage or self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("audit log extraction")
yield from self.get_audit_log_mcps()
yield from self.builder.gen_workunits()


@@ -70,11 +70,14 @@ def assert_sql_result(
sql: str,
*,
dialect: str,
platform_instance: Optional[str] = None,
expected_file: pathlib.Path,
schemas: Optional[Dict[str, SchemaInfo]] = None,
**kwargs: Any,
) -> None:
schema_resolver = SchemaResolver(platform=dialect)
schema_resolver = SchemaResolver(
platform=dialect, platform_instance=platform_instance
)
if schemas:
for urn, schema in schemas.items():
schema_resolver.add_raw_schema_info(urn, schema)


@@ -482,6 +482,11 @@ def _column_level_lineage( # noqa: C901
# Our snowflake source lowercases column identifiers, so we are forced
# to do fuzzy (case-insensitive) resolution instead of exact resolution.
"snowflake",
# Teradata column names are case-insensitive.
# A name, even when enclosed in double quotation marks, is not case sensitive. For example, CUSTOMER and Customer are the same.
# See more below:
# https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/acreldb/n0ejgx4895bofnn14rlguktfx5r3.htm
"teradata",
}
sqlglot_db_schema = sqlglot.MappingSchema(


@@ -0,0 +1,38 @@
{
"query_type": "CREATE",
"in_tables": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)"
],
"out_tables": [
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)"
],
"column_lineage": [
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)",
"column": "PatientId",
"native_column_type": "INTEGER()"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)",
"column": "PatientId"
}
]
},
{
"downstream": {
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)",
"column": "BMI",
"native_column_type": "FLOAT()"
},
"upstreams": [
{
"table": "urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)",
"column": "BMI"
}
]
}
]
}


@@ -630,3 +630,45 @@ LIMIT 10
# TODO: Add a test for setting platform_instance or env
def test_teradata_default_normalization():
assert_sql_result(
"""
create table demo_user.test_lineage2 as
(
select
ppd.PatientId,
ppf.bmi
from
demo_user.pima_patient_features ppf
join demo_user.pima_patient_diagnoses ppd on
ppd.PatientId = ppf.PatientId
) with data;
""",
dialect="teradata",
default_schema="dbc",
platform_instance="myteradata",
schemas={
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_diagnoses,PROD)": {
"HasDiabetes": "INTEGER()",
"PatientId": "INTEGER()",
},
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.pima_patient_features,PROD)": {
"Age": "INTEGER()",
"BMI": "FLOAT()",
"BloodP": "INTEGER()",
"DiPedFunc": "FLOAT()",
"NumTimesPrg": "INTEGER()",
"PatientId": "INTEGER()",
"PlGlcConc": "INTEGER()",
"SkinThick": "INTEGER()",
"TwoHourSerIns": "INTEGER()",
},
"urn:li:dataset:(urn:li:dataPlatform:teradata,myteradata.demo_user.test_lineage2,PROD)": {
"BMI": "FLOAT()",
"PatientId": "INTEGER()",
},
},
expected_file=RESOURCE_DIR / "test_teradata_default_normalization.json",
)