Fix linting (#1958)

Pere Miquel Brull 2021-12-29 17:33:40 +01:00 committed by GitHub
parent b11e503117
commit 1e334af89c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 175 additions and 115 deletions

View File

@ -1,6 +1,10 @@
[BASIC]
# W1203: logging-fstring-interpolation - f-string brings better readability and unifies style
disable=W1203
# W1202: logging-format-interpolation - lazy formatting in logging functions
disable=W1203,W1202
docstring-min-length=20
max-args=7
max-attributes=12
[MASTER]
fail-under=6.0
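
For context, a minimal sketch of the two logging styles these checks flag; the logger call and table name below are illustrative, not part of this change. W1203 fires on f-strings inside logging calls, W1202 on str.format() inside logging calls, and both are now disabled so either style can be used instead of lazy %-formatting.

import logging

logger = logging.getLogger(__name__)
table_name = "public.customers"  # illustrative value

# W1203 (logging-fstring-interpolation): eager f-string formatting, now disabled
logger.info(f"Table Scanned: {table_name}")

# W1202 (logging-format-interpolation): eager str.format() inside the call, now disabled
logger.info("Table Scanned: {}".format(table_name))

# The style pylint prefers by default: lazy %-formatting, rendered only if the record is emitted
logger.info("Table Scanned: %s", table_name)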

View File

@ -8,6 +8,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Generic source to build SQL connectors.
"""
import json
import logging
import re
@ -16,7 +19,7 @@ import uuid
from abc import abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Type
from typing import Dict, Iterable, List, Optional, Tuple
from urllib.parse import quote_plus
from pydantic import SecretStr
@ -54,20 +57,22 @@ logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class SQLSourceStatus(SourceStatus):
"""
Reports the source status after ingestion
"""
success: List[str] = field(default_factory=list)
failures: List[str] = field(default_factory=list)
warnings: List[str] = field(default_factory=list)
filtered: List[str] = field(default_factory=list)
def scanned(self, table_name: str) -> None:
self.success.append(table_name)
logger.info("Table Scanned: {}".format(table_name))
def scanned(self, record: str) -> None:
self.success.append(record)
logger.info(f"Table Scanned: {record}")
def filter(
self, table_name: str, err: str, dataset_name: str = None, col_type: str = None
) -> None:
self.filtered.append(table_name)
logger.warning("Dropped Table {} due to {}".format(table_name, err))
def filter(self, record: str, err: str) -> None:
self.filtered.append(record)
logger.warning(f"Dropped Table {record} due to {err}")
def build_sql_source_connection_url(
@ -76,8 +81,11 @@ def build_sql_source_connection_url(
username: Optional[str] = None,
password: Optional[SecretStr] = None,
database: Optional[str] = None,
options: dict = {},
options: Optional[dict] = None,
) -> str:
"""
Helper function to prepare the db URL
"""
url = f"{scheme}://"
if username is not None:
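
The options default changed from a mutable dict literal to Optional[dict] = None because default values are created once and shared across calls (pylint's dangerous-default-value check). A standalone sketch of the pitfall and the usual guard, using hypothetical helper names:

from typing import Optional

def add_option_buggy(key: str, value: str, options: dict = {}) -> dict:
    # The same dict object is reused on every call
    options[key] = value
    return options

def add_option_safe(key: str, value: str, options: Optional[dict] = None) -> dict:
    options = options if options is not None else {}
    options[key] = value
    return options

print(add_option_buggy("a", "1"))  # {'a': '1'}
print(add_option_buggy("b", "2"))  # {'a': '1', 'b': '2'} - state leaked from the first call
print(add_option_safe("a", "1"))   # {'a': '1'}
print(add_option_safe("b", "2"))   # {'b': '2'}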
@ -100,6 +108,12 @@ def build_sql_source_connection_url(
class SQLConnectionConfig(ConfigModel):
"""
Config class containing all supported
configurations for an SQL source, including
data profiling and DBT generated information.
"""
username: Optional[str] = None
password: Optional[SecretStr] = None
host_port: str
@ -144,7 +158,8 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
description = None
try:
table_info: dict = inspector.get_table_comment(table, schema)
except Exception as err:
# Catch any exception without breaking the ingestion
except Exception as err: # pylint: disable=broad-except
logger.error(f"Table Description Error : {err}")
else:
description = table_info["text"]
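
The same exception-handling pattern recurs throughout this change: catch Exception broadly so one bad table cannot stop the ingestion, and mark the catch explicitly so pylint's broad-except check stays quiet. A minimal standalone sketch with a hypothetical helper name:

import logging

logger = logging.getLogger(__name__)

def safe_table_comment(inspector, table: str, schema: str) -> str:
    """Hypothetical helper: never let a description lookup break the ingestion."""
    try:
        return inspector.get_table_comment(table, schema)["text"]
    # Catch any exception without breaking the ingestion
    except Exception as err:  # pylint: disable=broad-except
        logger.error(f"Table Description Error : {err}")
        return ""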
@ -152,6 +167,12 @@ def _get_table_description(schema: str, table: str, inspector: Inspector) -> str
class SQLSource(Source[OMetaDatabaseAndTable]):
"""
Source Connector implementation to extract
Database & Table information and convert it
to OpenMetadata Entities
"""
def __init__(
self,
config: SQLConnectionConfig,
@ -174,26 +195,38 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
self.data_profiler = None
self.data_models = {}
if self.config.dbt_catalog_file is not None:
self.dbt_catalog = json.load(open(self.config.dbt_catalog_file, "r"))
with open(self.config.dbt_catalog_file, "r", encoding="utf-8") as catalog:
self.dbt_catalog = json.load(catalog)
if self.config.dbt_manifest_file is not None:
self.dbt_manifest = json.load(open(self.config.dbt_manifest_file, "r"))
with open(self.config.dbt_manifest_file, "r", encoding="utf-8") as manifest:
self.dbt_manifest = json.load(manifest)
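
The dbt artifacts are now read through context managers with an explicit encoding, so the handles are closed even if json.load raises and pylint's consider-using-with / unspecified-encoding checks (in recent pylint releases) are satisfied. A standalone sketch with assumed artifact paths:

import json

# Assumed paths to dbt artifacts; the real values come from the source config
dbt_catalog_file = "target/catalog.json"
dbt_manifest_file = "target/manifest.json"

with open(dbt_catalog_file, "r", encoding="utf-8") as catalog:
    dbt_catalog = json.load(catalog)

with open(dbt_manifest_file, "r", encoding="utf-8") as manifest:
    dbt_manifest = json.load(manifest)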
def _instantiate_profiler(self):
def _instantiate_profiler(self) -> bool:
"""
If the profiler is configured, load it and run.
Return True if the profiling ran correctly
"""
try:
if self.config.data_profiler_enabled:
if self.data_profiler is None:
# pylint: disable=import-outside-toplevel
from metadata.profiler.dataprofiler import DataProfiler
# pylint: enable=import-outside-toplevel
self.data_profiler = DataProfiler(
status=self.status, connection_str=self.connection_string
)
return True
return False
except Exception:
# Catch any errors during profiling init and continue ingestion
except Exception as exc: # pylint: disable=broad-except
logger.error(
f"Error loading profiler {exc}"
"DataProfiler configuration is enabled. Please make sure you ran "
"pip install 'openmetadata-ingestion[data-profiler]'"
)
return False
def prepare(self):
self._parse_data_model()
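
The profiler import stays inside the method so the base package works without the optional data-profiler extra, and the disable/enable pair scopes the import-outside-toplevel suppression to that single import. A hedged sketch of the same pattern as a standalone function; the helper name and the status=None argument are illustrative, not from this change.

import logging

logger = logging.getLogger(__name__)

def load_optional_profiler(connection_string: str):
    """Hypothetical sketch: import the optional dependency only when it is needed."""
    try:
        # pylint: disable=import-outside-toplevel
        from metadata.profiler.dataprofiler import DataProfiler

        # pylint: enable=import-outside-toplevel
        return DataProfiler(status=None, connection_str=connection_string)
    except Exception:  # pylint: disable=broad-except
        logger.error(
            "DataProfiler configuration is enabled. Please make sure you ran "
            "pip install 'openmetadata-ingestion[data-profiler]'"
        )
        return None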
@ -204,15 +237,15 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
):
pass
def type_of_column_name(self, sa_type, table_name: str, column_name: str):
return sa_type
def standardize_schema_table_names(
self, schema: str, table: str
) -> Tuple[str, str]:
@staticmethod
def standardize_schema_table_names(schema: str, table: str) -> Tuple[str, str]:
return schema, table
def fetch_sample_data(self, schema: str, table: str):
def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]:
"""
Get some sample data from the source to be added
to the Table Entities
"""
try:
query = self.config.query.format(schema, table)
logger.info(query)
@ -221,14 +254,14 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
for col in results.keys():
cols.append(col.replace(".", "_DOT_"))
rows = []
for r in results:
row = list(r)
for res in results:
row = list(res)
rows.append(row)
return TableData(columns=cols, rows=rows)
except Exception as err:
logger.error(
"Failed to generate sample data for {} - {}".format(table, err)
)
# Catch any errors and continue the ingestion
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed to generate sample data for {table} - {err}")
return None
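
self.config.query is a plain str.format template with two positional slots (schema, then table). Assuming a typical template such as the one below, which is not necessarily the project default, the sampling query is built like this:

# Assumed template with two positional slots; the real value comes from the source config
query_template = "select * from {}.{} limit 50"

schema, table = "shop", "customers"
query = query_template.format(schema, table)
print(query)  # select * from shop.customers limit 50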
def next_record(self) -> Iterable[OMetaDatabaseAndTable]:
inspector = inspect(self.engine)
@ -236,7 +269,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
if not self.sql_config.schema_filter_pattern.included(schema):
self.status.filter(schema, "Schema pattern not allowed")
continue
logger.debug("total tables {}".format(inspector.get_table_names(schema)))
logger.debug(f"Total tables {inspector.get_table_names(schema)}")
if self.config.include_tables:
yield from self.fetch_tables(inspector, schema)
if self.config.include_views:
@ -245,6 +278,10 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
def fetch_tables(
self, inspector: Inspector, schema: str
) -> Iterable[OMetaDatabaseAndTable]:
"""
Scrape an SQL schema and prepare Database and Table
OpenMetadata Entities
"""
for table_name in inspector.get_table_names(schema):
try:
schema, table_name = self.standardize_schema_table_names(
@ -252,13 +289,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
if not self.sql_config.table_filter_pattern.included(table_name):
self.status.filter(
"{}.{}".format(self.config.get_service_name(), table_name),
f"{self.config.get_service_name()}.{table_name}",
"Table pattern not allowed",
)
continue
self.status.scanned(
"{}.{}".format(self.config.get_service_name(), table_name)
)
self.status.scanned(f"{self.config.get_service_name()}.{table_name}")
description = _get_table_description(schema, table_name, inspector)
fqn = f"{self.config.service_name}.{self.config.database}.{schema}.{table_name}"
@ -275,7 +310,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
if self.sql_config.generate_sample_data:
table_data = self.fetch_sample_data(schema, table_name)
table_entity.sampleData = table_data
except Exception as err:
# Catch any errors during the ingestion and continue
except Exception as err: # pylint: disable=broad-except
logger.error(repr(err))
logger.error(err)
@ -291,16 +327,19 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
table=table_entity, database=self._get_database(schema)
)
yield table_and_db
except Exception as err:
# Catch any errors during the ingestion and continue
except Exception as err: # pylint: disable=broad-except
logger.error(err)
self.status.warnings.append(
"{}.{}".format(self.config.service_name, table_name)
)
self.status.warnings.append(f"{self.config.service_name}.{table_name}")
continue
def fetch_views(
self, inspector: Inspector, schema: str
) -> Iterable[OMetaDatabaseAndTable]:
"""
Get all views in the SQL schema and prepare
Database & Table OpenMetadata Entities
"""
for view_name in inspector.get_view_names(schema):
try:
if self.config.scheme == "bigquery":
@ -309,12 +348,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
)
if not self.sql_config.table_filter_pattern.included(view_name):
self.status.filter(
"{}.{}".format(self.config.get_service_name(), view_name),
f"{self.config.get_service_name()}.{view_name}",
"View pattern not allowed",
)
continue
try:
if self.config.scheme == "bigquery":
view_definition = inspector.get_view_definition(
f"{self.config.project_id}.{schema}.{view_name}"
@ -329,17 +367,15 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
except NotImplementedError:
view_definition = ""
description = _get_table_description(schema, view_name, inspector)
table_columns = self._get_columns(schema, view_name, inspector)
view_name = view_name.replace(".", "_DOT_")
fqn = f"{self.config.service_name}.{self.config.database}.{schema}.{view_name}"
table = Table(
id=uuid.uuid4(),
name=view_name,
name=view_name.replace(".", "_DOT_"),
tableType="View",
description=description if description is not None else " ",
fullyQualifiedName=fqn,
columns=table_columns,
description=_get_table_description(schema, view_name, inspector)
or "",
# This will be generated in the backend!! #1673
fullyQualifiedName=view_name,
columns=self._get_columns(schema, view_name, inspector),
viewDefinition=view_definition,
)
if self.sql_config.generate_sample_data:
@ -350,31 +386,34 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
table=table, database=self._get_database(schema)
)
yield table_and_db
except Exception as err:
# Catch any errors and continue the ingestion
except Exception as err: # pylint: disable=broad-except
logger.error(err)
self.status.warnings.append(
"{}.{}".format(self.config.service_name, view_name)
)
self.status.warnings.append(f"{self.config.service_name}.{view_name}")
continue
def _parse_data_model(self):
"""
Get all the DBT information and feed it to the Table Entity
"""
if self.config.dbt_manifest_file and self.config.dbt_catalog_file:
logger.info("Parsing Data Models")
manifest_nodes = self.dbt_manifest["nodes"]
manifest_sources = self.dbt_manifest["sources"]
manifest_entities = {**manifest_nodes, **manifest_sources}
catalog_nodes = self.dbt_catalog["nodes"]
catalog_sources = self.dbt_catalog["sources"]
catalog_entities = {**catalog_nodes, **catalog_sources}
manifest_entities = {
**self.dbt_manifest["nodes"],
**self.dbt_manifest["sources"],
}
catalog_entities = {
**self.dbt_catalog["nodes"],
**self.dbt_catalog["sources"],
}
for key, mnode in manifest_entities.items():
name = mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
cnode = catalog_entities.get(key)
if cnode is not None:
columns = self._parse_data_model_columns(name, mnode, cnode)
else:
columns = []
columns = (
self._parse_data_model_columns(name, mnode, cnode) if cnode else []
)
if mnode["resource_type"] == "test":
continue
upstream_nodes = self._parse_data_model_upstream(mnode)
@ -382,14 +421,12 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
mnode["alias"] if "alias" in mnode.keys() else mnode["name"]
)
model_name = model_name.replace(".", "_DOT_")
description = mnode.get("description", "")
schema = mnode["schema"]
path = f"{mnode['root_path']}/{mnode['original_file_path']}"
raw_sql = mnode.get("raw_sql", "")
model = DataModel(
modelType=ModelType.DBT,
description=description,
path=path,
description=mnode.get("description", ""),
path=f"{mnode['root_path']}/{mnode['original_file_path']}",
rawSql=raw_sql,
sql=mnode.get("compiled_sql", raw_sql),
columns=columns,
@ -403,12 +440,14 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
if "depends_on" in mnode and "nodes" in mnode["depends_on"]:
for node in mnode["depends_on"]["nodes"]:
try:
node_type, database, table = node.split(".", 2)
_, database, table = node.split(".", 2)
table = table.replace(".", "_DOT_")
table_fqn = f"{self.config.service_name}.{database}.{table}"
upstream_nodes.append(table_fqn)
except Exception:
logger.error(f"Failed to parse the node {node} to capture lineage")
except Exception as err: # pylint: disable=broad-except
logger.error(
f"Failed to parse the node {node} to capture lineage {err}"
)
continue
return upstream_nodes
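
dbt node identifiers follow a resource_type.package.name layout, so splitting on the first two dots drops the resource type and keeps the rest as the database/table pair used in the lineage FQN. A small sketch with a hypothetical node id and service name:

service_name = "local_dbt"            # hypothetical service name
node = "model.jaffle_shop.customers"  # hypothetical dbt node id

_, database, table = node.split(".", 2)
table = table.replace(".", "_DOT_")
table_fqn = f"{service_name}.{database}.{table}"
print(table_fqn)  # local_dbt.jaffle_shop.customers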
@ -443,8 +482,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
ordinalPosition=ccolumn["index"],
)
columns.append(col)
except Exception as e:
logger.error(f"Failed to parse column type due to {e}")
except Exception as err: # pylint: disable=broad-except
logger.error(f"Failed to parse column type due to {err}")
return columns
@ -454,33 +493,58 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
service=EntityReference(id=self.service.id, type=self.config.service_type),
)
def parse_raw_data_type(self, raw_data_type):
return raw_data_type
@staticmethod
def _get_column_constraints(
column, pk_columns, unique_columns
) -> Optional[Constraint]:
"""
Prepare column constraints for the Table Entity
"""
constraint = None
if column["nullable"]:
constraint = Constraint.NULL
elif not column["nullable"]:
constraint = Constraint.NOT_NULL
if column["name"] in pk_columns:
constraint = Constraint.PRIMARY_KEY
elif column["name"] in unique_columns:
constraint = Constraint.UNIQUE
return constraint
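
A hypothetical check of the new helper, calling the static method directly with column dicts shaped like SQLAlchemy's Inspector.get_columns output (only the keys the helper reads are included), and assuming SQLSource and Constraint are imported from this module:

pk_columns = ["id"]
unique_columns = ["email"]

id_col = {"name": "id", "nullable": False}
email_col = {"name": "email", "nullable": True}
note_col = {"name": "note", "nullable": True}

print(SQLSource._get_column_constraints(id_col, pk_columns, unique_columns))     # Constraint.PRIMARY_KEY
print(SQLSource._get_column_constraints(email_col, pk_columns, unique_columns))  # Constraint.UNIQUE
print(SQLSource._get_column_constraints(note_col, pk_columns, unique_columns))   # Constraint.NULL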
def _get_columns(
self, schema: str, table: str, inspector: Inspector
) -> List[Column]:
) -> Optional[List[Column]]:
"""
Get columns types and constraints information
"""
# Get inspector information:
pk_constraints = inspector.get_pk_constraint(table, schema)
try:
unique_constraints = inspector.get_unique_constraints(table, schema)
except NotImplementedError:
logger.warning("Cannot obtain constraints - NotImplementedError")
unique_constraints = []
pk_columns = (
pk_constraints["column_constraints"]
if len(pk_constraints) > 0 and "column_constraints" in pk_constraints.keys()
else {}
)
unique_constraints = []
try:
unique_constraints = inspector.get_unique_constraints(table, schema)
except NotImplementedError:
pass
unique_columns = []
for constraint in unique_constraints:
if "column_names" in constraint.keys():
unique_columns = constraint["column_names"]
unique_columns = [
constraint["column_names"]
for constraint in unique_constraints
if "column_names" in constraint.keys()
]
dataset_name = f"{schema}.{table}"
columns = inspector.get_columns(table, schema)
table_columns = []
row_order = 1
try:
for column in columns:
for row_order, column in enumerate(inspector.get_columns(table, schema)):
if "." in column["name"]:
logger.info(f"Found '.' in {column['name']}")
column["name"] = column["name"].replace(".", "_DOT_")
@ -489,9 +553,6 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
col_data_length = None
arr_data_type = None
if "raw_data_type" in column and column["raw_data_type"] is not None:
column["raw_data_type"] = self.parse_raw_data_type(
column["raw_data_type"]
)
(
col_type,
data_type_display,
@ -514,26 +575,16 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
r"(?:\w*)(?:[(]*)(\w*)(?:.*)", str(column["type"])
).groups()
data_type_display = column["type"]
col_constraint = None
if column["nullable"]:
col_constraint = Constraint.NULL
elif not column["nullable"]:
col_constraint = Constraint.NOT_NULL
if column["name"] in pk_columns:
col_constraint = Constraint.PRIMARY_KEY
elif column["name"] in unique_columns:
col_constraint = Constraint.UNIQUE
if col_type.upper() in ["CHAR", "VARCHAR", "BINARY", "VARBINARY"]:
col_constraint = self._get_column_constraints(
column, pk_columns, unique_columns
)
if col_type.upper() in {"CHAR", "VARCHAR", "BINARY", "VARBINARY"}:
col_data_length = column["type"].length
if col_data_length is None:
col_data_length = 1
try:
if col_type == "NULL":
col_type = self.type_of_column_name(
col_type,
column_name=column["name"],
table_name=dataset_name,
)
if col_type == "NULL":
col_type = "VARCHAR"
data_type_display = "varchar"
@ -544,27 +595,32 @@ class SQLSource(Source[OMetaDatabaseAndTable]):
name=column["name"],
description=column.get("comment", None),
dataType=col_type,
dataTypeDisplay="{}({})".format(col_type, col_data_length)
dataTypeDisplay=f"{col_type}({col_data_length})"
if data_type_display is None
else f"{data_type_display}",
dataLength=col_data_length,
constraint=col_constraint,
ordinalPosition=row_order,
children=children if children is not None else None,
ordinalPosition=row_order + 1, # enumerate starts at 0
children=children,
arrayDataType=arr_data_type,
)
except Exception as err:
except Exception as err: # pylint: disable=broad-except
logger.error(traceback.format_exc())
logger.error(traceback.print_exc())
logger.error(f"{err} : {column}")
continue
table_columns.append(om_column)
row_order = row_order + 1
return table_columns
except Exception as err:
logger.error("{}: {} {}".format(repr(err), table, err))
except Exception as err: # pylint: disable=broad-except
logger.error(f"{repr(err)}: {table} {err}")
return None
def run_data_profiler(self, table: str, schema: str) -> TableProfile:
"""
Run the profiler for a table in a schema.
Prepare specific namings for different sources, e.g. bigquery
"""
dataset_name = f"{schema}.{table}"
self.status.scanned(f"profile of {dataset_name}")
logger.info(