diff --git a/ingestion/src/metadata/ingestion/source/bigquery.py b/ingestion/src/metadata/ingestion/source/bigquery.py index ee078f7cd53..b3313ca2d37 100644 --- a/ingestion/src/metadata/ingestion/source/bigquery.py +++ b/ingestion/src/metadata/ingestion/source/bigquery.py @@ -8,7 +8,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import json import logging import os @@ -23,14 +22,18 @@ from sqlalchemy_bigquery._types import ( _get_transitive_schema_fields, ) +from metadata.generated.schema.api.tags.createTagCategory import ( + CreateTagCategoryRequest, +) +from metadata.generated.schema.entity.data.table import TableData from metadata.generated.schema.entity.services.databaseService import ( DatabaseServiceType, ) -from metadata.generated.schema.entity.tags.tagCategory import TagCategory from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig from metadata.ingestion.source.sql_source import SQLSource from metadata.ingestion.source.sql_source_common import SQLConnectionConfig from metadata.utils.column_type_parser import create_sqlalchemy_type +from metadata.utils.helpers import get_start_and_end logger = logging.getLogger(__name__) GEOGRAPHY = create_sqlalchemy_type("GEOGRAPHY") @@ -78,6 +81,7 @@ class BigQueryConfig(SQLConnectionConfig): project_id: Optional[str] = None duration: int = 1 service_type = DatabaseServiceType.BigQuery.value + partition_query: str = 'select * from {}.{} WHERE DATE({}) >= "{}" LIMIT 1000' enable_policy_tags: bool = False tag_category_name: str = "BigqueryPolicyTags" @@ -97,7 +101,7 @@ class BigquerySource(SQLSource): try: if self.config.enable_policy_tags: self.metadata.create_tag_category( - TagCategory( + CreateTagCategoryRequest( name=self.config.tag_category_name, description="", categoryType="Classification", @@ -145,6 +149,33 @@ class BigquerySource(SQLSource): raise ValueError(f"schema {schema} does not match table {table}") return segments[0], segments[1] + def fetch_sample_data(self, schema: str, table: str) -> Optional[TableData]: + resp_sample_data = super().fetch_sample_data(schema, table) + if not resp_sample_data and self.config.partition_query: + try: + logger.info("Using Query for Partitioned Tables") + partition_details = self.inspector.get_indexes(table, schema) + start, end = get_start_and_end(self.config.duration) + + query = self.config.partition_query.format( + schema, + table, + partition_details[0]["column_names"][0], + start.strftime("%Y-%m-%d"), + ) + logger.info(query) + results = self.connection.execute(query) + cols = [] + for col in results.keys(): + cols.append(col.replace(".", "_DOT_")) + rows = [] + for res in results: + row = list(res) + rows.append(row) + return TableData(columns=cols, rows=rows) + except Exception as err: + logger.error(err) + def parse_raw_data_type(self, raw_data_type): return raw_data_type.replace(", ", ",").replace(" ", ":").lower() diff --git a/ingestion/src/metadata/ingestion/source/sql_source.py b/ingestion/src/metadata/ingestion/source/sql_source.py index 5b8983474f4..584e7d58c4c 100644 --- a/ingestion/src/metadata/ingestion/source/sql_source.py +++ b/ingestion/src/metadata/ingestion/source/sql_source.py @@ -24,6 +24,7 @@ from sqlalchemy.engine.reflection import Inspector from sqlalchemy.inspection import inspect from sqlalchemy.orm import Session +from metadata.generated.schema.api.tags.createTag import CreateTagRequest from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import ( Column, @@ -37,8 +38,8 @@ from metadata.generated.schema.entity.data.table import ( TableData, TableProfile, ) -from metadata.generated.schema.entity.tags.tagCategory import Tag from metadata.generated.schema.type.entityReference import EntityReference +from metadata.generated.schema.type.tagLabel import Source as TagSource from metadata.generated.schema.type.tagLabel import TagLabel from metadata.ingestion.api.common import Entity, WorkflowContext from metadata.ingestion.api.source import Source, SourceStatus @@ -198,6 +199,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): def next_record(self) -> Iterable[Entity]: inspector = inspect(self.engine) schema_names = inspector.get_schema_names() + for schema in schema_names: # clear any previous source database state self.database_source_state.clear() @@ -231,7 +233,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): "Table pattern not allowed", ) continue - if self._is_partition(table_name, schema): + if self._is_partition(table_name, schema, inspector): self.status.filter( f"{self.config.get_service_name()}.{table_name}", "Table is partition", @@ -363,7 +365,8 @@ class SQLSource(Source[OMetaDatabaseAndTable]): if table.fullyQualifiedName not in self.database_source_state: yield DeleteTable(table=table) - def _is_partition(self, table_name: str, schema: str) -> bool: + def _is_partition(self, table_name: str, schema: str, inspector) -> bool: + self.inspector = inspector return False def _parse_data_model(self): @@ -638,10 +641,11 @@ class SQLSource(Source[OMetaDatabaseAndTable]): and "policy_tags" in column and column["policy_tags"] ): - self.metadata.create_tag_category( - category=self.config.tag_category_name, - data=Tag( - name=column["policy_tags"], description="" + self.metadata.create_primary_tag( + category_name=self.config.tag_category_name, + primary_tag_body=CreateTagRequest( + name=column["policy_tags"], + description="Bigquery Policy Tag", ), ) except APIError: @@ -651,6 +655,7 @@ class SQLSource(Source[OMetaDatabaseAndTable]): tagFQN=f"{self.config.tag_category_name}.{column['policy_tags']}", labelType="Automated", state="Suggested", + source=TagSource.Tag.name, ) ] except Exception as err: