mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-08-22 16:08:13 +00:00
parent
67433f559c
commit
f33a08377c
@ -22,7 +22,8 @@
|
||||
"bulk_sink": {
|
||||
"type": "metadata-usage",
|
||||
"config": {
|
||||
"filename": "/tmp/sample_usage"
|
||||
"filename": "/tmp/sample_usage",
|
||||
"service_name": "bigquery_gcp"
|
||||
}
|
||||
},
|
||||
"metadata_server": {
|
||||
|
@ -33,6 +33,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class MetadataUsageSinkConfig(ConfigModel):
|
||||
filename: str
|
||||
service_name: str
|
||||
|
||||
|
||||
class MetadataUsageBulkSink(BulkSink):
|
||||
@ -51,17 +52,9 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
self.file_handler = open(self.config.filename, "r")
|
||||
self.metadata = OpenMetadata(self.metadata_config)
|
||||
self.status = BulkSinkStatus()
|
||||
self.tables_dict = {}
|
||||
self.table_join_dict = {}
|
||||
self.__map_tables()
|
||||
self.today = datetime.today().strftime("%Y-%m-%d")
|
||||
|
||||
def __map_tables(self):
|
||||
table_entities = self.metadata.list_entities(entity=Table)
|
||||
for table in table_entities.entities:
|
||||
if table.name.__root__ not in self.tables_dict.keys():
|
||||
self.tables_dict[table.name.__root__] = table
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls, config_dict: dict, metadata_config_dict: dict, ctx: WorkflowContext
|
||||
@ -82,8 +75,10 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
table_usage = TableUsageCount(**json.loads(record))
|
||||
if "." in table_usage.table:
|
||||
table_usage.table = table_usage.table.split(".")[1]
|
||||
if table_usage.table in self.tables_dict:
|
||||
table_entity = self.tables_dict[table_usage.table]
|
||||
table_entity = self.__get_table_entity(
|
||||
table_usage.database, table_usage.table
|
||||
)
|
||||
if table_entity is not None:
|
||||
table_usage_request = TableUsageRequest(
|
||||
date=table_usage.date, count=table_usage.count
|
||||
)
|
||||
@ -96,7 +91,6 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
table_usage.table, err
|
||||
)
|
||||
)
|
||||
|
||||
table_join_request = self.__get_table_joins(table_usage)
|
||||
logger.debug("table join request {}".format(table_join_request))
|
||||
try:
|
||||
@ -130,20 +124,21 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
def __get_table_joins(self, table_usage):
|
||||
table_joins: TableJoins = TableJoins(columnJoins=[], startDate=table_usage.date)
|
||||
column_joins_dict = {}
|
||||
joined_with = {}
|
||||
for column_join in table_usage.joins:
|
||||
joined_with = {}
|
||||
if column_join.table_column is None or len(column_join.joined_with) == 0:
|
||||
continue
|
||||
|
||||
if column_join.table_column.column in column_joins_dict.keys():
|
||||
joined_with = column_joins_dict[column_join.table_column.column]
|
||||
else:
|
||||
column_joins_dict[column_join.table_column.column] = {}
|
||||
|
||||
main_column_fqdn = self.__get_column_fqdn(column_join.table_column)
|
||||
main_column_fqdn = self.__get_column_fqdn(
|
||||
table_usage.database, column_join.table_column
|
||||
)
|
||||
for column in column_join.joined_with:
|
||||
joined_column_fqdn = self.__get_column_fqdn(column)
|
||||
|
||||
joined_column_fqdn = self.__get_column_fqdn(
|
||||
table_usage.database, column
|
||||
)
|
||||
if joined_column_fqdn in joined_with.keys():
|
||||
column_joined_with = joined_with[joined_column_fqdn]
|
||||
column_joined_with.joinCount += 1
|
||||
@ -153,7 +148,9 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
fullyQualifiedName=joined_column_fqdn, joinCount=1
|
||||
)
|
||||
else:
|
||||
logger.info("Skipping join columns for {}".format(column))
|
||||
logger.info(
|
||||
f"Skipping join columns for {column} {joined_column_fqdn}"
|
||||
)
|
||||
column_joins_dict[column_join.table_column.column] = joined_with
|
||||
|
||||
for key, value in column_joins_dict.items():
|
||||
@ -162,14 +159,19 @@ class MetadataUsageBulkSink(BulkSink):
|
||||
)
|
||||
return table_joins
|
||||
|
||||
def __get_column_fqdn(self, table_column: TableColumn):
|
||||
if table_column.table not in self.tables_dict:
|
||||
def __get_column_fqdn(self, database: str, table_column: TableColumn):
|
||||
table_entity = self.__get_table_entity(database, table_column.table)
|
||||
if table_entity is None:
|
||||
return None
|
||||
table_entity = self.tables_dict[table_column.table]
|
||||
for tbl_column in table_entity.columns:
|
||||
if table_column.column.lower() == tbl_column.name.__root__.lower():
|
||||
return tbl_column.fullyQualifiedName.__root__
|
||||
|
||||
def __get_table_entity(self, database_name: str, table_name: str) -> Table:
|
||||
table_fqn = f"{self.config.service_name}.{database_name}.{table_name}"
|
||||
table_entity = self.metadata.get_by_name(Table, fqdn=table_fqn)
|
||||
return table_entity
|
||||
|
||||
def get_status(self):
|
||||
return self.status
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user