Ingestion: Refactor, docs, airflow example

This commit is contained in:
Suresh Srinivas 2021-08-14 11:43:53 -07:00
parent 8256de30e9
commit 5191cc2dca
3 changed files with 18 additions and 12 deletions

View File

@ -20,7 +20,7 @@ from metadata.ingestion.api.common import WorkflowContext, Record
from metadata.ingestion.api.sink import Sink, SinkStatus
from metadata.ingestion.models.user import MetadataTeam, MetadataUser
from metadata.ingestion.ometa.auth_provider import MetadataServerConfig
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.client import REST, APIError
logger = logging.getLogger(__name__)
@ -62,16 +62,19 @@ class MetadataRestUsersSink(Sink):
team_response = self.rest.get(self.api_team_get)
for team in team_response['data']:
self.team_entities[team['name']] = team['id']
self.team_entities[team['displayName']] = team['id']
def _create_team(self, record: MetadataUser) -> None:
team_name = record.team_name
metadata_team = MetadataTeam(team_name, 'Team Name')
r = self.rest.post(self.api_team_post,
data=metadata_team.to_json()
)
instance_id = r['id']
self.team_entities[team_name] = instance_id
try:
r = self.rest.post(self.api_team_post,
data=metadata_team.to_json()
)
instance_id = r['id']
self.team_entities[team_name] = instance_id
except APIError:
pass
def _create_user(self, record: MetadataUser) -> None:
if record.team_name not in self.team_entities:
@ -82,9 +85,12 @@ class MetadataRestUsersSink(Sink):
display_name=record.name,
email=record.email,
teams=teams)
self.rest.post(self.api_users, data=metadata_user.to_json())
self.status.records_written(record.github_username)
logger.info("Sink: {}".format(record.github_username))
try:
self.rest.post(self.api_users, data=metadata_user.to_json())
self.status.records_written(record.github_username)
logger.info("Sink: {}".format(record.github_username))
except APIError:
pass
def get_status(self) -> SinkStatus:
return self.status

View File

@ -53,7 +53,7 @@ class SampleUserMetadataGenerator:
schema['full_name'] = lambda: None
schema['github_username'] = lambda: None
schema['team_name'] = lambda: random.choice(
['Data_Infra', 'Infra', 'Payments', 'Legal', 'Dev_Platform', 'Trust', 'Marketplace'])
['Data Platform', 'Cloud Infra', 'Payments', 'Legal', 'Customer Support', 'Finance', 'Marketplace'])
schema['employee_type'] = lambda: None
schema['manager_email'] = lambda: fake.email()
schema['slack_id'] = lambda: None

View File

@ -234,9 +234,9 @@ class SQLSource(Source):
db = DatabaseEntity(id=uuid.uuid4(),
name=schema,
description=description if description is not None else ' ',
service=EntityReference(id=self.service.id, type=self.config.service_type))
table = TableEntity(name=table,
description=description if description is not None else ' ',
columns=table_columns)
table_and_db = OMetaDatabaseAndTable(table=table, database=db)