diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 0ee1269f32..16505f4d27 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -631,7 +631,6 @@ timestamp < "{end_time}" return None def test_capability(self, project_id: str) -> None: - lineage_metadata: Dict[str, Set[str]] if self.config.use_exported_bigquery_audit_metadata: bigquery_client: BigQueryClient = BigQueryClient(project=project_id) entries = self._get_exported_bigquery_audit_metadata( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index 7227c209f8..97915586e8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -4,7 +4,7 @@ import textwrap import time from dataclasses import dataclass from datetime import datetime -from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Set, Union, cast +from typing import Any, Dict, Iterable, List, MutableMapping, Optional, Union, cast import cachetools from google.cloud.bigquery import Client as BigQueryClient @@ -443,7 +443,6 @@ class BigQueryUsageExtractor: ) -> Optional[OperationalDataMeta]: # If we don't have Query object that means this is a queryless read operation or a read operation which was not executed as JOB # https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason/ - operation_meta: OperationalDataMeta if not event.query_event and event.read_event: return OperationalDataMeta( statement_type=OperationTypeClass.CUSTOM, @@ -839,7 +838,6 @@ class BigQueryUsageExtractor: ) def test_capability(self, project_id: str) -> None: - lineage_metadata: Dict[str, Set[str]] for entry in self._get_parsed_bigquery_log_events(project_id, limit=1): logger.debug(f"Connection test got one {entry}") return diff --git a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py index e2f2e586df..4100944079 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/confluent_schema_registry.py @@ -141,7 +141,8 @@ class ConfluentSchemaRegistry(KafkaSchemaRegistryBase): if schema_seen is None: schema_seen = set() - for schema_ref in schema.references: # type: SchemaReference + schema_ref: SchemaReference + for schema_ref in schema.references: ref_subject: str = schema_ref["subject"] if ref_subject in schema_seen: continue diff --git a/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py b/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py index 905b7f71e2..e5e45c2bf4 100644 --- a/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py +++ b/metadata-ingestion/src/datahub_provider/operators/datahub_operation_operator.py @@ -83,7 +83,6 @@ class DataHubOperationCircuitBreakerOperator(BaseSensorOperator): else: raise Exception(f"urn parameter has invalid type {type(self.urn)}") - partition: Optional[str] for urn in urns: self.log.info(f"Checking if dataset {self.urn} is ready to be consumed") ret = self.circuit_breaker.is_circuit_breaker_active(