mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-10-30 18:26:58 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			85 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			85 lines
		
	
	
		
			2.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| from multiprocessing.pool import ThreadPool as Pool
 | |
| 
 | |
| import requests
 | |
| from datahub.emitter.serialization_helper import pre_json_transform
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.common import (
 | |
|     BrowsePaths,
 | |
|     Owner,
 | |
|     Ownership,
 | |
|     OwnershipType,
 | |
| )
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.dataset import DatasetProperties
 | |
| from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
 | |
| 
 | |
| 
 | |
def _build_snapshot(id: int):
    """Assemble the DatasetSnapshot for generated test dataset ``id``.

    The snapshot is keyed by the dataset URN and carries three aspects, in
    this order: dataset properties, ownership, and browse paths.
    """
    dataset_urn = _build_urn(id)
    aspects = [
        _build_properties(),
        _build_ownership(id),
        _build_browsepaths(id),
    ]
    return DatasetSnapshot(dataset_urn, aspects)
 | |
| 
 | |
| 
 | |
| def _build_urn(id: int):
 | |
|     return f"urn:li:dataset:(urn:li:dataPlatform:bigquery,test_dataset_{id},PROD)"
 | |
| 
 | |
| 
 | |
def _build_properties():
    """Return the DatasetProperties aspect; every generated dataset gets the same one."""
    description = "This is a great dataset"
    return DatasetProperties(description=description)
 | |
| 
 | |
| 
 | |
def _build_browsepaths(id: int):
    """Return a BrowsePaths aspect holding one path that is unique to ``id``."""
    path = f"/perf/testing/path/{id}"
    return BrowsePaths([path])
 | |
| 
 | |
| 
 | |
def _build_ownership(id: int):
    """Return an Ownership aspect with two DATAOWNER owners.

    The first owner (``test_<id>``) is unique to this dataset. The second
    (``common``) is shared by every generated dataset.
    """
    owner_urns = (f"urn:li:corpuser:test_{id}", "urn:li:corpuser:common")
    return Ownership([Owner(urn, OwnershipType.DATAOWNER) for urn in owner_urns])
 | |
| 
 | |
| 
 | |
def main(url: str, id: int, timeout: float = 30.0):
    """Ingest one generated dataset snapshot through the REST ingest endpoint.

    Builds the snapshot for ``id`` and POSTs it to
    ``{url}/entities?action=ingest`` wrapped as
    ``{"entity": {"value": {<snapshot FQN>: <snapshot JSON>}}}``.

    Args:
        url: Base URL of the server, e.g. ``http://localhost:8080``.
        id: Numeric suffix used in the dataset URN, owner, and browse path.
        timeout: Seconds to wait for the server. Without a timeout, a stalled
            server would block the calling (pool) thread indefinitely.

    Returns:
        The ``requests.Response`` of the ingest call.

    Raises:
        requests.exceptions.RequestException: on connection failure, timeout,
            or a non-2xx HTTP status (raised by ``raise_for_status``).
    """
    proposed_snapshot = _build_snapshot(id)
    snapshot_fqn = (
        f"com.linkedin.metadata.snapshot.{proposed_snapshot.RECORD_SCHEMA.name}"
    )
    payload = {
        "entity": {
            "value": {
                snapshot_fqn: pre_json_transform(proposed_snapshot.to_obj())
            }
        }
    }
    response = requests.post(
        f"{url}/entities?action=ingest",
        data=json.dumps(payload),
        # Declare the body type explicitly; a bare string body carries none.
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )
    # Surface rejected ingests (4xx/5xx) instead of silently ignoring them.
    response.raise_for_status()
    return response
 | |
| 
 | |
| 
 | |
def worker(index: int, url: str = "http://localhost:8080"):
    """Ingest dataset ``index`` and report request failures instead of raising.

    This runs inside the thread pool via ``apply_async``. The script never
    fetches the async results, so any exception escaping here would be
    discarded silently. Expected failures are therefore printed here.

    Args:
        index: Dataset number passed through to ``main``.
        url: Base URL of the server to ingest into.
    """
    try:
        main(url, index)
    except (requests.exceptions.RequestException, RuntimeError) as e:
        # requests signals connection errors, timeouts and HTTP errors with
        # RequestException subclasses. None of these are RuntimeError, so
        # catching RuntimeError alone let every HTTP failure escape unreported.
        print(f"error with {index}: {e}")
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":

    POOL_SIZE = 10
    DATASETS = 100000

    def _report_unexpected(exc: BaseException) -> None:
        """Print any exception that escaped ``worker``.

        ``apply_async`` drops an exception unless ``.get()`` is called on its
        result, which this script never does. Without this callback such
        errors would vanish.
        """
        print(f"unexpected worker error: {exc!r}")

    # Submit ingest jobs for dataset ids 1..DATASETS to POOL_SIZE threads.
    pool = Pool(POOL_SIZE)
    for i in range(1, DATASETS + 1):
        pool.apply_async(worker, (i,), error_callback=_report_unexpected)
    # Stop accepting work, then block until every queued job has finished.
    pool.close()
    pool.join()
 | 
