From 20b7dd3642a9f0cfe7d42178f2630ca2faa267fc Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 4 Jan 2024 16:34:08 -0500 Subject: [PATCH] fix(ingest/looker): limit dashboard object lifetime to thread (#9564) --- .../ingestion/source/looker/looker_source.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index 0cce267bf5..cecf6164e3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -1044,7 +1044,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase): self, dashboard_id: str, fields: List[str] ) -> Tuple[ List[MetadataWorkUnit], - Optional[Dashboard], + Optional[looker_usage.LookerDashboardForUsage], str, datetime.datetime, datetime.datetime, @@ -1095,9 +1095,15 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase): ) workunits.extend(metric_dim_workunits) self.reporter.report_dashboards_scanned() + + # generate usage tracking object + dashboard_usage = looker_usage.LookerDashboardForUsage.from_dashboard( + dashboard_object + ) + return ( workunits, - dashboard_object, + dashboard_usage, dashboard_id, start_time, datetime.datetime.now(), @@ -1287,7 +1293,7 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase): for job in concurrent.futures.as_completed(async_workunits): ( work_units, - dashboard_object, + dashboard_usage, dashboard_id, start_time, end_time, @@ -1299,12 +1305,8 @@ class LookerDashboardSource(TestableSource, StatefulIngestionSourceBase): self.reporter.report_upstream_latency(start_time, end_time) yield from work_units - if dashboard_object is not None: - looker_dashboards_for_usage.append( - looker_usage.LookerDashboardForUsage.from_dashboard( - dashboard_object - ) - ) + if dashboard_usage is not None: + looker_dashboards_for_usage.append(dashboard_usage) self.reporter.report_stage_end("dashboard_chart_metadata")