mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-04 12:51:23 +00:00 
			
		
		
		
	fix(ingest): handle 'fields' list missing in bigquery-usage (#2844)
This commit is contained in:
		
							parent
							
								
									f6e72291c3
								
							
						
					
					
						commit
						fd02f711cd
					
				@ -120,7 +120,7 @@ class ReadEvent:
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    resource: BigQueryTableRef
 | 
					    resource: BigQueryTableRef
 | 
				
			||||||
    fieldsRead: List[str]
 | 
					    fieldsRead: List[str]
 | 
				
			||||||
    readReason: str
 | 
					    readReason: Optional[str]
 | 
				
			||||||
    jobName: Optional[str]
 | 
					    jobName: Optional[str]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    payload: Any
 | 
					    payload: Any
 | 
				
			||||||
@ -143,8 +143,8 @@ class ReadEvent:
 | 
				
			|||||||
        resourceName = entry.payload["resourceName"]
 | 
					        resourceName = entry.payload["resourceName"]
 | 
				
			||||||
        readInfo = entry.payload["metadata"]["tableDataRead"]
 | 
					        readInfo = entry.payload["metadata"]["tableDataRead"]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        fields = readInfo["fields"]
 | 
					        fields = readInfo.get("fields", [])
 | 
				
			||||||
        readReason = readInfo["reason"]
 | 
					        readReason = readInfo.get("reason")
 | 
				
			||||||
        jobName = None
 | 
					        jobName = None
 | 
				
			||||||
        if readReason == "JOB":
 | 
					        if readReason == "JOB":
 | 
				
			||||||
            jobName = readInfo["jobName"]
 | 
					            jobName = readInfo["jobName"]
 | 
				
			||||||
@ -284,7 +284,7 @@ class BigQueryUsageSource(Source):
 | 
				
			|||||||
        self, entries: Iterable[AuditLogEntry]
 | 
					        self, entries: Iterable[AuditLogEntry]
 | 
				
			||||||
    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
 | 
					    ) -> Iterable[Union[ReadEvent, QueryEvent]]:
 | 
				
			||||||
        for entry in entries:
 | 
					        for entry in entries:
 | 
				
			||||||
            event: Union[ReadEvent, QueryEvent]
 | 
					            event: Union[None, ReadEvent, QueryEvent] = None
 | 
				
			||||||
            if ReadEvent.can_parse_entry(entry):
 | 
					            if ReadEvent.can_parse_entry(entry):
 | 
				
			||||||
                event = ReadEvent.from_entry(entry)
 | 
					                event = ReadEvent.from_entry(entry)
 | 
				
			||||||
            elif QueryEvent.can_parse_entry(entry):
 | 
					            elif QueryEvent.can_parse_entry(entry):
 | 
				
			||||||
@ -294,7 +294,8 @@ class BigQueryUsageSource(Source):
 | 
				
			|||||||
                    f"{entry.log_name}-{entry.insert_id}",
 | 
					                    f"{entry.log_name}-{entry.insert_id}",
 | 
				
			||||||
                    f"unable to parse log entry: {entry!r}",
 | 
					                    f"unable to parse log entry: {entry!r}",
 | 
				
			||||||
                )
 | 
					                )
 | 
				
			||||||
            yield event
 | 
					            if event:
 | 
				
			||||||
 | 
					                yield event
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    def _join_events_by_job_id(
 | 
					    def _join_events_by_job_id(
 | 
				
			||||||
        self, events: Iterable[Union[ReadEvent, QueryEvent]]
 | 
					        self, events: Iterable[Union[ReadEvent, QueryEvent]]
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user