Mirror of https://github.com/open-metadata/OpenMetadata.git, synced 2025-10-31 18:48:35 +00:00
	ISSUE #2033-C - Support For DBX Exporter + Minor Fix to Status (#23313)
* feat: added config support for databricks
* fix: allow incrementing record count directly without storing element
* Update generated TypeScript types

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
(cherry picked from commit f3cb001d2bc12b8a3e3172004bdb32c078879c71)
This commit is contained in:

parent 4159a2345c
commit 696467756c
@@ -53,6 +53,7 @@ class Status(BaseModel):
     )

     records: Annotated[List[Any], Field(default_factory=list)]
+    record_count: int = Field(default=0)
     updated_records: Annotated[List[Any], Field(default_factory=list)]
     warnings: Annotated[List[Any], Field(default_factory=list)]
     filtered: Annotated[List[Dict[str, str]], Field(default_factory=list)]
@@ -75,6 +76,9 @@ class Status(BaseModel):
         if log_name := get_log_name(record):
             self.updated_records.append(log_name)

+    def increment_record_count(self, increment: int = 1) -> None:
+        self.record_count += increment
+
     def warning(self, key: str, reason: str) -> None:
         self.warnings.append({key: reason})

@@ -108,8 +112,9 @@ class Status(BaseModel):
         self.failures.extend(failures)

     def calculate_success(self) -> float:
+        record_count = self.record_count if self.record_count > 0 else len(self.records)
         source_success = max(
-            len(self.records) + len(self.updated_records), 1
+            record_count + len(self.updated_records), 1
         )  # To avoid ZeroDivisionError using minimum value as 1
         source_failed = len(self.failures)
         return round(source_success * 100 / (source_success + source_failed), 2)
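The core change: a sink can now report throughput by calling increment_record_count() instead of appending every element to records, and calculate_success() falls back to len(self.records) whenever the counter was never used, preserving the old behavior. A minimal sketch of that logic, using a plain-Python stand-in rather than the project's actual pydantic model:

from typing import Any, Dict, List


class StatusSketch:
    """Condensed stand-in for the Status model in the diff above."""

    def __init__(self) -> None:
        self.records: List[Any] = []
        self.updated_records: List[Any] = []
        self.warnings: List[Any] = []
        self.filtered: List[Dict[str, str]] = []
        self.failures: List[Any] = []
        self.record_count: int = 0

    def increment_record_count(self, increment: int = 1) -> None:
        # Count a processed record without storing the element itself
        self.record_count += increment

    def calculate_success(self) -> float:
        # Prefer the explicit counter; fall back to the stored list when
        # the counter was never incremented (the pre-change behavior)
        record_count = self.record_count if self.record_count > 0 else len(self.records)
        source_success = max(record_count + len(self.updated_records), 1)
        source_failed = len(self.failures)
        return round(source_success * 100 / (source_success + source_failed), 2)


status = StatusSketch()
for _ in range(99):
    status.increment_record_count()  # memory stays flat, unlike records.append(...)
status.failures.append({"key": "reason"})
print(status.calculate_success())  # 99.0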
@@ -77,7 +77,9 @@ class Summary(StepSummary):
         """Compute summary from Step"""
         return Summary(
             name=step.name,
-            records=len(step.status.records),
+            records=step.status.record_count
+            if step.status.record_count > 0
+            else len(step.status.records),
             updated_records=len(step.status.updated_records),
             warnings=len(step.status.warnings),
             errors=len(step.status.failures),
@@ -324,8 +324,15 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
         """
         try:
             for step in self.workflow_steps():
+
+                record_count: int = (
+                    step.status.record_count
+                    if step.status.record_count > 0
+                    else len(step.status.records)
+                )
+
                 logger.info(
-                    f"{step.name}: Processed {len(step.status.records)} records,"
+                    f"{step.name}: Processed {record_count} records,"
                    f" updated {len(step.status.updated_records)} records,"
                    f" filtered {len(step.status.filtered)} records,"
                    f" found {len(step.status.failures)} errors"
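Both reporting call sites (the step summary and the end-of-workflow log line) repeat the same counter-or-list fallback. Continuing the StatusSketch stand-in from above, a log line of this shape (step and logger names invented) would read:

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("workflow-sketch")

status = StatusSketch()  # from the sketch after the Status diff above
status.increment_record_count(42)

# Mirrors the fallback used by Summary.from_step and BaseWorkflow above
record_count = status.record_count if status.record_count > 0 else len(status.records)
logger.info(
    "ExampleStep: Processed %s records, updated %s records,"
    " filtered %s records, found %s errors",
    record_count,
    len(status.updated_records),
    len(status.filtered),
    len(status.failures),
)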
@@ -74,6 +74,9 @@
       "oneOf": [
         {
           "$ref": "metadataExporterConnectors/snowflakeConnection.json"
+        },
+        {
+          "$ref": "metadataExporterConnectors/databricksConnection.json"
         }
       ]
     },
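With the second $ref in place, the exporter's connectionConfig may validate against either connection schema. A hypothetical Databricks-shaped payload, with field names taken from the generated TypeScript types below and every value invented:

# Hypothetical connectionConfig payload; only the field names come from the
# generated types below, all values are placeholders.
databricks_connection = {
    "type": "Databricks",
    "scheme": "databricks+connector",
    "hostPort": "adb-1234567890123456.7.azuredatabricks.net:443",
    "httpPath": "/sql/1.0/warehouses/abc123def456",
    "token": "<personal-access-token>",
    "catalog": "hive_metastore",
    "databaseSchema": "openmetadata_exports",
    "connectionTimeout": 120,
}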
@@ -22,7 +22,7 @@ export interface MetadataExporterAppConfig {
     /**
      * Connection details for the Metadata Exporter Application.
      */
-    connectionConfig: SnowflakeConnection;
+    connectionConfig: Connection;
     /**
      * List of event types to export.
      */
@@ -46,13 +46,15 @@ export interface MetadataExporterAppConfig {
  * Connection details for the Metadata Exporter Application.
  *
  * Snowflake Connection Config
+ *
+ * Databricks Connection Config
  */
-export interface SnowflakeConnection {
+export interface Connection {
     /**
      * If the Snowflake URL is https://xyz1234.us-east-1.gcp.snowflakecomputing.com, then the
      * account is xyz1234.us-east-1.gcp
      */
-    account: string;
+    account?: string;
     /**
      * Optional configuration for ingestion to keep the client session active in case the
      * ingestion process runs for longer durations.
@@ -86,7 +88,7 @@ export interface SnowflakeConnection {
     /**
      * SQLAlchemy driver scheme options.
      */
-    scheme?: SnowflakeScheme;
+    scheme?: Scheme;
     /**
      * Snowflake Passphrase Key used with Private Key
      */
@@ -94,22 +96,53 @@ export interface SnowflakeConnection {
     /**
      * Service Type
      */
-    type?: SnowflakeType;
+    type?: Type;
     /**
      * Username to connect to Snowflake. This user should have privileges to read all the
      * metadata in Snowflake.
      */
-    username: string;
+    username?: string;
     /**
      * Snowflake warehouse.
      */
-    warehouse: string;
+    warehouse?: string;
+    /**
+     * Catalog of the data source(Example: hive_metastore). This is optional parameter, if you
+     * would like to restrict the metadata reading to a single catalog. When left blank,
+     * OpenMetadata Ingestion attempts to scan all the catalog.
+     */
+    catalog?: string;
+    /**
+     * The maximum amount of time (in seconds) to wait for a successful connection to the data
+     * source. If the connection attempt takes longer than this timeout period, an error will be
+     * returned.
+     */
+    connectionTimeout?: number;
+    /**
+     * Database Schema of the data source. This is optional parameter, if you would like to
+     * restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion
+     * attempts to scan all the schemas.
+     */
+    databaseSchema?: string;
+    /**
+     * Host and port of the Databricks service.
+     */
+    hostPort?: string;
+    /**
+     * Databricks compute resources URL.
+     */
+    httpPath?: string;
+    /**
+     * Generated Token to connect to Databricks.
+     */
+    token?: string;
 }

 /**
  * SQLAlchemy driver scheme options.
  */
-export enum SnowflakeScheme {
+export enum Scheme {
+    DatabricksConnector = "databricks+connector",
     Snowflake = "snowflake",
 }

@@ -118,7 +151,8 @@ export enum SnowflakeScheme {
  *
  * Service type.
  */
-export enum SnowflakeType {
+export enum Type {
+    Databricks = "Databricks",
     Snowflake = "Snowflake",
 }
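Because both connection schemas are merged into the single generated Connection interface, the previously required Snowflake fields (account, username, warehouse) become optional, so consumers have to branch on the type discriminator before trusting any of them. A sketch of such a check (helper name and messages invented):

# Hypothetical helper: dispatch on the "type" discriminator of a raw
# connectionConfig dict; field names mirror the generated types above.
def describe_connection(conn: dict) -> str:
    kind = conn.get("type")
    if kind == "Databricks":
        return f"Databricks at {conn.get('hostPort')} via {conn.get('httpPath')}"
    if kind == "Snowflake":
        return f"Snowflake account {conn.get('account')}, warehouse {conn.get('warehouse')}"
    raise ValueError(f"Unsupported exporter connection type: {kind!r}")


print(describe_connection({"type": "Databricks", "hostPort": "host:443", "httpPath": "/sql/1.0"}))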