mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-04 04:39:10 +00:00 
			
		
		
		
	fix(snowflake): get external tables when there is default namespace (#4803)
This commit is contained in:
		
							parent
							
								
									8a24408cbf
								
							
						
					
					
						commit
						6828dc3d4c
					
				@ -93,11 +93,32 @@ class SnowflakeSource(SQLAlchemySource):
 | 
				
			|||||||
            **self.config.options,
 | 
					            **self.config.options,
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def inspect_version(self) -> Any:
 | 
					    def inspect_session_metadata(self) -> Any:
 | 
				
			||||||
        db_engine = self.get_metadata_engine()
 | 
					        db_engine = self.get_metadata_engine()
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
            logger.info("Checking current version")
 | 
					            logger.info("Checking current version")
 | 
				
			||||||
            for db_row in db_engine.execute("select CURRENT_VERSION()"):
 | 
					            for db_row in db_engine.execute("select CURRENT_VERSION()"):
 | 
				
			||||||
                self.report.saas_version = db_row[0]
 | 
					                self.report.saas_version = db_row[0]
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            self.report.report_failure("version", f"Error: {e}")
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            logger.info("Checking current warehouse")
 | 
				
			||||||
 | 
					            for db_row in db_engine.execute("select current_warehouse()"):
 | 
				
			||||||
 | 
					                self.report.default_warehouse = db_row[0]
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            self.report.report_failure("current_warehouse", f"Error: {e}")
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            logger.info("Checking current database")
 | 
				
			||||||
 | 
					            for db_row in db_engine.execute("select current_database()"):
 | 
				
			||||||
 | 
					                self.report.default_db = db_row[0]
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            self.report.report_failure("current_database", f"Error: {e}")
 | 
				
			||||||
 | 
					        try:
 | 
				
			||||||
 | 
					            logger.info("Checking current schema")
 | 
				
			||||||
 | 
					            for db_row in db_engine.execute("select current_schema()"):
 | 
				
			||||||
 | 
					                self.report.default_schema = db_row[0]
 | 
				
			||||||
 | 
					        except Exception as e:
 | 
				
			||||||
 | 
					            self.report.report_failure("current_schema", f"Error: {e}")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def inspect_role_grants(self) -> Any:
 | 
					    def inspect_role_grants(self) -> Any:
 | 
				
			||||||
        db_engine = self.get_metadata_engine()
 | 
					        db_engine = self.get_metadata_engine()
 | 
				
			||||||
@ -345,7 +366,7 @@ WHERE
 | 
				
			|||||||
            )
 | 
					            )
 | 
				
			||||||
        # Handles the case for explicitly created external tables.
 | 
					        # Handles the case for explicitly created external tables.
 | 
				
			||||||
        # NOTE: Snowflake does not log this information to the access_history table.
 | 
					        # NOTE: Snowflake does not log this information to the access_history table.
 | 
				
			||||||
        external_tables_query: str = "show external tables"
 | 
					        external_tables_query: str = "show external tables in account"
 | 
				
			||||||
        try:
 | 
					        try:
 | 
				
			||||||
            for db_row in engine.execute(external_tables_query):
 | 
					            for db_row in engine.execute(external_tables_query):
 | 
				
			||||||
                key = (
 | 
					                key = (
 | 
				
			||||||
@ -635,11 +656,7 @@ QUALIFY ROW_NUMBER() OVER (PARTITION BY downstream_table_name, upstream_table_na
 | 
				
			|||||||
        if not self.should_run_ingestion():
 | 
					        if not self.should_run_ingestion():
 | 
				
			||||||
            return
 | 
					            return
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        try:
 | 
					        self.inspect_session_metadata()
 | 
				
			||||||
            self.inspect_version()
 | 
					 | 
				
			||||||
        except Exception as e:
 | 
					 | 
				
			||||||
            self.report.report_failure("version", f"Error: {e}")
 | 
					 | 
				
			||||||
            return
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        self.inspect_role_grants()
 | 
					        self.inspect_role_grants()
 | 
				
			||||||
        for wu in super().get_workunits():
 | 
					        for wu in super().get_workunits():
 | 
				
			||||||
 | 
				
			|||||||
@ -29,7 +29,10 @@ class SnowflakeReport(BaseSnowflakeReport, SQLSourceReport):
 | 
				
			|||||||
    provision_role_success: bool = False
 | 
					    provision_role_success: bool = False
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases
 | 
					    # https://community.snowflake.com/s/topic/0TO0Z000000Unu5WAC/releases
 | 
				
			||||||
    saas_version: str = ""
 | 
					    saas_version: Optional[str] = None
 | 
				
			||||||
 | 
					    default_warehouse: Optional[str] = None
 | 
				
			||||||
 | 
					    default_db: Optional[str] = None
 | 
				
			||||||
 | 
					    default_schema: Optional[str] = None
 | 
				
			||||||
    role: str = ""
 | 
					    role: str = ""
 | 
				
			||||||
    check_role_grants: Optional[bool] = None
 | 
					    check_role_grants: Optional[bool] = None
 | 
				
			||||||
    role_grants: List[str] = field(default_factory=list)
 | 
					    role_grants: List[str] = field(default_factory=list)
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user