diff --git a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py index 0991ca24eb..01ebe1c696 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py +++ b/metadata-ingestion/src/datahub/ingestion/source/datahub/datahub_database_reader.py @@ -104,6 +104,22 @@ class DataHubDatabaseReader: ORDER BY mav.urn """ + def _get_json_extract_expression(self) -> str: + """ + Returns the appropriate JSON extraction expression based on the database dialect. + + Returns: + Database-specific JSON extraction expression + """ + # Return the correct JSON extraction expression for the "removed" field, + # depending on the database dialect. + if self.engine.dialect.name == "postgresql": + # For PostgreSQL, cast the metadata column to JSON and extract the 'removed' key as boolean. + return "((metadata::json)->>'removed')::boolean" + else: + # For other databases (e.g., MySQL), use JSON_EXTRACT. + return "JSON_EXTRACT(metadata, '$.removed')" + def query(self, set_structured_properties_filter: bool) -> str: """ Main query that gets data for specified date range with appropriate filters. @@ -125,7 +141,7 @@ class DataHubDatabaseReader: LEFT JOIN ( SELECT *, - JSON_EXTRACT(metadata, '$.removed') as removed + {self._get_json_extract_expression()} as removed FROM {self.engine.dialect.identifier_preparer.quote(self.config.database_table_name)} WHERE aspect = 'status' AND version = 0 @@ -241,15 +257,10 @@ class DataHubDatabaseReader: "end_createdon": end_date.strftime(DATETIME_FORMAT), "limit": limit, "offset": offset, + # Always pass exclude_aspects as a tuple, postgres doesn't support lists + "exclude_aspects": tuple(self.config.exclude_aspects), } - # Add exclude_aspects if needed - if ( - hasattr(self.config, "exclude_aspects") - and self.config.exclude_aspects - ): - params["exclude_aspects"] = tuple(self.config.exclude_aspects) - logger.info( f"Querying data from {start_date.strftime(DATETIME_FORMAT)} to {end_date.strftime(DATETIME_FORMAT)} " f"with limit {limit} and offset {offset} (inclusive range)" diff --git a/metadata-ingestion/tests/unit/test_datahub_source.py b/metadata-ingestion/tests/unit/test_datahub_source.py index 0b82a41480..34ef67d692 100644 --- a/metadata-ingestion/tests/unit/test_datahub_source.py +++ b/metadata-ingestion/tests/unit/test_datahub_source.py @@ -304,4 +304,5 @@ def test_get_rows_for_date_range_exclude_aspects(mock_reader): # Assert called_params = mock_reader.execute_server_cursor.call_args[0][1] assert "exclude_aspects" in called_params + assert isinstance(called_params["exclude_aspects"], tuple) assert called_params["exclude_aspects"] == ("aspect1", "aspect2")