mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-08 05:26:19 +00:00
parent
a49fb8f0b2
commit
04ede3b05b
4
.coveragerc
Normal file
4
.coveragerc
Normal file
@ -0,0 +1,4 @@
|
||||
[run]
|
||||
relative_files = True
|
||||
branch = True
|
||||
source = ingestion/
|
||||
5
.github/workflows/py-tests-3_9.yml
vendored
5
.github/workflows/py-tests-3_9.yml
vendored
@ -34,6 +34,7 @@ jobs:
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
ref: ${{ github.event.pull_request.head.sha }}
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up JDK 11
|
||||
uses: actions/setup-java@v2
|
||||
@ -70,7 +71,6 @@ jobs:
|
||||
rm pom.xml
|
||||
|
||||
- name: Fetch main for sonar
|
||||
if: ${{ github.event_name == 'pull_request_target' }}
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
run: |
|
||||
@ -83,7 +83,6 @@ jobs:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}
|
||||
with:
|
||||
projectBaseDir: ingestion/
|
||||
args: >
|
||||
-Dsonar.pullrequest.key=${{ github.event.pull_request.number }}
|
||||
-Dsonar.pullrequest.branch=${{ github.head_ref }}
|
||||
@ -97,5 +96,3 @@ jobs:
|
||||
env:
|
||||
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
|
||||
SONAR_TOKEN: ${{ secrets.INGESTION_SONAR_SECRET }}
|
||||
with:
|
||||
projectBaseDir: ingestion/
|
||||
|
||||
13
.pylintrc
13
.pylintrc
@ -3,14 +3,17 @@
|
||||
# W1202: logging-format-interpolation - lazy formatting in logging functions
|
||||
# R0903: too-few-public-methods - False positives in pydantic classes
|
||||
# W0707: raise-missing-from - Tends to be a false positive as exceptions are closely encapsulated
|
||||
disable=W1203,W1202,R0903,W0707
|
||||
# R0901: too-many-ancestors - We are already inheriting from SQA classes with a bunch of ancestors
|
||||
# W0703: broad-except - We are dealing with many different source systems, but we want to make sure workflows run until the end
|
||||
# W0511: fixme - These are internal notes and guides
|
||||
disable=W1203,W1202,R0903,W0707,R0901,W1201,W0703,W0511
|
||||
|
||||
docstring-min-length=20
|
||||
max-args=7
|
||||
max-attributes=12
|
||||
|
||||
# usual typevar naming
|
||||
good-names=T,C
|
||||
good-names=T,C,fn
|
||||
|
||||
[MASTER]
|
||||
fail-under=6.0
|
||||
@ -20,8 +23,6 @@ extension-pkg-allow-list=pydantic
|
||||
[MESSAGES CONTROL]
|
||||
disable=no-name-in-module
|
||||
|
||||
[TYPECHECK]
|
||||
ignored-classes=optparse.Values,thread._local,_thread._local,SQLAlchemyHelper,FieldInfo
|
||||
|
||||
[FORMAT]
|
||||
max-line-length=88
|
||||
# We all have big monitors now
|
||||
max-line-length=120
|
||||
|
||||
9
Makefile
9
Makefile
@ -67,11 +67,11 @@ generate: ## Generate the pydantic models from the JSON Schemas to the ingestio
|
||||
## Ingestion tests & QA
|
||||
.PHONY: run_ometa_integration_tests
|
||||
run_ometa_integration_tests: ## Run Python integration tests
|
||||
coverage run -a -m pytest -c ingestion/setup.cfg --doctest-modules --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/stage
|
||||
coverage run -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-integration.xml ingestion/tests/integration/ometa ingestion/tests/integration/stage
|
||||
|
||||
.PHONY: unit_ingestion
|
||||
unit_ingestion: ## Run Python unit tests
|
||||
coverage run -a -m pytest -c ingestion/setup.cfg -s --doctest-modules --junitxml=ingestion/junit/test-results-unit.xml ingestion/tests/unit
|
||||
coverage run -a --branch -m pytest -c ingestion/setup.cfg --junitxml=ingestion/junit/test-results-unit.xml ingestion/tests/unit
|
||||
|
||||
.PHONY: coverage
|
||||
coverage: ## Run all Python tests and generate the coverage report
|
||||
@ -79,11 +79,6 @@ coverage: ## Run all Python tests and generate the coverage report
|
||||
$(MAKE) unit_ingestion
|
||||
$(MAKE) run_ometa_integration_tests
|
||||
coverage xml -i -o ingestion/coverage.xml
|
||||
# Now base dir for filename
|
||||
# Fix GA path https://community.sonarsource.com/t/sonar-on-github-actions-with-python-coverage-source-issue/36057
|
||||
# sed -e "s/filename=\"ingestion\//filename=\"/g" -e "s/<source>\/home\/runner\/work\/OpenMetadata\/OpenMetadata<\/source>/<source>\/github\/workspace<\/source>/g" ingestion/coverage.xml >> ingestion/ci-coverage.xml
|
||||
sed -e "s/<source>\/home\/runner\/work\/OpenMetadata\/OpenMetadata<\/source>/<source>\/github\/workspace<\/source>/g" ingestion/coverage.xml >> ingestion/ci-coverage.xml
|
||||
cat ingestion/ci-coverage.xml
|
||||
|
||||
.PHONY: sonar_ingestion
|
||||
sonar_ingestion: ## Run the Sonar analysis based on the tests results and push it to SonarCloud
|
||||
|
||||
@ -46,8 +46,3 @@ package_dir =
|
||||
[options.packages.find]
|
||||
where = src
|
||||
include = *
|
||||
|
||||
[coverage:run]
|
||||
relative_files = True
|
||||
branch = True
|
||||
source = ingestion/
|
||||
|
||||
@ -3,9 +3,9 @@ sonar.projectName=open-metadata-ingestion
|
||||
sonar.organization=open-metadata
|
||||
sonar.language=py
|
||||
|
||||
sonar.sources=src
|
||||
sonar.tests=tests
|
||||
sonar.exclusions=src/metadata_server/static/**,ingestion/src/metadata_server/templates/**
|
||||
sonar.python.xunit.reportPath=junit/test-results-*.xml
|
||||
sonar.python.coverage.reportPaths=ci-coverage.xml
|
||||
sonar.sources=ingestion/src
|
||||
sonar.tests=ingestion/tests
|
||||
sonar.exclusions=ingestion/src/metadata_server/static/**,ingestion/src/metadata_server/templates/**
|
||||
sonar.python.xunit.reportPath=ingestion/junit/test-results-*.xml
|
||||
sonar.python.coverage.reportPaths=ingestion/ci-coverage.xml
|
||||
sonar.python.version=3.7,3.8,3.9
|
||||
|
||||
@ -13,6 +13,9 @@ from abc import ABCMeta, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Generic, List
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.ingestion.api.closeable import Closeable
|
||||
from metadata.ingestion.api.common import Entity
|
||||
from metadata.ingestion.api.status import Status
|
||||
@ -39,7 +42,7 @@ class Processor(Closeable, Generic[Entity], metaclass=ABCMeta):
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def create(
|
||||
cls, config_dict: dict, metadata_config_dict: dict, **kwargs
|
||||
cls, config_dict: dict, metadata_config: OpenMetadataConnection, **kwargs
|
||||
) -> "Processor":
|
||||
pass
|
||||
|
||||
|
||||
@ -15,6 +15,9 @@ from typing import Any, Generic, List
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
|
||||
OpenMetadataConnection,
|
||||
)
|
||||
from metadata.ingestion.api.closeable import Closeable
|
||||
from metadata.ingestion.api.common import Entity
|
||||
from metadata.ingestion.api.status import Status
|
||||
@ -42,7 +45,9 @@ class Sink(Closeable, Generic[Entity], metaclass=ABCMeta):
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def create(cls, config_dict: dict, metadata_config_dict: dict) -> "Sink":
|
||||
def create(
|
||||
cls, config_dict: dict, metadata_config: OpenMetadataConnection
|
||||
) -> "Sink":
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@ -185,6 +185,10 @@ class ProfilerWorkflow:
|
||||
self.sink.write_record(profile_and_tests)
|
||||
|
||||
def print_status(self) -> int:
|
||||
"""
|
||||
Runs click echo to print the
|
||||
workflow results
|
||||
"""
|
||||
click.echo()
|
||||
click.secho("Source Status:", bold=True)
|
||||
click.echo(self.source_status.as_string())
|
||||
@ -202,16 +206,16 @@ class ProfilerWorkflow:
|
||||
):
|
||||
click.secho("Workflow finished with failures", fg="bright_red", bold=True)
|
||||
return 1
|
||||
elif (
|
||||
if (
|
||||
self.source_status.warnings
|
||||
or self.processor.get_status().failures
|
||||
or (hasattr(self, "sink") and self.sink.get_status().warnings)
|
||||
):
|
||||
click.secho("Workflow finished with warnings", fg="yellow", bold=True)
|
||||
return 0
|
||||
else:
|
||||
click.secho("Workflow finished successfully", fg="green", bold=True)
|
||||
return 0
|
||||
|
||||
click.secho("Workflow finished successfully", fg="green", bold=True)
|
||||
return 0
|
||||
|
||||
def raise_from_status(self, raise_warnings=False):
|
||||
"""
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Distinct Ratio Composed Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from metadata.orm_profiler.metrics.core import ComposedMetric
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Count Duplicates Composed Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from metadata.orm_profiler.metrics.core import ComposedMetric
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
ILIKE Ratio Composed Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from metadata.orm_profiler.metrics.core import ComposedMetric
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
LIKE Ratio Composed Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from metadata.orm_profiler.metrics.core import ComposedMetric
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Null Ratio Composed Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from metadata.orm_profiler.metrics.core import ComposedMetric
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Unique Ratio Composed Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
|
||||
from metadata.orm_profiler.metrics.core import ComposedMetric
|
||||
|
||||
@ -139,7 +139,7 @@ class Metric(ABC):
|
||||
return self.col.type.python_type if self.col else None
|
||||
|
||||
|
||||
MetricType = TypeVar("MetricType", bound=Metric)
|
||||
TMetric = TypeVar("TMetric", bound=Metric)
|
||||
|
||||
|
||||
class StaticMetric(Metric, ABC):
|
||||
|
||||
@ -12,7 +12,10 @@
|
||||
"""
|
||||
Table Column Count Metric definition
|
||||
"""
|
||||
from sqlalchemy import func, inspect, literal
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import inspect, literal
|
||||
from sqlalchemy.orm import DeclarativeMeta
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -21,9 +24,15 @@ class ColumnCount(StaticMetric):
|
||||
"""
|
||||
COLUMN_COUNT Metric
|
||||
|
||||
Count all columns on a table
|
||||
Count all columns on a table.
|
||||
|
||||
This Metric needs to be initialised passing the Table
|
||||
information:
|
||||
add_props(table=table)(Metrics.COLUMN_COUNT.value)
|
||||
"""
|
||||
|
||||
table: DeclarativeMeta
|
||||
|
||||
@classmethod
|
||||
def name(cls):
|
||||
return "columnCount"
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Count Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,10 @@
|
||||
"""
|
||||
CountInSet Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from typing import List
|
||||
|
||||
from sqlalchemy import case, column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
@ -25,8 +29,14 @@ class CountInSet(StaticMetric):
|
||||
COUNT_IN_SET Metric
|
||||
|
||||
Given a column, return the count of values in a given set.
|
||||
|
||||
This Metric needs to be initialised passing the values to look for
|
||||
the count:
|
||||
add_props(values=["John"])(Metrics.COUNT_IN_SET.value)
|
||||
"""
|
||||
|
||||
values: List[str]
|
||||
|
||||
@classmethod
|
||||
def name(cls):
|
||||
return "countInSet"
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Distinct Count Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, distinct, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -14,7 +14,7 @@ Histogram Metric definition
|
||||
"""
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy import and_, column, func
|
||||
from sqlalchemy import column, func
|
||||
from sqlalchemy.orm import DeclarativeMeta, Session
|
||||
|
||||
from metadata.orm_profiler.metrics.core import QueryMetric
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
ILIKE Count Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import case, column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
@ -23,8 +25,13 @@ class ILikeCount(StaticMetric):
|
||||
|
||||
Given a column, and an expression, return the number of
|
||||
rows that match it
|
||||
|
||||
This Metric needs to be initialised passing the expression to check
|
||||
add_props(expression="j%")(Metrics.ILIKE_COUNT.value)
|
||||
"""
|
||||
|
||||
expression: str
|
||||
|
||||
@classmethod
|
||||
def name(cls):
|
||||
return "iLikeCount"
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Like Count Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import case, column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
@ -23,8 +25,13 @@ class LikeCount(StaticMetric):
|
||||
|
||||
Given a column, and an expression, return the number of
|
||||
rows that match it
|
||||
|
||||
This Metric needs to be initialised passing the expression to check
|
||||
add_props(expression="j%")(Metrics.LIKE_COUNT.value)
|
||||
"""
|
||||
|
||||
expression: str
|
||||
|
||||
@classmethod
|
||||
def name(cls):
|
||||
return "likeCount"
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Max Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
MAX_LENGTH Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
AVG Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Min Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
MIN_LENGTH Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Null Count Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import case, column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Table Count Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Population Standard deviation Metric definition
|
||||
"""
|
||||
# Keep SQA docs style defining custom constructs
|
||||
# pylint: disable=consider-using-f-string,duplicate-code
|
||||
from sqlalchemy import column
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.functions import FunctionElement
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
SUM Metric definition
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy import column, func
|
||||
|
||||
from metadata.orm_profiler.metrics.core import StaticMetric, _label
|
||||
|
||||
@ -14,7 +14,7 @@ Converter logic to transform an OpenMetadata Table Entity
|
||||
to an SQLAlchemy ORM class.
|
||||
"""
|
||||
from functools import singledispatch
|
||||
from typing import Optional, Union
|
||||
from typing import Union
|
||||
|
||||
import sqlalchemy
|
||||
from sqlalchemy.orm import DeclarativeMeta, declarative_base
|
||||
@ -80,13 +80,11 @@ def build_orm_col(idx: int, col: Column) -> sqlalchemy.Column:
|
||||
return sqlalchemy.Column(
|
||||
name=str(col.name.__root__),
|
||||
type_=_TYPE_MAP.get(col.dataType),
|
||||
primary_key=True if idx == 0 else False,
|
||||
primary_key=bool(idx),
|
||||
)
|
||||
|
||||
|
||||
def ometa_to_orm(
|
||||
table: Table, schema: Union[DatabaseSchema, str], dialect: Optional[str] = None
|
||||
) -> DeclarativeMeta:
|
||||
def ometa_to_orm(table: Table, schema: Union[DatabaseSchema, str]) -> DeclarativeMeta:
|
||||
"""
|
||||
Given an OpenMetadata instance, prepare
|
||||
the SQLAlchemy DeclarativeMeta class
|
||||
|
||||
@ -12,6 +12,9 @@
|
||||
"""
|
||||
Define Concat function
|
||||
"""
|
||||
# Keep SQA docs style defining custom constructs
|
||||
# pylint: disable=consider-using-f-string,duplicate-code
|
||||
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.functions import FunctionElement
|
||||
|
||||
|
||||
@ -15,6 +15,8 @@ This will be executed as a way to make sure
|
||||
that the Engine can reach and execute in the
|
||||
source.
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.expression import ClauseElement, Executable
|
||||
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Define Length function
|
||||
"""
|
||||
# Keep SQA docs style defining custom constructs
|
||||
# pylint: disable=consider-using-f-string,duplicate-code
|
||||
from sqlalchemy.ext.compiler import compiles
|
||||
from sqlalchemy.sql.functions import FunctionElement
|
||||
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Expand sqlalchemy types to map them to OpenMetadata DataType
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from sqlalchemy.sql.sqltypes import String, TypeDecorator
|
||||
|
||||
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
Expand sqlalchemy types to map them to OpenMetadata DataType
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from uuid import UUID
|
||||
|
||||
from sqlalchemy.sql.sqltypes import String, TypeDecorator
|
||||
|
||||
@ -145,7 +145,6 @@ class OrmProfilerProcessor(Processor[Table]):
|
||||
)
|
||||
|
||||
# Here we will need to add the logic to pass kwargs to the metrics
|
||||
# TODO: add_props when needed for incoming metrics
|
||||
metrics = (
|
||||
[Metrics.get(name) for name in self.config.profiler.metrics]
|
||||
if self.config.profiler.metrics
|
||||
@ -473,9 +472,7 @@ class OrmProfilerProcessor(Processor[Table]):
|
||||
schema = self.metadata.get_by_id(
|
||||
entity=DatabaseSchema, entity_id=record.databaseSchema.id
|
||||
)
|
||||
orm_table = ometa_to_orm(
|
||||
table=record, schema=schema, dialect=self.session.bind.dialect.name
|
||||
)
|
||||
orm_table = ometa_to_orm(table=record, schema=schema)
|
||||
|
||||
entity_profile = self.profile_entity(orm_table, record)
|
||||
|
||||
|
||||
@ -18,7 +18,7 @@ from typing import Any, Dict, Generic, List, Optional, Tuple, Type, Union
|
||||
|
||||
from pydantic import ValidationError
|
||||
from sqlalchemy import Column, inspect
|
||||
from sqlalchemy.orm import DeclarativeMeta, Query, aliased
|
||||
from sqlalchemy.orm import DeclarativeMeta
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy.orm.util import AliasedClass
|
||||
|
||||
@ -26,9 +26,9 @@ from metadata.generated.schema.entity.data.table import ColumnProfile, TableProf
|
||||
from metadata.orm_profiler.metrics.core import (
|
||||
ComposedMetric,
|
||||
CustomMetric,
|
||||
MetricType,
|
||||
QueryMetric,
|
||||
StaticMetric,
|
||||
TMetric,
|
||||
)
|
||||
from metadata.orm_profiler.metrics.static.row_count import RowCount
|
||||
from metadata.orm_profiler.orm.registry import NOT_COMPUTE
|
||||
@ -48,7 +48,7 @@ class MissingMetricException(Exception):
|
||||
"""
|
||||
|
||||
|
||||
class Profiler(Generic[MetricType]):
|
||||
class Profiler(Generic[TMetric]):
|
||||
"""
|
||||
Core Profiler.
|
||||
|
||||
@ -58,9 +58,11 @@ class Profiler(Generic[MetricType]):
|
||||
- A tuple of metrics, from which we will construct queries.
|
||||
"""
|
||||
|
||||
# pylint: disable=too-many-instance-attributes,too-many-public-methods
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*metrics: Type[MetricType],
|
||||
*metrics: Type[TMetric],
|
||||
session: Session,
|
||||
table: DeclarativeMeta,
|
||||
profile_date: datetime = datetime.now(),
|
||||
@ -117,7 +119,7 @@ class Profiler(Generic[MetricType]):
|
||||
return self._table
|
||||
|
||||
@property
|
||||
def metrics(self) -> Tuple[Type[MetricType], ...]:
|
||||
def metrics(self) -> Tuple[Type[TMetric], ...]:
|
||||
return self._metrics
|
||||
|
||||
@property
|
||||
@ -158,7 +160,7 @@ class Profiler(Generic[MetricType]):
|
||||
|
||||
return self._columns
|
||||
|
||||
def _filter_metrics(self, _type: Type[MetricType]) -> List[Type[MetricType]]:
|
||||
def _filter_metrics(self, _type: Type[TMetric]) -> List[Type[TMetric]]:
|
||||
"""
|
||||
Filter metrics by type
|
||||
"""
|
||||
@ -181,7 +183,7 @@ class Profiler(Generic[MetricType]):
|
||||
return self._filter_metrics(QueryMetric)
|
||||
|
||||
@staticmethod
|
||||
def get_col_metrics(metrics: List[Type[MetricType]]) -> List[Type[MetricType]]:
|
||||
def get_col_metrics(metrics: List[Type[TMetric]]) -> List[Type[TMetric]]:
|
||||
"""
|
||||
Filter list of metrics for column metrics with allowed types
|
||||
"""
|
||||
@ -258,7 +260,7 @@ class Profiler(Generic[MetricType]):
|
||||
except (TimeoutError, Exception) as err:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.error(
|
||||
f"Error while running table metric for: {self.table.__tablename__}"
|
||||
f"Error while running table metric for: {self.table.__tablename__} - {err}"
|
||||
)
|
||||
self.session.rollback()
|
||||
|
||||
|
||||
@ -39,8 +39,10 @@ class ProfilerDef(BaseModel):
|
||||
# rule_metrics: ...
|
||||
|
||||
@validator("metrics", each_item=True)
|
||||
def valid_metric(cls, value): # cls as per pydantic docs
|
||||
def valid_metric(cls, value): # pylint: disable=no-self-argument,no-self-use
|
||||
"""
|
||||
We are using cls as per pydantic docs
|
||||
|
||||
Validate that the input metrics are correctly named
|
||||
and can be found in the Registry
|
||||
"""
|
||||
|
||||
@ -54,7 +54,9 @@ class MetricRegistry(Enum):
|
||||
return self.value(*args, **kwargs)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
def name( # pylint: disable=function-redefined, invalid-overridden-method
|
||||
self,
|
||||
) -> str:
|
||||
"""
|
||||
Override the default `name` on Enums
|
||||
to use the mapped class name instead.
|
||||
|
||||
@ -28,6 +28,11 @@ class FileSinkConfig(ConfigModel):
|
||||
|
||||
|
||||
class FileSink(Sink[Entity]):
|
||||
"""
|
||||
Helper sink to save profiler
|
||||
results in a file for analysis
|
||||
"""
|
||||
|
||||
config: FileSinkConfig
|
||||
report: SinkStatus
|
||||
|
||||
@ -44,7 +49,7 @@ class FileSink(Sink[Entity]):
|
||||
# Build the path if it does not exist
|
||||
if not fpath.parent.is_dir():
|
||||
Path(self.config.filename).mkdir(parents=True, exist_ok=True)
|
||||
self.file = fpath.open("w")
|
||||
self.file = fpath.open("w", encoding="utf-8")
|
||||
self.wrote_something = False
|
||||
|
||||
@classmethod
|
||||
@ -61,7 +66,7 @@ class FileSink(Sink[Entity]):
|
||||
self.file.write(f"{record.profile.json()}\n")
|
||||
|
||||
if record.record_tests:
|
||||
self.file.write(f"\nTest results:\n")
|
||||
self.file.write("\nTest results:\n")
|
||||
self.file.write(f"{record.record_tests.json()}\n")
|
||||
|
||||
self.wrote_something = True
|
||||
|
||||
@ -33,6 +33,10 @@ class MetadataRestSinkConfig(ConfigModel):
|
||||
|
||||
|
||||
class MetadataRestSink(Sink[Entity]):
|
||||
"""
|
||||
Metadata Sink sending the profiler
|
||||
and tests results to the OM API
|
||||
"""
|
||||
|
||||
config: MetadataRestSinkConfig
|
||||
status: SinkStatus
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
"""
|
||||
ColumnValueLengthsToBeBetween validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
@ -12,7 +12,7 @@
|
||||
"""
|
||||
ColumnValuesMissingCount validation implementation
|
||||
"""
|
||||
|
||||
# pylint: disable=duplicate-code
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
"""
|
||||
ColumnValuesToBeNotNull validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
"""
|
||||
ColumnValuesToBeBetween validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
"""
|
||||
ColumnValuesToBeNotNull validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
"""
|
||||
ColumnValuesToBeUnique validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
@ -58,7 +59,7 @@ def column_values_to_be_unique(
|
||||
)
|
||||
result = (
|
||||
f"Found valuesCount={col_profile.valuesCount} vs. uniqueCount={col_profile.uniqueCount}."
|
||||
+ f" Both counts should be equal for column values to be unique."
|
||||
+ " Both counts should be equal for column values to be unique."
|
||||
)
|
||||
|
||||
return TestCaseResult(
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
"""
|
||||
ColumnValuesToBeNotNull validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
TableColumnCountToEqual validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from metadata.generated.schema.entity.data.table import TableProfile
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
TableRowCountToBeBetween validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from metadata.generated.schema.entity.data.table import TableProfile
|
||||
|
||||
@ -12,6 +12,8 @@
|
||||
"""
|
||||
TableRowCountToEqual validation implementation
|
||||
"""
|
||||
# pylint: disable=duplicate-code
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
from metadata.generated.schema.entity.data.table import TableProfile
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user