diff --git a/ingestion/src/metadata/profiler/api/workflow.py b/ingestion/src/metadata/profiler/api/workflow.py index 84cd004281f..f3cb1eb5576 100644 --- a/ingestion/src/metadata/profiler/api/workflow.py +++ b/ingestion/src/metadata/profiler/api/workflow.py @@ -491,10 +491,6 @@ class ProfilerWorkflow(WorkflowStatusMixin): raise WorkflowExecutionError( "Source reported warnings", self.source_status ) - if self.source_status.warnings: - raise WorkflowExecutionError( - "Processor reported warnings", self.source_status - ) if hasattr(self, "sink") and self.sink.get_status().warnings: raise WorkflowExecutionError( "Sink reported warnings", self.sink.get_status() diff --git a/ingestion/src/metadata/profiler/metrics/static/min.py b/ingestion/src/metadata/profiler/metrics/static/min.py index 89f94914967..32871f7f207 100644 --- a/ingestion/src/metadata/profiler/metrics/static/min.py +++ b/ingestion/src/metadata/profiler/metrics/static/min.py @@ -14,7 +14,6 @@ Min Metric definition """ # pylint: disable=duplicate-code - from sqlalchemy import column from sqlalchemy.ext.compiler import compiles from sqlalchemy.sql.functions import GenericFunction diff --git a/ingestion/src/metadata/profiler/orm/converter.py b/ingestion/src/metadata/profiler/orm/converter.py index 60725af8dd2..7d5602e4564 100644 --- a/ingestion/src/metadata/profiler/orm/converter.py +++ b/ingestion/src/metadata/profiler/orm/converter.py @@ -41,7 +41,7 @@ _TYPE_MAP = { DataType.DOUBLE: sqlalchemy.DECIMAL, DataType.DECIMAL: sqlalchemy.DECIMAL, DataType.NUMERIC: sqlalchemy.NUMERIC, - DataType.TIMESTAMP: sqlalchemy.TIMESTAMP, + DataType.TIMESTAMP: CustomTypes.TIMESTAMP.value, DataType.TIME: sqlalchemy.TIME, DataType.DATE: sqlalchemy.DATE, DataType.DATETIME: sqlalchemy.DATETIME, diff --git a/ingestion/src/metadata/profiler/orm/registry.py b/ingestion/src/metadata/profiler/orm/registry.py index 3b2ee0a1722..eee00e29e84 100644 --- a/ingestion/src/metadata/profiler/orm/registry.py +++ b/ingestion/src/metadata/profiler/orm/registry.py @@ -21,6 +21,7 @@ from metadata.generated.schema.entity.data.table import DataType from metadata.ingestion.source import sqa_types from metadata.profiler.orm.types.bytea_to_string import ByteaToHex from metadata.profiler.orm.types.custom_array import CustomArray +from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp from metadata.profiler.orm.types.hex_byte_string import HexByteString from metadata.profiler.orm.types.uuid import UUIDString from metadata.profiler.registry import TypeRegistry @@ -32,6 +33,7 @@ class CustomTypes(TypeRegistry): UUID = UUIDString BYTEA = ByteaToHex ARRAY = CustomArray + TIMESTAMP = CustomTimestamp class Dialects(Enum): diff --git a/ingestion/src/metadata/profiler/orm/types/custom_timestamp.py b/ingestion/src/metadata/profiler/orm/types/custom_timestamp.py new file mode 100644 index 00000000000..00946d210c3 --- /dev/null +++ b/ingestion/src/metadata/profiler/orm/types/custom_timestamp.py @@ -0,0 +1,50 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=abstract-method + +""" +Expand sqlalchemy types to map them to OpenMetadata DataType +""" +from sqlalchemy.sql.sqltypes import TIMESTAMP, TypeDecorator + +from metadata.utils.logger import profiler_logger + +logger = profiler_logger() + + +class CustomTimestamp(TypeDecorator): + """ + Convert RowVersion + """ + + impl = TIMESTAMP + cache_ok = True + + @property + def python_type(self): + return str + + def process_result_value(self, value, dialect): + """This is executed during result retrieval + + Args: + value: database record + dialect: database dialect + Returns: + python rowversion conversion to timestamp + """ + import struct # pylint: disable=import-outside-toplevel + + if dialect.name == "mssql" and isinstance(value, bytes): + bytes_to_int = struct.unpack(">Q", value)[0] + return bytes_to_int + return value diff --git a/ingestion/tests/integration/orm_profiler/test_converter.py b/ingestion/tests/integration/orm_profiler/test_converter.py index a5d1b5158db..1fa12617596 100644 --- a/ingestion/tests/integration/orm_profiler/test_converter.py +++ b/ingestion/tests/integration/orm_profiler/test_converter.py @@ -44,6 +44,7 @@ from metadata.generated.schema.security.client.openMetadataJWTClientConfig impor ) from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.profiler.orm.converter import ometa_to_sqa_orm +from metadata.profiler.orm.types.custom_timestamp import CustomTimestamp class ProfilerWorkflowTest(TestCase): @@ -121,7 +122,7 @@ class ProfilerWorkflowTest(TestCase): assert isinstance(orm_table.id.type, sqlalchemy.BIGINT) assert isinstance(orm_table.name.type, sqlalchemy.String) assert isinstance(orm_table.age.type, sqlalchemy.INTEGER) - assert isinstance(orm_table.last_updated.type, sqlalchemy.TIMESTAMP) + assert isinstance(orm_table.last_updated.type, CustomTimestamp) assert isinstance(orm_table.created_date.type, sqlalchemy.DATE) assert isinstance(orm_table.group.type, sqlalchemy.CHAR) assert isinstance(orm_table.savings.type, sqlalchemy.DECIMAL)