import datetime
import logging
import traceback
from abc import abstractmethod
from collections import OrderedDict
from dataclasses import dataclass, field
from enum import Enum
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    List,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
    cast,
)
from urllib.parse import quote_plus

import pydantic
from pydantic.fields import Field
from sqlalchemy import create_engine, dialects, inspect
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.exc import ProgrammingError
from sqlalchemy.sql import sqltypes as types

from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import (
    make_data_platform_urn,
    make_dataplatform_instance_urn,
    make_dataset_urn_with_platform_instance,
    make_domain_urn,
    make_tag_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
    DatabaseKey,
    PlatformKey,
    SchemaKey,
    add_dataset_to_container,
    add_domain_to_entity_wu,
    gen_containers,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.state.checkpoint import Checkpoint
from datahub.ingestion.source.state.sql_common_state import (
    BaseSQLAlchemyCheckpointState,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
    JobId,
    StatefulIngestionConfig,
    StatefulIngestionConfigBase,
    StatefulIngestionReport,
    StatefulIngestionSourceBase,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
    ArrayTypeClass,
    BooleanTypeClass,
    BytesTypeClass,
    DateTypeClass,
    EnumTypeClass,
    ForeignKeyConstraint,
    MySqlDDL,
    NullTypeClass,
    NumberTypeClass,
    RecordTypeClass,
    SchemaField,
    SchemaFieldDataType,
    SchemaMetadata,
    StringTypeClass,
    TimeTypeClass,
)
from datahub.metadata.schema_classes import (
    ChangeTypeClass,
    DataPlatformInstanceClass,
    DatasetLineageTypeClass,
    DatasetPropertiesClass,
    GlobalTagsClass,
    JobStatusClass,
    SubTypesClass,
    TagAssociationClass,
    UpstreamClass,
    ViewPropertiesClass,
)
from datahub.telemetry import telemetry
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.sqlalchemy_query_combiner import SQLAlchemyQueryCombinerReport

if TYPE_CHECKING:
    from datahub.ingestion.source.ge_data_profiler import (
        DatahubGEProfiler,
        GEProfilerRequest,
    )

logger: logging.Logger = logging.getLogger(__name__)

MISSING_COLUMN_INFO = "missing column information"


def _platform_alchemy_uri_tester_gen(
    platform: str, opt_starts_with: Optional[str] = None
) -> Tuple[str, Callable[[str], bool]]:
    return platform, lambda x: x.startswith(
        platform if not opt_starts_with else opt_starts_with
    )


PLATFORM_TO_SQLALCHEMY_URI_TESTER_MAP: Dict[str, Callable[[str], bool]] = OrderedDict(
    [
        _platform_alchemy_uri_tester_gen("athena", "awsathena"),
        _platform_alchemy_uri_tester_gen("bigquery"),
        _platform_alchemy_uri_tester_gen("clickhouse"),
        _platform_alchemy_uri_tester_gen("druid"),
        _platform_alchemy_uri_tester_gen("hana"),
        _platform_alchemy_uri_tester_gen("hive"),
        _platform_alchemy_uri_tester_gen("mongodb"),
        _platform_alchemy_uri_tester_gen("mssql"),
        _platform_alchemy_uri_tester_gen("mysql"),
        _platform_alchemy_uri_tester_gen("oracle"),
        _platform_alchemy_uri_tester_gen("pinot"),
        _platform_alchemy_uri_tester_gen("presto"),
        (
            "redshift",
            lambda x: (
                x.startswith(("jdbc:postgres:", "postgresql"))
                and x.find("redshift.amazonaws") > 0
            )
            or x.startswith("redshift"),
        ),
        # Don't move this before redshift.
        _platform_alchemy_uri_tester_gen("postgres", "postgresql"),
        _platform_alchemy_uri_tester_gen("snowflake"),
        _platform_alchemy_uri_tester_gen("trino"),
        _platform_alchemy_uri_tester_gen("vertica"),
    ]
)


def get_platform_from_sqlalchemy_uri(sqlalchemy_uri: str) -> str:
    for platform, tester in PLATFORM_TO_SQLALCHEMY_URI_TESTER_MAP.items():
        if tester(sqlalchemy_uri):
            return platform

    return "external"
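
# Illustrative example (not part of the original module): the tester map matches on URI
# prefixes, so a call such as
#   get_platform_from_sqlalchemy_uri("postgresql+psycopg2://user@host:5432/db")
# would resolve to "postgres", while an unrecognised scheme falls back to "external".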


def make_sqlalchemy_uri(
    scheme: str,
    username: Optional[str],
    password: Optional[str],
    at: Optional[str],
    db: Optional[str],
    uri_opts: Optional[Dict[str, Any]] = None,
) -> str:
    url = f"{scheme}://"
    if username is not None:
        url += f"{quote_plus(username)}"
        if password is not None:
            url += f":{quote_plus(password)}"
        url += "@"
    if at is not None:
        url += f"{at}"
    if db is not None:
        url += f"/{db}"
    if uri_opts is not None:
        if db is None:
            url += "/"
        params = "&".join(
            f"{key}={quote_plus(value)}" for (key, value) in uri_opts.items() if value
        )
        url = f"{url}?{params}"
    return url
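
# Illustrative example (hypothetical credentials): the builder percent-encodes the
# username and password and only appends a query string when options are supplied, e.g.
#   make_sqlalchemy_uri("mysql+pymysql", "user", "p@ss", "localhost:3306", "db")
# yields "mysql+pymysql://user:p%40ss@localhost:3306/db".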


class SqlContainerSubTypes(str, Enum):
    DATABASE = "Database"
    SCHEMA = "Schema"


@dataclass
class SQLSourceReport(StatefulIngestionReport):
    tables_scanned: int = 0
    views_scanned: int = 0
    entities_profiled: int = 0
    filtered: List[str] = field(default_factory=list)
    soft_deleted_stale_entities: List[str] = field(default_factory=list)

    query_combiner: Optional[SQLAlchemyQueryCombinerReport] = None

    def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
        """
        Entity could be a view or a table
        """
        if ent_type == "table":
            self.tables_scanned += 1
        elif ent_type == "view":
            self.views_scanned += 1
        else:
            raise KeyError(f"Unknown entity {ent_type}.")

    def report_entity_profiled(self, name: str) -> None:
        self.entities_profiled += 1

    def report_dropped(self, ent_name: str) -> None:
        self.filtered.append(ent_name)

    def report_from_query_combiner(
        self, query_combiner_report: SQLAlchemyQueryCombinerReport
    ) -> None:
        self.query_combiner = query_combiner_report

    def report_stale_entity_soft_deleted(self, urn: str) -> None:
        self.soft_deleted_stale_entities.append(urn)


class SQLAlchemyStatefulIngestionConfig(StatefulIngestionConfig):
    """
    Specialization of the basic StatefulIngestionConfig to add custom config.
    This will be used to override the stateful_ingestion config param of
    StatefulIngestionConfigBase in the SQLAlchemyConfig.
    """

    remove_stale_metadata: bool = Field(
        default=True,
        description="Soft-deletes the tables and views that were found in the last successful run but missing in the current run with stateful_ingestion enabled.",
    )


class SQLAlchemyConfig(StatefulIngestionConfigBase):
    options: dict = {}
    # Although the 'table_pattern' enables you to skip everything from certain schemas,
    # having another option to allow/deny on schema level is an optimization for the case when there is a large number
    # of schemas that one wants to skip and you want to avoid the time to needlessly fetch those tables only to filter
    # them out afterwards via the table_pattern.
    schema_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for schemas to filter in ingestion. Specify regex to only match the schema name. e.g. to match all tables in schema analytics, use the regex 'analytics'.",
    )
    table_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for tables to filter in ingestion. Specify regex to match the entire table name in database.schema.table format. e.g. to match all tables starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'.",
    )
    view_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns for views to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire view name in database.schema.view format. e.g. to match all views starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'.",
    )
    profile_pattern: AllowDenyPattern = Field(
        default=AllowDenyPattern.allow_all(),
        description="Regex patterns to filter tables for profiling during ingestion. Note that only tables allowed by the `table_pattern` will be considered.",
    )
    domain: Dict[str, AllowDenyPattern] = Field(
        default=dict(),
        description='Attach domains to databases, schemas or tables during ingestion using regex patterns. Domain key can be a guid like *urn:li:domain:ec428203-ce86-4db3-985d-5a8ee6df32ba* or a string like "Marketing". If you provide strings, then datahub will attempt to resolve this name to a guid, and will error out if this fails. There can be multiple domain keys specified.',
    )

    include_views: Optional[bool] = Field(
        default=True, description="Whether views should be ingested."
    )
    include_tables: Optional[bool] = Field(
        default=True, description="Whether tables should be ingested."
    )

    from datahub.ingestion.source.ge_data_profiler import GEProfilingConfig

    profiling: GEProfilingConfig = GEProfilingConfig()
    # Custom Stateful Ingestion settings
    stateful_ingestion: Optional[SQLAlchemyStatefulIngestionConfig] = None

    @pydantic.root_validator(pre=True)
    def view_pattern_is_table_pattern_unless_specified(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        view_pattern = values.get("view_pattern")
        table_pattern = values.get("table_pattern")
        if table_pattern and not view_pattern:
            logger.info(f"Applying table_pattern {table_pattern} to view_pattern.")
            values["view_pattern"] = table_pattern
        return values

    @pydantic.root_validator()
    def ensure_profiling_pattern_is_passed_to_profiling(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        profiling = values.get("profiling")
        if profiling is not None and profiling.enabled:
            profiling.allow_deny_patterns = values["profile_pattern"]
        return values

    @abstractmethod
    def get_sql_alchemy_url(self):
        pass
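
# Illustrative recipe fragment (hypothetical source and values) showing how the pattern
# fields above are typically supplied in an ingestion recipe; this is a sketch, not an
# exhaustive reference:
#   source:
#     type: postgres
#     config:
#       schema_pattern:
#         deny: ["information_schema"]
#       table_pattern:
#         allow: ['analytics\.public\..*']
#       profiling:
#         enabled: true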


class BasicSQLAlchemyConfig(SQLAlchemyConfig):
    username: Optional[str] = Field(default=None, description="username")
    password: Optional[pydantic.SecretStr] = Field(
        default=None, exclude=True, description="password"
    )
    host_port: str = Field(description="host URL")
    database: Optional[str] = Field(default=None, description="database (catalog)")
    database_alias: Optional[str] = Field(
        default=None, description="Alias to apply to database when ingesting."
    )
    scheme: str = Field(description="scheme")
    sqlalchemy_uri: Optional[str] = Field(
        default=None,
        description="URI of database to connect to. See https://docs.sqlalchemy.org/en/14/core/engines.html#database-urls. Takes precedence over other connection parameters.",
    )

    def get_sql_alchemy_url(self, uri_opts: Optional[Dict[str, Any]] = None) -> str:
        if not ((self.host_port and self.scheme) or self.sqlalchemy_uri):
            raise ValueError("host_port and schema or connect_uri required.")

        return self.sqlalchemy_uri or make_sqlalchemy_uri(
            self.scheme,  # type: ignore
            self.username,
            self.password.get_secret_value() if self.password else None,
            self.host_port,  # type: ignore
            self.database,
            uri_opts=uri_opts,
        )


class SqlWorkUnit(MetadataWorkUnit):
    pass


_field_type_mapping: Dict[Type[types.TypeEngine], Type] = {
    types.Integer: NumberTypeClass,
    types.Numeric: NumberTypeClass,
    types.Boolean: BooleanTypeClass,
    types.Enum: EnumTypeClass,
    types._Binary: BytesTypeClass,
    types.LargeBinary: BytesTypeClass,
    types.PickleType: BytesTypeClass,
    types.ARRAY: ArrayTypeClass,
    types.String: StringTypeClass,
    types.Date: DateTypeClass,
    types.DATE: DateTypeClass,
    types.Time: TimeTypeClass,
    types.DateTime: TimeTypeClass,
    types.DATETIME: TimeTypeClass,
    types.TIMESTAMP: TimeTypeClass,
    types.JSON: RecordTypeClass,
    dialects.postgresql.base.BYTEA: BytesTypeClass,
    dialects.postgresql.base.DOUBLE_PRECISION: NumberTypeClass,
    dialects.postgresql.base.INET: StringTypeClass,
    dialects.postgresql.base.MACADDR: StringTypeClass,
    dialects.postgresql.base.MONEY: NumberTypeClass,
    dialects.postgresql.base.OID: StringTypeClass,
    dialects.postgresql.base.REGCLASS: BytesTypeClass,
    dialects.postgresql.base.TIMESTAMP: TimeTypeClass,
    dialects.postgresql.base.TIME: TimeTypeClass,
    dialects.postgresql.base.INTERVAL: TimeTypeClass,
    dialects.postgresql.base.BIT: BytesTypeClass,
    dialects.postgresql.base.UUID: StringTypeClass,
    dialects.postgresql.base.TSVECTOR: BytesTypeClass,
    dialects.postgresql.base.ENUM: EnumTypeClass,
    # When SQLAlchemy is unable to map a type into its internal hierarchy, it
    # assigns the NullType by default. We want to carry this warning through.
    types.NullType: NullTypeClass,
}
_known_unknown_field_types: Set[Type[types.TypeEngine]] = {
    types.Interval,
    types.CLOB,
}


def register_custom_type(
    tp: Type[types.TypeEngine], output: Optional[Type] = None
) -> None:
    if output:
        _field_type_mapping[tp] = output
    else:
        _known_unknown_field_types.add(tp)


class _CustomSQLAlchemyDummyType(types.TypeDecorator):
    impl = types.LargeBinary


def make_sqlalchemy_type(name: str) -> Type[types.TypeEngine]:
    # This usage of type() dynamically constructs a class.
    # See https://stackoverflow.com/a/15247202/5004662 and
    # https://docs.python.org/3/library/functions.html#type.
    sqlalchemy_type: Type[types.TypeEngine] = type(
        name,
        (_CustomSQLAlchemyDummyType,),
        {
            "__repr__": lambda self: f"{name}()",
        },
    )
    return sqlalchemy_type
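
# Illustrative usage (hypothetical type name): dialect-specific sources can register
# types that SQLAlchemy does not know about, e.g.
#   GEOGRAPHY = make_sqlalchemy_type("GEOGRAPHY")
#   register_custom_type(GEOGRAPHY, StringTypeClass)
# after which get_column_type() maps GEOGRAPHY columns to StringTypeClass instead of
# falling back to NullTypeClass.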


def get_column_type(
    sql_report: SQLSourceReport, dataset_name: str, column_type: Any
) -> SchemaFieldDataType:
    """
    Maps SQLAlchemy types (https://docs.sqlalchemy.org/en/13/core/type_basics.html) to corresponding schema types
    """

    TypeClass: Optional[Type] = None
    for sql_type in _field_type_mapping.keys():
        if isinstance(column_type, sql_type):
            TypeClass = _field_type_mapping[sql_type]
            break
    if TypeClass is None:
        for sql_type in _known_unknown_field_types:
            if isinstance(column_type, sql_type):
                TypeClass = NullTypeClass
                break

    if TypeClass is None:
        sql_report.report_warning(
            dataset_name, f"unable to map type {column_type!r} to metadata schema"
        )
        TypeClass = NullTypeClass

    return SchemaFieldDataType(type=TypeClass())
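
# Illustrative mapping (assuming a plain SQLAlchemy VARCHAR column): since VARCHAR is a
# subclass of types.String, get_column_type(report, "db.schema.table", types.VARCHAR())
# resolves to SchemaFieldDataType(type=StringTypeClass()); unmapped types fall back to
# NullTypeClass with a warning recorded on the report.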


def get_schema_metadata(
    sql_report: SQLSourceReport,
    dataset_name: str,
    platform: str,
    columns: List[dict],
    pk_constraints: dict = None,
    foreign_keys: List[ForeignKeyConstraint] = None,
    canonical_schema: List[SchemaField] = [],
) -> SchemaMetadata:
    schema_metadata = SchemaMetadata(
        schemaName=dataset_name,
        platform=make_data_platform_urn(platform),
        version=0,
        hash="",
        platformSchema=MySqlDDL(tableSchema=""),
        fields=canonical_schema,
    )
    if foreign_keys is not None and foreign_keys != []:
        schema_metadata.foreignKeys = foreign_keys

    return schema_metadata


# config flags to emit telemetry for
config_options_to_report = [
    "include_views",
    "include_tables",
]

# profiling flags to emit telemetry for
profiling_flags_to_report = [
    "turn_off_expensive_profiling_metrics",
    "profile_table_level_only",
    "include_field_null_count",
    "include_field_min_value",
    "include_field_max_value",
    "include_field_mean_value",
    "include_field_median_value",
    "include_field_stddev_value",
    "include_field_quantiles",
    "include_field_distinct_value_frequencies",
    "include_field_histogram",
    "include_field_sample_values",
    "query_combiner_enabled",
]


class SQLAlchemySource(StatefulIngestionSourceBase):
    """A base class for all SQL sources that use SQLAlchemy to extend"""

    def __init__(self, config: SQLAlchemyConfig, ctx: PipelineContext, platform: str):
        super(SQLAlchemySource, self).__init__(config, ctx)
        self.config = config
        self.platform = platform
        self.report: SQLSourceReport = SQLSourceReport()

        config_report = {
            config_option: config.dict().get(config_option)
            for config_option in config_options_to_report
        }
        config_report = {
            **config_report,
            "profiling_enabled": config.profiling.enabled,
            "platform": platform,
        }

        telemetry.telemetry_instance.ping(
            "sql_config",
            config_report,
        )

        if config.profiling.enabled:
            telemetry.telemetry_instance.ping(
                "sql_profiling_config",
                {
                    config_flag: config.profiling.dict().get(config_flag)
                    for config_flag in profiling_flags_to_report
                },
            )

        if self.config.domain:
            self.domain_registry = DomainRegistry(
                cached_domains=[k for k in self.config.domain], graph=self.ctx.graph
            )

    def warn(self, log: logging.Logger, key: str, reason: str) -> None:
        self.report.report_warning(key, reason)
        log.warning(f"{key} => {reason}")

    def error(self, log: logging.Logger, key: str, reason: str) -> None:
        self.report.report_failure(key, reason)
        log.error(f"{key} => {reason}")

    def get_inspectors(self) -> Iterable[Inspector]:
        # This method can be overridden in the case that you want to dynamically
        # run on multiple databases.
        url = self.config.get_sql_alchemy_url()
        logger.debug(f"sql_alchemy_url={url}")
        engine = create_engine(url, **self.config.options)
        with engine.connect() as conn:
            inspector = inspect(conn)
            yield inspector

    def get_db_name(self, inspector: Inspector) -> str:
        engine = inspector.engine

        if engine and hasattr(engine, "url") and hasattr(engine.url, "database"):
            return str(engine.url.database).strip('"').lower()
        else:
            raise Exception("Unable to get database name from Sqlalchemy inspector")

    def is_checkpointing_enabled(self, job_id: JobId) -> bool:
        if (
            job_id == self.get_default_ingestion_job_id()
            and self.is_stateful_ingestion_configured()
            and self.config.stateful_ingestion
            and self.config.stateful_ingestion.remove_stale_metadata
        ):
            return True

        return False

    def get_default_ingestion_job_id(self) -> JobId:
        """
        Default ingestion job name that sql_common provides.
        Subclasses can override as needed.
        """
        return JobId("common_ingest_from_sql_source")

    def create_checkpoint(self, job_id: JobId) -> Optional[Checkpoint]:
        """
        Create the custom checkpoint with empty state for the job.
        """
        assert self.ctx.pipeline_name is not None
        if job_id == self.get_default_ingestion_job_id():
            return Checkpoint(
                job_name=job_id,
                pipeline_name=self.ctx.pipeline_name,
                platform_instance_id=self.get_platform_instance_id(),
                run_id=self.ctx.run_id,
                config=self.config,
                state=BaseSQLAlchemyCheckpointState(),
            )
        return None

    def update_default_job_run_summary(self) -> None:
        summary = self.get_job_run_summary(self.get_default_ingestion_job_id())
        if summary is not None:
            # For now just add the config and the report.
            summary.config = self.config.json()
            summary.custom_summary = self.report.as_string()
            summary.runStatus = (
                JobStatusClass.FAILED
                if self.get_report().failures
                else JobStatusClass.COMPLETED
            )

    def get_schema_names(self, inspector):
        return inspector.get_schema_names()

    def get_platform_instance_id(self) -> str:
        """
        The source identifier such as the specific source host address required for stateful ingestion.
        Individual subclasses need to override this method appropriately.
        """
        config_dict = self.config.dict()
        host_port = config_dict.get("host_port", "no_host_port")
        database = config_dict.get("database", "no_database")
        return f"{self.platform}_{host_port}_{database}"
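
    # Illustrative value (hypothetical config): a MySQL source configured with
    # host_port "localhost:3306" and database "shop" would produce the instance id
    # "mysql_localhost:3306_shop", which is what the ingestion checkpoint is keyed on.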

    def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
        last_checkpoint = self.get_last_checkpoint(
            self.get_default_ingestion_job_id(), BaseSQLAlchemyCheckpointState
        )
        cur_checkpoint = self.get_current_checkpoint(
            self.get_default_ingestion_job_id()
        )
        if (
            self.config.stateful_ingestion
            and self.config.stateful_ingestion.remove_stale_metadata
            and last_checkpoint is not None
            and last_checkpoint.state is not None
            and cur_checkpoint is not None
            and cur_checkpoint.state is not None
        ):
            logger.debug("Checking for stale entity removal.")

            def soft_delete_item(urn: str, type: str) -> Iterable[MetadataWorkUnit]:
                entity_type: str = "dataset"

                if type == "container":
                    entity_type = "container"

                logger.info(f"Soft-deleting stale entity of type {type} - {urn}.")
                mcp = MetadataChangeProposalWrapper(
                    entityType=entity_type,
                    entityUrn=urn,
                    changeType=ChangeTypeClass.UPSERT,
                    aspectName="status",
                    aspect=StatusClass(removed=True),
                )
                wu = MetadataWorkUnit(id=f"soft-delete-{type}-{urn}", mcp=mcp)
                self.report.report_workunit(wu)
                self.report.report_stale_entity_soft_deleted(urn)
                yield wu

            last_checkpoint_state = cast(
                BaseSQLAlchemyCheckpointState, last_checkpoint.state
            )
            cur_checkpoint_state = cast(
                BaseSQLAlchemyCheckpointState, cur_checkpoint.state
            )

            for table_urn in last_checkpoint_state.get_table_urns_not_in(
                cur_checkpoint_state
            ):
                yield from soft_delete_item(table_urn, "table")

            for view_urn in last_checkpoint_state.get_view_urns_not_in(
                cur_checkpoint_state
            ):
                yield from soft_delete_item(view_urn, "view")

            for container_urn in last_checkpoint_state.get_container_urns_not_in(
                cur_checkpoint_state
            ):
                yield from soft_delete_item(container_urn, "container")

    def gen_schema_key(self, db_name: str, schema: str) -> PlatformKey:
        return SchemaKey(
            database=db_name,
            schema=schema,
            platform=self.platform,
            instance=self.config.platform_instance
            if self.config.platform_instance is not None
            else self.config.env,
        )

    def gen_database_key(self, database: str) -> PlatformKey:
        return DatabaseKey(
            database=database,
            platform=self.platform,
            instance=self.config.platform_instance
            if self.config.platform_instance is not None
            else self.config.env,
        )

    def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
        domain_urn = self._gen_domain_urn(database)

        database_container_key = self.gen_database_key(database)
        container_workunits = gen_containers(
            container_key=database_container_key,
            name=database,
            sub_types=[SqlContainerSubTypes.DATABASE],
            domain_urn=domain_urn,
        )

        for wu in container_workunits:
            self.report.report_workunit(wu)
            yield wu

    def gen_schema_containers(
        self, schema: str, db_name: str
    ) -> Iterable[MetadataWorkUnit]:
        schema_container_key = self.gen_schema_key(db_name, schema)

        database_container_key: Optional[PlatformKey] = None
        if db_name is not None:
            database_container_key = self.gen_database_key(database=db_name)

        container_workunits = gen_containers(
            schema_container_key,
            schema,
            [SqlContainerSubTypes.SCHEMA],
            database_container_key,
        )

        for wu in container_workunits:
            self.report.report_workunit(wu)
            yield wu

    def get_workunits(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
        sql_config = self.config
        if logger.isEnabledFor(logging.DEBUG):
            # If debug logging is enabled, we also want to echo each SQL query issued.
            sql_config.options.setdefault("echo", True)

        # Extra default SQLAlchemy option for better connection pooling and threading.
        # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
        if sql_config.profiling.enabled:
            sql_config.options.setdefault(
                "max_overflow", sql_config.profiling.max_workers
            )

        for inspector in self.get_inspectors():
            profiler = None
            profile_requests: List["GEProfilerRequest"] = []
            if sql_config.profiling.enabled:
                profiler = self.get_profiler_instance(inspector)

            db_name = self.get_db_name(inspector)
            yield from self.gen_database_containers(db_name)

            for schema in self.get_schema_names(inspector):
                if not sql_config.schema_pattern.allowed(schema):
                    self.report.report_dropped(f"{schema}.*")
                    continue
                self.add_information_for_schema(inspector, schema)

                yield from self.gen_schema_containers(schema, db_name)

                if sql_config.include_tables:
                    yield from self.loop_tables(inspector, schema, sql_config)

                if sql_config.include_views:
                    yield from self.loop_views(inspector, schema, sql_config)

                if profiler:
                    profile_requests += list(
                        self.loop_profiler_requests(inspector, schema, sql_config)
                    )

            if profiler and profile_requests:
                yield from self.loop_profiler(
                    profile_requests, profiler, platform=self.platform
                )

        if self.is_stateful_ingestion_configured():
            # Clean up stale entities.
            yield from self.gen_removed_entity_workunits()

    def standardize_schema_table_names(
        self, schema: str, entity: str
    ) -> Tuple[str, str]:
        # Some SQLAlchemy dialects need a standardization step to clean the schema
        # and table names. See BigQuery for an example of when this is useful.
        return schema, entity

    def get_identifier(
        self, *, schema: str, entity: str, inspector: Inspector, **kwargs: Any
    ) -> str:
        # Many SQLAlchemy dialects have three-level hierarchies. This method, which
        # subclasses can override, enables them to modify the identifiers as needed.
        if hasattr(self.config, "get_identifier"):
            # This path is deprecated and will eventually be removed.
            return self.config.get_identifier(schema=schema, table=entity)  # type: ignore
        else:
            return f"{schema}.{entity}"
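
    # Illustrative result (hypothetical names): with the default implementation,
    # get_identifier(schema="public", entity="orders", inspector=inspector) returns
    # "public.orders"; dialects with three-level hierarchies override this to prepend
    # the database or project name.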

    def get_foreign_key_metadata(
        self,
        dataset_urn: str,
        schema: str,
        fk_dict: Dict[str, str],
        inspector: Inspector,
    ) -> ForeignKeyConstraint:
        referred_schema: Optional[str] = fk_dict.get("referred_schema")

        if not referred_schema:
            referred_schema = schema

        referred_dataset_name = self.get_identifier(
            schema=referred_schema,
            entity=fk_dict["referred_table"],
            inspector=inspector,
        )

        source_fields = [
            f"urn:li:schemaField:({dataset_urn},{f})"
            for f in fk_dict["constrained_columns"]
        ]
        foreign_dataset = make_dataset_urn_with_platform_instance(
            platform=self.platform,
            name=referred_dataset_name,
            platform_instance=self.config.platform_instance,
            env=self.config.env,
        )
        foreign_fields = [
            f"urn:li:schemaField:({foreign_dataset},{f})"
            for f in fk_dict["referred_columns"]
        ]

        return ForeignKeyConstraint(
            fk_dict["name"], foreign_fields, source_fields, foreign_dataset
        )

    def normalise_dataset_name(self, dataset_name: str) -> str:
        return dataset_name

    def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
        domain_urn: Optional[str] = None

        for domain, pattern in self.config.domain.items():
            if pattern.allowed(dataset_name):
                domain_urn = make_domain_urn(
                    self.domain_registry.get_domain_urn(domain)
                )

        return domain_urn

    def _get_domain_wu(
        self,
        dataset_name: str,
        entity_urn: str,
        entity_type: str,
        sql_config: SQLAlchemyConfig,
    ) -> Iterable[MetadataWorkUnit]:
        domain_urn = self._gen_domain_urn(dataset_name)
        if domain_urn:
            wus = add_domain_to_entity_wu(
                entity_type=entity_type,
                entity_urn=entity_urn,
                domain_urn=domain_urn,
            )
            for wu in wus:
                self.report.report_workunit(wu)
                yield wu

    def loop_tables(  # noqa: C901
        self,
        inspector: Inspector,
        schema: str,
        sql_config: SQLAlchemyConfig,
    ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
        tables_seen: Set[str] = set()
        try:
            for table in inspector.get_table_names(schema):
                schema, table = self.standardize_schema_table_names(
                    schema=schema, entity=table
                )
                dataset_name = self.get_identifier(
                    schema=schema, entity=table, inspector=inspector
                )

                dataset_name = self.normalise_dataset_name(dataset_name)

                if dataset_name not in tables_seen:
                    tables_seen.add(dataset_name)
                else:
                    logger.debug(f"{dataset_name} has already been seen, skipping...")
                    continue

                self.report.report_entity_scanned(dataset_name, ent_type="table")
                if not sql_config.table_pattern.allowed(dataset_name):
                    self.report.report_dropped(dataset_name)
                    continue

                try:
                    yield from self._process_table(
                        dataset_name, inspector, schema, table, sql_config
                    )
                except Exception as e:
                    logger.warning(
                        f"Unable to ingest {schema}.{table} due to an exception.\n {traceback.format_exc()}"
                    )
                    self.report.report_warning(
                        f"{schema}.{table}", f"Ingestion error: {e}"
                    )
        except Exception as e:
            self.report.report_failure(f"{schema}", f"Tables error: {e}")

    def add_information_for_schema(self, inspector: Inspector, schema: str) -> None:
        pass

    def get_extra_tags(
        self, inspector: Inspector, schema: str, table: str
    ) -> Optional[Dict[str, List[str]]]:
        return None

    def _process_table(
        self,
        dataset_name: str,
        inspector: Inspector,
        schema: str,
        table: str,
        sql_config: SQLAlchemyConfig,
    ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
        columns = self._get_columns(dataset_name, inspector, schema, table)
        dataset_urn = make_dataset_urn_with_platform_instance(
            self.platform,
            dataset_name,
            self.config.platform_instance,
            self.config.env,
        )
        dataset_snapshot = DatasetSnapshot(
            urn=dataset_urn,
            aspects=[StatusClass(removed=False)],
        )
        if self.is_stateful_ingestion_configured():
            cur_checkpoint = self.get_current_checkpoint(
                self.get_default_ingestion_job_id()
            )
            if cur_checkpoint is not None:
                checkpoint_state = cast(
                    BaseSQLAlchemyCheckpointState, cur_checkpoint.state
                )
                checkpoint_state.add_table_urn(dataset_urn)

        description, properties, location_urn = self.get_table_properties(
            inspector, schema, table
        )

        # The table name might differ from the real table if we ran some normalisation on it.
        # Get the normalised table name from the dataset_name: the table is the last
        # component of the dataset name.
        normalised_table = table
        splits = dataset_name.split(".")
        if splits:
            normalised_table = splits[-1]
            if properties and normalised_table != table:
                properties["original_table_name"] = table

        dataset_properties = DatasetPropertiesClass(
            name=normalised_table,
            description=description,
            customProperties=properties,
        )
        dataset_snapshot.aspects.append(dataset_properties)

        if location_urn:
            external_upstream_table = UpstreamClass(
                dataset=location_urn,
                type=DatasetLineageTypeClass.COPY,
            )
            lineage_mcpw = MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_snapshot.urn,
                aspectName="upstreamLineage",
                aspect=UpstreamLineage(upstreams=[external_upstream_table]),
            )
            lineage_wu = MetadataWorkUnit(
                id=f"{self.platform}-{lineage_mcpw.entityUrn}-{lineage_mcpw.aspectName}",
                mcp=lineage_mcpw,
            )
            self.report.report_workunit(lineage_wu)
            yield lineage_wu

        extra_tags = self.get_extra_tags(inspector, schema, table)
        pk_constraints: dict = inspector.get_pk_constraint(table, schema)
        foreign_keys = self._get_foreign_keys(dataset_urn, inspector, schema, table)
        schema_fields = self.get_schema_fields(
            dataset_name, columns, pk_constraints, tags=extra_tags
        )
        schema_metadata = get_schema_metadata(
            self.report,
            dataset_name,
            self.platform,
            columns,
            pk_constraints,
            foreign_keys,
            schema_fields,
        )
        dataset_snapshot.aspects.append(schema_metadata)
        db_name = self.get_db_name(inspector)
        yield from self.add_table_to_schema_container(dataset_urn, db_name, schema)
        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu
        dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
        if dpi_aspect:
            yield dpi_aspect
        subtypes_aspect = MetadataWorkUnit(
            id=f"{dataset_name}-subtypes",
            mcp=MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="subTypes",
                aspect=SubTypesClass(typeNames=["table"]),
            ),
        )
        self.report.report_workunit(subtypes_aspect)
        yield subtypes_aspect

        yield from self._get_domain_wu(
            dataset_name=dataset_name,
            entity_urn=dataset_urn,
            entity_type="dataset",
            sql_config=sql_config,
        )

    def get_table_properties(
        self, inspector: Inspector, schema: str, table: str
    ) -> Tuple[Optional[str], Optional[Dict[str, str]], Optional[str]]:
        try:
            location: Optional[str] = None
            # SQLAlchemy stubs are incomplete and missing this method.
            # PR: https://github.com/dropbox/sqlalchemy-stubs/pull/223.
            table_info: dict = inspector.get_table_comment(table, schema)  # type: ignore
        except NotImplementedError:
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        except ProgrammingError as pe:
            # Snowflake needs schema names quoted when fetching table comments.
            logger.debug(
                f"Encountered ProgrammingError. Retrying with quoted schema name for schema {schema} and table {table}",
                pe,
            )
            description = None
            properties = {}
            table_info: dict = inspector.get_table_comment(table, f'"{schema}"')  # type: ignore
        else:
            description = table_info["text"]

            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = table_info.get("properties", {})
        return description, properties, location

    def get_dataplatform_instance_aspect(
        self, dataset_urn: str
    ) -> Optional[SqlWorkUnit]:
        # If we are a platform instance based source, emit the instance aspect
        if self.config.platform_instance:
            mcp = MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="dataPlatformInstance",
                aspect=DataPlatformInstanceClass(
                    platform=make_data_platform_urn(self.platform),
                    instance=make_dataplatform_instance_urn(
                        self.platform, self.config.platform_instance
                    ),
                ),
            )
            wu = SqlWorkUnit(id=f"{dataset_urn}-dataPlatformInstance", mcp=mcp)
            self.report.report_workunit(wu)
            return wu
        else:
            return None

    def _get_columns(
        self, dataset_name: str, inspector: Inspector, schema: str, table: str
    ) -> List[dict]:
        columns = []
        try:
            columns = inspector.get_columns(table, schema)
            if len(columns) == 0:
                self.report.report_warning(MISSING_COLUMN_INFO, dataset_name)
        except Exception as e:
            self.report.report_warning(
                dataset_name,
                f"unable to get column information due to an error -> {e}",
            )
        return columns

    def _get_foreign_keys(
        self, dataset_urn: str, inspector: Inspector, schema: str, table: str
    ) -> List[ForeignKeyConstraint]:
        try:
            foreign_keys = [
                self.get_foreign_key_metadata(dataset_urn, schema, fk_rec, inspector)
                for fk_rec in inspector.get_foreign_keys(table, schema)
            ]
        except KeyError:
            # certain databases like MySQL cause issues due to lower-case/upper-case irregularities
            logger.debug(
                f"{dataset_urn}: failure in foreign key extraction... skipping"
            )
            foreign_keys = []
        return foreign_keys

    def get_schema_fields(
        self,
        dataset_name: str,
        columns: List[dict],
        pk_constraints: dict = None,
        tags: Optional[Dict[str, List[str]]] = None,
    ) -> List[SchemaField]:
        canonical_schema = []
        for column in columns:
            column_tags: Optional[List[str]] = None
            if tags:
                column_tags = tags.get(column["name"], [])
            fields = self.get_schema_fields_for_column(
                dataset_name, column, pk_constraints, tags=column_tags
            )
            canonical_schema.extend(fields)
        return canonical_schema

    def get_schema_fields_for_column(
        self,
        dataset_name: str,
        column: dict,
        pk_constraints: dict = None,
        tags: Optional[List[str]] = None,
    ) -> List[SchemaField]:
        gtc: Optional[GlobalTagsClass] = None
        if tags:
            tags_str = [make_tag_urn(t) for t in tags]
            tags_tac = [TagAssociationClass(t) for t in tags_str]
            gtc = GlobalTagsClass(tags_tac)
        field = SchemaField(
            fieldPath=column["name"],
            type=get_column_type(self.report, dataset_name, column["type"]),
            nativeDataType=column.get("full_type", repr(column["type"])),
            description=column.get("comment", None),
            nullable=column["nullable"],
            recursive=False,
            globalTags=gtc,
        )
        if (
            pk_constraints is not None
            and isinstance(pk_constraints, dict)  # some dialects (hive) return list
            and column["name"] in pk_constraints.get("constrained_columns", [])
        ):
            field.isPartOfKey = True
        return [field]
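
    # Illustrative column dict (shape assumed from SQLAlchemy's Inspector.get_columns):
    #   {"name": "id", "type": types.INTEGER(), "nullable": False, "comment": "primary id"}
    # would map to a SchemaField with fieldPath="id", a NumberTypeClass data type,
    # nativeDataType "INTEGER()", and isPartOfKey=True when "id" appears in the table's
    # pk_constraints["constrained_columns"].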

    def loop_views(
        self,
        inspector: Inspector,
        schema: str,
        sql_config: SQLAlchemyConfig,
    ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
        try:
            for view in inspector.get_view_names(schema):
                schema, view = self.standardize_schema_table_names(
                    schema=schema, entity=view
                )
                dataset_name = self.get_identifier(
                    schema=schema, entity=view, inspector=inspector
                )
                dataset_name = self.normalise_dataset_name(dataset_name)

                self.report.report_entity_scanned(dataset_name, ent_type="view")

                if not sql_config.view_pattern.allowed(dataset_name):
                    self.report.report_dropped(dataset_name)
                    continue

                try:
                    yield from self._process_view(
                        dataset_name=dataset_name,
                        inspector=inspector,
                        schema=schema,
                        view=view,
                        sql_config=sql_config,
                    )
                except Exception as e:
                    logger.warning(
                        f"Unable to ingest view {schema}.{view} due to an exception.\n {traceback.format_exc()}"
                    )
                    self.report.report_warning(
                        f"{schema}.{view}", f"Ingestion error: {e}"
                    )
        except Exception as e:
            self.report.report_failure(f"{schema}", f"Views error: {e}")

    def _process_view(
        self,
        dataset_name: str,
        inspector: Inspector,
        schema: str,
        view: str,
        sql_config: SQLAlchemyConfig,
    ) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
        try:
            columns = inspector.get_columns(view, schema)
        except KeyError:
            # For certain types of views, we are unable to fetch the list of columns.
            self.report.report_warning(
                dataset_name, "unable to get schema for this view"
            )
            schema_metadata = None
        else:
            schema_fields = self.get_schema_fields(dataset_name, columns)
            schema_metadata = get_schema_metadata(
                self.report,
                dataset_name,
                self.platform,
                columns,
                canonical_schema=schema_fields,
            )
        try:
            # SQLAlchemy stubs are incomplete and missing this method.
            # PR: https://github.com/dropbox/sqlalchemy-stubs/pull/223.
            view_info: dict = inspector.get_table_comment(view, schema)  # type: ignore
        except NotImplementedError:
            description: Optional[str] = None
            properties: Dict[str, str] = {}
        except ProgrammingError as pe:
            # Snowflake needs schema names quoted when fetching table comments.
            logger.debug(
                f"Encountered ProgrammingError. Retrying with quoted schema name for schema {schema} and view {view}",
                pe,
            )
            description = None
            properties = {}
            view_info: dict = inspector.get_table_comment(view, f'"{schema}"')  # type: ignore
        else:
            description = view_info["text"]

            # The "properties" field is a non-standard addition to SQLAlchemy's interface.
            properties = view_info.get("properties", {})
        try:
            view_definition = inspector.get_view_definition(view, schema)
            if view_definition is None:
                view_definition = ""
            else:
                # Some dialects return a TextClause instead of a raw string,
                # so we need to convert them to a string.
                view_definition = str(view_definition)
        except NotImplementedError:
            view_definition = ""
        properties["view_definition"] = view_definition
        properties["is_view"] = "True"
        dataset_urn = make_dataset_urn_with_platform_instance(
            self.platform,
            dataset_name,
            self.config.platform_instance,
            self.config.env,
        )
        dataset_snapshot = DatasetSnapshot(
            urn=dataset_urn,
            aspects=[StatusClass(removed=False)],
        )
        db_name = self.get_db_name(inspector)
        yield from self.add_table_to_schema_container(dataset_urn, db_name, schema)
        if self.is_stateful_ingestion_configured():
            cur_checkpoint = self.get_current_checkpoint(
                self.get_default_ingestion_job_id()
            )
            if cur_checkpoint is not None:
                checkpoint_state = cast(
                    BaseSQLAlchemyCheckpointState, cur_checkpoint.state
                )
                checkpoint_state.add_view_urn(dataset_urn)
        dataset_properties = DatasetPropertiesClass(
            name=view,
            description=description,
            customProperties=properties,
        )
        dataset_snapshot.aspects.append(dataset_properties)
        if schema_metadata:
            dataset_snapshot.aspects.append(schema_metadata)
        mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
        wu = SqlWorkUnit(id=dataset_name, mce=mce)
        self.report.report_workunit(wu)
        yield wu
        dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
        if dpi_aspect:
            yield dpi_aspect
        subtypes_aspect = MetadataWorkUnit(
            id=f"{view}-subtypes",
            mcp=MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName="subTypes",
                aspect=SubTypesClass(typeNames=["view"]),
            ),
        )
        self.report.report_workunit(subtypes_aspect)
        yield subtypes_aspect
        if "view_definition" in properties:
            view_definition_string = properties["view_definition"]
            view_properties_aspect = ViewPropertiesClass(
                materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
            )
            view_properties_wu = MetadataWorkUnit(
                id=f"{view}-viewProperties",
                mcp=MetadataChangeProposalWrapper(
                    entityType="dataset",
                    changeType=ChangeTypeClass.UPSERT,
                    entityUrn=dataset_urn,
                    aspectName="viewProperties",
                    aspect=view_properties_aspect,
                ),
            )
            self.report.report_workunit(view_properties_wu)
            yield view_properties_wu

        yield from self._get_domain_wu(
            dataset_name=dataset_name,
            entity_urn=dataset_urn,
            entity_type="dataset",
            sql_config=sql_config,
        )

    def add_table_to_schema_container(
        self, dataset_urn: str, db_name: str, schema: str
    ) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
        schema_container_key = self.gen_schema_key(db_name, schema)
        container_workunits = add_dataset_to_container(
            container_key=schema_container_key,
            dataset_urn=dataset_urn,
        )
        for wu in container_workunits:
            self.report.report_workunit(wu)
            yield wu

    def get_profiler_instance(self, inspector: Inspector) -> "DatahubGEProfiler":
        from datahub.ingestion.source.ge_data_profiler import DatahubGEProfiler

        return DatahubGEProfiler(
            conn=inspector.bind,
            report=self.report,
            config=self.config.profiling,
            platform=self.platform,
        )

    def get_profile_args(self) -> Dict:
        """Passed down to the GE profiler."""
        return {}

    # Override if needed
    def generate_partition_profiler_query(
        self, schema: str, table: str, partition_datetime: Optional[datetime.datetime]
    ) -> Tuple[Optional[str], Optional[str]]:
        return None, None

    def is_table_partitioned(
        self, database: Optional[str], schema: str, table: str
    ) -> Optional[bool]:
        return None

    # Override if needed
    def generate_profile_candidates(
        self,
        inspector: Inspector,
        threshold_time: Optional[datetime.datetime],
        schema: str,
    ) -> Optional[List[str]]:
        raise NotImplementedError()

    # Override if you want to do additional checks
    def is_dataset_eligible_for_profiling(
        self,
        dataset_name: str,
        sql_config: SQLAlchemyConfig,
        inspector: Inspector,
        profile_candidates: Optional[List[str]],
    ) -> bool:
        return (
            sql_config.table_pattern.allowed(dataset_name)
            and sql_config.profile_pattern.allowed(dataset_name)
        ) and (
            profile_candidates is None
            or (profile_candidates is not None and dataset_name in profile_candidates)
        )
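
    # Illustrative check (hypothetical patterns): with profile_pattern.allow set to
    # ['analytics\.public\..*'], is_dataset_eligible_for_profiling("analytics.public.orders", ...)
    # is True only if the table_pattern also allows the name and, when a source supplies
    # profile_candidates, the dataset appears in that candidate list.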

    def loop_profiler_requests(
        self,
        inspector: Inspector,
        schema: str,
        sql_config: SQLAlchemyConfig,
    ) -> Iterable["GEProfilerRequest"]:
        from datahub.ingestion.source.ge_data_profiler import GEProfilerRequest

        tables_seen: Set[str] = set()
        profile_candidates = None  # Default value if profile candidates not available.
        if (
            sql_config.profiling.profile_if_updated_since_days is not None
            or sql_config.profiling.profile_table_size_limit is not None
            or sql_config.profiling.profile_table_row_limit is None
        ):
            try:
                threshold_time: Optional[datetime.datetime] = None
                if sql_config.profiling.profile_if_updated_since_days is not None:
                    threshold_time = datetime.datetime.now(
                        datetime.timezone.utc
                    ) - datetime.timedelta(
                        sql_config.profiling.profile_if_updated_since_days
                    )
                profile_candidates = self.generate_profile_candidates(
                    inspector, threshold_time, schema
                )
            except NotImplementedError:
                logger.debug("Source does not support generating profile candidates.")

        for table in inspector.get_table_names(schema):
            schema, table = self.standardize_schema_table_names(
                schema=schema, entity=table
            )
            dataset_name = self.get_identifier(
                schema=schema, entity=table, inspector=inspector
            )
            if not self.is_dataset_eligible_for_profiling(
                dataset_name, sql_config, inspector, profile_candidates
            ):
                if self.config.profiling.report_dropped_profiles:
                    self.report.report_dropped(f"profile of {dataset_name}")
                continue

            dataset_name = self.normalise_dataset_name(dataset_name)
            if dataset_name not in tables_seen:
                tables_seen.add(dataset_name)
            else:
                logger.debug(f"{dataset_name} has already been seen, skipping...")
                continue

            missing_column_info_warn = self.report.warnings.get(MISSING_COLUMN_INFO)
            if (
                missing_column_info_warn is not None
                and dataset_name in missing_column_info_warn
            ):
                continue

            (partition, custom_sql) = self.generate_partition_profiler_query(
                schema, table, self.config.profiling.partition_datetime
            )

            if partition is None and self.is_table_partitioned(
                database=None, schema=schema, table=table
            ):
                self.report.report_warning(
                    "profile skipped as partitioned table is empty or partition id was invalid",
                    dataset_name,
                )
                continue

            if (
                partition is not None
                and not self.config.profiling.partition_profiling_enabled
            ):
                logger.debug(
                    f"{dataset_name} and partition {partition} is skipped because profiling.partition_profiling_enabled property is disabled"
                )
                continue

            self.report.report_entity_profiled(dataset_name)
            logger.debug(
                f"Preparing profiling request for {schema}, {table}, {partition}"
            )
            yield GEProfilerRequest(
                pretty_name=dataset_name,
                batch_kwargs=self.prepare_profiler_args(
                    inspector=inspector,
                    schema=schema,
                    table=table,
                    partition=partition,
                    custom_sql=custom_sql,
                ),
            )

    def loop_profiler(
        self,
        profile_requests: List["GEProfilerRequest"],
        profiler: "DatahubGEProfiler",
        platform: Optional[str] = None,
    ) -> Iterable[MetadataWorkUnit]:
        for request, profile in profiler.generate_profiles(
            profile_requests,
            self.config.profiling.max_workers,
            platform=platform,
            profiler_args=self.get_profile_args(),
        ):
            if profile is None:
                continue
            dataset_name = request.pretty_name
            dataset_urn = make_dataset_urn_with_platform_instance(
                self.platform,
                dataset_name,
                self.config.platform_instance,
                self.config.env,
            )
            mcp = MetadataChangeProposalWrapper(
                entityType="dataset",
                entityUrn=dataset_urn,
                changeType=ChangeTypeClass.UPSERT,
                aspectName="datasetProfile",
                aspect=profile,
            )
            wu = MetadataWorkUnit(id=f"profile-{dataset_name}", mcp=mcp)
            self.report.report_workunit(wu)

            yield wu

    def prepare_profiler_args(
        self,
        inspector: Inspector,
        schema: str,
        table: str,
        partition: Optional[str],
        custom_sql: Optional[str] = None,
    ) -> dict:
        return dict(
            schema=schema, table=table, partition=partition, custom_sql=custom_sql
        )

    def get_report(self):
        return self.report

    def close(self):
        self.update_default_job_run_summary()
        self.prepare_for_commit()