Bigquery Mark Paertitioned Tables (#7007)

This commit is contained in:
Mayur Singal 2022-08-29 17:45:59 +05:30 committed by GitHub
parent 3b67cc824d
commit 58d27811d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 90 additions and 38 deletions

View File

@ -19,7 +19,8 @@
"SecureView",
"MaterializedView",
"Iceberg",
"Local"
"Local",
"Partitioned"
],
"javaEnums": [
{
@ -42,6 +43,9 @@
},
{
"name": "Local"
},
{
"name": "Partitioned"
}
]
},
@ -150,7 +154,6 @@
"description": "partition interval , example hourly, daily, monthly."
}
},
"required": ["columns", "intervalType", "interval"],
"additionalProperties": false
},
"column": {
@ -412,11 +415,21 @@
},
"min": {
"description": "Minimum value in a column.",
"oneOf": [{ "type": "number"}, { "type": "integer"}, { "$ref": "../../type/basic.json#/definitions/dateTime" }, { "$ref": "../../type/basic.json#/definitions/date" }]
"oneOf": [
{ "type": "number" },
{ "type": "integer" },
{ "$ref": "../../type/basic.json#/definitions/dateTime" },
{ "$ref": "../../type/basic.json#/definitions/date" }
]
},
"max": {
"description": "Maximum value in a column.",
"oneOf": [{ "type": "number"}, { "type": "integer"}, { "$ref": "../../type/basic.json#/definitions/dateTime" }, { "$ref": "../../type/basic.json#/definitions/date" }]
"oneOf": [
{ "type": "number" },
{ "type": "integer" },
{ "$ref": "../../type/basic.json#/definitions/dateTime" },
{ "$ref": "../../type/basic.json#/definitions/date" }
]
},
"minLength": {
"description": "Minimum string length in a column.",
@ -476,20 +489,20 @@
"description": "This schema defines the type for Table profile config include Columns.",
"type": "object",
"javaType": "org.openmetadata.catalog.type.ColumnProfilerConfig",
"properties": {
"columnName": {
"description": "Column Name of the table to be included.",
"properties": {
"columnName": {
"description": "Column Name of the table to be included.",
"type": "string"
},
"metrics": {
"description": "Include only following metrics.",
"type": "array",
"items": {
"type": "string"
},
"metrics": {
"description": "Include only following metrics.",
"type": "array",
"items": {
"type": "string"
},
"default": null
}
}
"default": null
}
}
},
"tableProfilerConfig": {
"type": "object",

View File

@ -13,9 +13,10 @@ We require Taxonomy Admin permissions to fetch all Policy Tags
"""
import os
import traceback
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Tuple
from google import auth
from google.cloud.bigquery.client import Client
from google.cloud.datacatalog_v1 import PolicyTagManagerClient
from sqlalchemy import inspect
from sqlalchemy.engine.reflection import Inspector
@ -30,7 +31,11 @@ from metadata.generated.schema.api.tags.createTag import CreateTagRequest
from metadata.generated.schema.api.tags.createTagCategory import (
CreateTagCategoryRequest,
)
from metadata.generated.schema.entity.data.table import TableType
from metadata.generated.schema.entity.data.table import (
IntervalType,
TablePartition,
TableType,
)
from metadata.generated.schema.entity.services.connections.database.bigQueryConnection import (
BigQueryConnection,
)
@ -95,6 +100,10 @@ class BigquerySource(CommonDbSourceService):
self.temp_credentials = None
self.project_id = self.set_project_id()
def prepare(self):
self.client = Client()
return super().prepare()
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataConnection):
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
@ -194,6 +203,41 @@ class BigquerySource(CommonDbSourceService):
view_definition = ""
return view_definition
def get_table_partition_details(
self, table_name: str, schema_name: str, inspector: Inspector
) -> Tuple[bool, TablePartition]:
"""
check if the table is partitioned table and return the partition details
"""
database = self.context.database.name.__root__
table = self.client.get_table(f"{database}.{schema_name}.{table_name}")
if table.time_partitioning is not None:
table_partition = TablePartition(
interval=str(table.partitioning_type),
intervalType=IntervalType.TIME_UNIT.value,
)
if (
hasattr(table.time_partitioning, "field")
and table.time_partitioning.field
):
table_partition.columns = [table.time_partitioning.field]
return True, table_partition
elif table.range_partitioning:
table_partition = TablePartition(
intervalType=IntervalType.INTEGER_RANGE.value,
)
if hasattr(table.range_partitioning, "range_") and hasattr(
table.range_partitioning.range_, "interval"
):
table_partition.interval = table.range_partitioning.range_.interval
if (
hasattr(table.range_partitioning, "field")
and table.range_partitioning.field
):
table_partition.columns = [table.range_partitioning.field]
return True, table_partition
return False, None
def parse_raw_data_type(self, raw_data_type):
return raw_data_type.replace(", ", ",").replace(" ", ":").lower()

View File

@ -29,7 +29,7 @@ from metadata.generated.schema.api.data.createDatabaseSchema import (
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table, TableType
from metadata.generated.schema.entity.data.table import Table, TablePartition, TableType
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
@ -206,17 +206,6 @@ class CommonDbSourceService(
"Table pattern not allowed",
)
continue
if self.is_partition(
table_name=table_name,
schema_name=schema_name,
inspector=self.inspector,
):
self.status.filter(
f"{self.config.serviceName}.{table_name}",
"Table is partition",
)
continue
table_name = self.standardize_table_name(schema_name, table_name)
yield table_name, TableType.Regular
@ -262,6 +251,14 @@ class CommonDbSourceService(
) -> bool:
return False
def get_table_partition_details(
self, table_name: str, schema_name: str, inspector: Inspector
) -> Tuple[bool, TablePartition]:
"""
check if the table is partitioned table and return the partition details
"""
return False, None # By default the table will be a Regular Table
def yield_tag(self, schema_name: str) -> Iterable[OMetaTagAndCategory]:
pass
@ -311,6 +308,12 @@ class CommonDbSourceService(
table_name=table_name
), # Pick tags from context info, if any
)
is_partitioned, partiotion_details = self.get_table_partition_details(
table_name=table_name, schema_name=schema_name, inspector=self.inspector
)
if is_partitioned:
table_request.tableType = TableType.Partitioned.value
table_request.tablePartition = partiotion_details
if table_type == TableType.View or view_definition:
table_view = {

View File

@ -64,14 +64,6 @@ class SqlAlchemySource(ABC):
Method returns the table level comment
"""
@abstractmethod
def is_partition(
self, table_name: str, schema_name: str, inspector: Inspector
) -> bool:
"""
Method to check if the table is partitioned table
"""
@abstractmethod
def get_data_model(
self, database: str, schema_name: str, table_name: str