From f3cb001d2bc12b8a3e3172004bdb32c078879c71 Mon Sep 17 00:00:00 2001
From: Teddy
Date: Wed, 10 Sep 2025 12:04:46 +0200
Subject: [PATCH] ISSUE #2033-C - Support For DBX Exporter + Minor Fix to
 Status (#23313)

* feat: added config support for databricks

* fix: allow incrementing record count directly without storing element

* Update generated TypeScript types

---------

Co-authored-by: github-actions[bot]
---
 .../src/metadata/ingestion/api/status.py      |  7 ++-
 ingestion/src/metadata/ingestion/api/step.py  |  4 +-
 ingestion/src/metadata/workflow/base.py       |  9 +++-
 .../external/metadataExporterAppConfig.json   |  3 ++
 .../external/metadataExporterAppConfig.ts     | 52 +++++++++++++++----
 5 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/ingestion/src/metadata/ingestion/api/status.py b/ingestion/src/metadata/ingestion/api/status.py
index 77322f7ff0b..835326cea34 100644
--- a/ingestion/src/metadata/ingestion/api/status.py
+++ b/ingestion/src/metadata/ingestion/api/status.py
@@ -53,6 +53,7 @@ class Status(BaseModel):
     )
 
     records: Annotated[List[Any], Field(default_factory=list)]
+    record_count: int = Field(default=0)
     updated_records: Annotated[List[Any], Field(default_factory=list)]
     warnings: Annotated[List[Any], Field(default_factory=list)]
     filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)]
@@ -75,6 +76,9 @@ class Status(BaseModel):
             if log_name := get_log_name(record):
                 self.updated_records.append(log_name)
 
+    def increment_record_count(self, increment: int = 1) -> None:
+        self.record_count += increment
+
     def warning(self, key: str, reason: str) -> None:
         self.warnings.append({key: reason})
 
@@ -108,8 +112,9 @@ class Status(BaseModel):
         self.failures.extend(failures)
 
     def calculate_success(self) -> float:
+        record_count = self.record_count if self.record_count > 0 else len(self.records)
         source_success = max(
-            len(self.records) + len(self.updated_records), 1
+            record_count + len(self.updated_records), 1
         )  # To avoid ZeroDivisionError using minimum value as 1
         source_failed = len(self.failures)
         return round(source_success * 100 / (source_success + source_failed), 2)
diff --git a/ingestion/src/metadata/ingestion/api/step.py b/ingestion/src/metadata/ingestion/api/step.py
index 18770531c7b..1e10620d05c 100644
--- a/ingestion/src/metadata/ingestion/api/step.py
+++ b/ingestion/src/metadata/ingestion/api/step.py
@@ -77,7 +77,9 @@ class Summary(StepSummary):
         """Compute summary from Step"""
         return Summary(
             name=step.name,
-            records=len(step.status.records),
+            records=step.status.record_count
+            if step.status.record_count > 0
+            else len(step.status.records),
             updated_records=len(step.status.updated_records),
             warnings=len(step.status.warnings),
             errors=len(step.status.failures),
diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py
index 17f7e82730a..51c935d1526 100644
--- a/ingestion/src/metadata/workflow/base.py
+++ b/ingestion/src/metadata/workflow/base.py
@@ -324,8 +324,15 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
         """
         try:
             for step in self.workflow_steps():
+
+                record_count: int = (
+                    step.status.record_count
+                    if step.status.record_count > 0
+                    else len(step.status.records)
+                )
+
                 logger.info(
-                    f"{step.name}: Processed {len(step.status.records)} records,"
+                    f"{step.name}: Processed {record_count} records,"
                     f" updated {len(step.status.updated_records)} records,"
                     f" filtered {len(step.status.filtered)} records,"
                     f" found {len(step.status.failures)} errors"
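The status change above lets bulk sinks report volume without holding every emitted element in memory: callers bump `record_count` directly, and `calculate_success()` falls back to `len(records)` whenever the counter was never used. A minimal sketch of both paths, assuming `Status` can be instantiated with its defaults:

```python
from metadata.ingestion.api.status import Status

# Bulk path: a sink exporting 10k rows in one batch bumps the counter
# instead of appending 10k elements, keeping memory flat.
bulk = Status()
bulk.increment_record_count(10_000)
assert bulk.calculate_success() == 100.0  # 10_000 / (10_000 + 0 failures)

# Legacy path: steps that still append to `records` are unaffected,
# because record_count stays 0 and len(records) is used instead.
legacy = Status()
legacy.records.append("table:db.schema.users")
assert legacy.calculate_success() == 100.0
```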
diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/external/metadataExporterAppConfig.json b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/external/metadataExporterAppConfig.json
index cc87854cbe7..5ebb7b91e7c 100644
--- a/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/external/metadataExporterAppConfig.json
+++ b/openmetadata-spec/src/main/resources/json/schema/entity/applications/configuration/external/metadataExporterAppConfig.json
@@ -74,6 +74,9 @@
       "oneOf": [
         {
           "$ref": "metadataExporterConnectors/snowflakeConnection.json"
+        },
+        {
+          "$ref": "metadataExporterConnectors/databricksConnection.json"
         }
       ]
     },
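With the extra `oneOf` branch, the exporter app now accepts a Databricks connection alongside Snowflake. A hypothetical connection payload, with field names inferred from the generated TypeScript types below (the authoritative shape lives in `metadataExporterConnectors/databricksConnection.json`, which is not part of this patch):

```python
# Hypothetical exporter connectionConfig; all values are illustrative only.
connection_config = {
    "type": "Databricks",
    "scheme": "databricks+connector",
    "hostPort": "adb-1234567890123456.7.azuredatabricks.net:443",
    "httpPath": "/sql/1.0/warehouses/abc123",  # Databricks compute resources URL
    "token": "<databricks-personal-access-token>",
    "catalog": "hive_metastore",  # optional: restrict scan to one catalog
    "databaseSchema": "exports",  # optional: restrict scan to one schema
    "connectionTimeout": 120,     # seconds
}
```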
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/external/metadataExporterAppConfig.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/external/metadataExporterAppConfig.ts
index a34654d4ed2..dac82eda9a8 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/external/metadataExporterAppConfig.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/applications/configuration/external/metadataExporterAppConfig.ts
@@ -22,7 +22,7 @@ export interface MetadataExporterAppConfig {
     /**
      * Connection details for the Metadata Exporter Application.
      */
-    connectionConfig: SnowflakeConnection;
+    connectionConfig: Connection;
     /**
      * List of event types to export.
     */
@@ -46,13 +46,15 @@
 * Connection details for the Metadata Exporter Application.
 *
 * Snowflake Connection Config
+ *
+ * Databricks Connection Config
 */
-export interface SnowflakeConnection {
+export interface Connection {
     /**
      * If the Snowflake URL is https://xyz1234.us-east-1.gcp.snowflakecomputing.com, then the
      * account is xyz1234.us-east-1.gcp
      */
-    account: string;
+    account?: string;
     /**
      * Optional configuration for ingestion to keep the client session active in case the
      * ingestion process runs for longer durations.
@@ -86,7 +88,7 @@
     /**
      * SQLAlchemy driver scheme options.
      */
-    scheme?: SnowflakeScheme;
+    scheme?: Scheme;
     /**
      * Snowflake Passphrase Key used with Private Key
      */
@@ -94,22 +96,53 @@
     /**
      * Service Type
      */
-    type?: SnowflakeType;
+    type?: Type;
     /**
      * Username to connect to Snowflake. This user should have privileges to read all the
      * metadata in Snowflake.
      */
-    username: string;
+    username?: string;
     /**
      * Snowflake warehouse.
      */
-    warehouse: string;
+    warehouse?: string;
+    /**
+     * Catalog of the data source(Example: hive_metastore). This is optional parameter, if you
+     * would like to restrict the metadata reading to a single catalog. When left blank,
+     * OpenMetadata Ingestion attempts to scan all the catalog.
+     */
+    catalog?: string;
+    /**
+     * The maximum amount of time (in seconds) to wait for a successful connection to the data
+     * source. If the connection attempt takes longer than this timeout period, an error will be
+     * returned.
+     */
+    connectionTimeout?: number;
+    /**
+     * Database Schema of the data source. This is optional parameter, if you would like to
+     * restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion
+     * attempts to scan all the schemas.
+     */
+    databaseSchema?: string;
+    /**
+     * Host and port of the Databricks service.
+     */
+    hostPort?: string;
+    /**
+     * Databricks compute resources URL.
+     */
+    httpPath?: string;
+    /**
+     * Generated Token to connect to Databricks.
+     */
+    token?: string;
 }
 
 /**
  * SQLAlchemy driver scheme options.
  */
-export enum SnowflakeScheme {
+export enum Scheme {
+    DatabricksConnector = "databricks+connector",
     Snowflake = "snowflake",
 }
 
@@ -118,7 +151,8 @@
  *
  * Service type.
  */
-export enum SnowflakeType {
+export enum Type {
+    Databricks = "Databricks",
     Snowflake = "Snowflake",
 }
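Because the regenerated `Connection` type is a flattened union of both connectors (every field optional), downstream code has to discriminate on the `type` field at runtime. A sketch of that dispatch on the ingestion side; the helper below is illustrative and not part of this patch:

```python
# Hypothetical SQLAlchemy URL builder dispatching on the service type.
def build_connection_url(conn: dict) -> str:
    if conn.get("type") == "Databricks":
        return (
            f"databricks+connector://token:{conn['token']}"
            f"@{conn['hostPort']}?http_path={conn['httpPath']}"
        )
    # Snowflake is the only other connector in the oneOf.
    return (
        f"snowflake://{conn['username']}@{conn['account']}"
        f"?warehouse={conn['warehouse']}"
    )
```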