Fixed markDeletedTables for bigquery (#4930)

Co-authored-by: Onkar Ravgan <onkarravgan@Onkars-MacBook-Pro.local>
This commit is contained in:
Onkar Ravgan 2022-05-13 16:10:48 +05:30 committed by GitHub
parent 5be22fe154
commit 4ef1f4b729
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -9,6 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import traceback
import uuid
from typing import Iterable, Optional, Tuple
@ -36,11 +37,12 @@ from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.source.sql_source import SQLSource, _get_table_description
from metadata.utils.column_type_parser import create_sqlalchemy_type
from metadata.utils.filters import filter_by_table
from metadata.utils.filters import filter_by_schema, filter_by_table
from metadata.utils.helpers import get_start_and_end
from metadata.utils.logger import ingestion_logger
@ -171,6 +173,31 @@ class BigquerySource(SQLSource):
),
)
def next_record(self) -> Iterable[Entity]:
for inspector in self.get_databases():
schema_names = inspector.get_schema_names()
for schema in schema_names:
# clear any previous source database state
try:
self.database_source_state.clear()
if filter_by_schema(
self.source_config.schemaFilterPattern, schema_name=schema
):
self.status.filter(schema, "Schema pattern not allowed")
continue
if self.source_config.includeTables:
yield from self.fetch_tables(inspector, schema)
if self.source_config.includeViews:
yield from self.fetch_views(inspector, schema)
if self.source_config.markDeletedTables:
schema_fqdn = f"{self.config.serviceName}.{self.service_connection.projectId}.{schema}"
yield from self.delete_tables(schema_fqdn)
except Exception as err:
logger.debug(traceback.format_exc())
logger.error(err)
def fetch_views(
self, inspector: Inspector, schema: str
) -> Iterable[OMetaDatabaseAndTable]: