mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-04 12:51:23 +00:00 
			
		
		
		
	fix(ingest): changing datahub-graph to use underlying session connection. (#3743)
This commit is contained in:
		
							parent
							
								
									5c0a49d09a
								
							
						
					
					
						commit
						5f80e7a4b2
					
				@ -6,7 +6,6 @@ from typing import Any, Dict, List, Optional, Type, TypeVar
 | 
			
		||||
 | 
			
		||||
from avrogen.dict_wrapper import DictWrapper
 | 
			
		||||
from requests.models import HTTPError
 | 
			
		||||
from requests.sessions import Session
 | 
			
		||||
 | 
			
		||||
from datahub.configuration.common import ConfigModel, OperationalError
 | 
			
		||||
from datahub.emitter.rest_emitter import DatahubRestEmitter
 | 
			
		||||
@ -40,11 +39,10 @@ class DataHubGraph(DatahubRestEmitter):
 | 
			
		||||
            ca_certificate_path=self.config.ca_certificate_path,
 | 
			
		||||
        )
 | 
			
		||||
        self.test_connection()
 | 
			
		||||
        self.g_session = Session()
 | 
			
		||||
 | 
			
		||||
    def _get_generic(self, url: str) -> Dict:
 | 
			
		||||
        try:
 | 
			
		||||
            response = self.g_session.get(url)
 | 
			
		||||
            response = self._session.get(url)
 | 
			
		||||
            response.raise_for_status()
 | 
			
		||||
            return response.json()
 | 
			
		||||
        except HTTPError as e:
 | 
			
		||||
@ -67,7 +65,7 @@ class DataHubGraph(DatahubRestEmitter):
 | 
			
		||||
        aspect_type: Type[Aspect],
 | 
			
		||||
    ) -> Optional[Aspect]:
 | 
			
		||||
        url = f"{self._gms_server}/aspects/{urllib.parse.quote(entity_urn)}?aspect={aspect}&version=0"
 | 
			
		||||
        response = self.g_session.get(url)
 | 
			
		||||
        response = self._session.get(url)
 | 
			
		||||
        if response.status_code == 404:
 | 
			
		||||
            # not found
 | 
			
		||||
            return None
 | 
			
		||||
@ -103,7 +101,7 @@ class DataHubGraph(DatahubRestEmitter):
 | 
			
		||||
        url = f"{self._gms_server}/aspects?action=getTimeseriesAspectValues"
 | 
			
		||||
        try:
 | 
			
		||||
            usage_aspects: List[DatasetUsageStatisticsClass] = []
 | 
			
		||||
            response = self.g_session.post(
 | 
			
		||||
            response = self._session.post(
 | 
			
		||||
                url, data=json.dumps(payload), headers=headers
 | 
			
		||||
            )
 | 
			
		||||
            if response.status_code != 200:
 | 
			
		||||
@ -135,7 +133,7 @@ class DataHubGraph(DatahubRestEmitter):
 | 
			
		||||
            "Content-Type": "application/json",
 | 
			
		||||
        }
 | 
			
		||||
        try:
 | 
			
		||||
            response = self.g_session.post(
 | 
			
		||||
            response = self._session.post(
 | 
			
		||||
                url, data=json.dumps(payload), headers=headers
 | 
			
		||||
            )
 | 
			
		||||
            if response.status_code != 200:
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user