Sample Data Ingestion use Create request (#6546)

* Sample Data Ingestion use Create request

* Fix: Code smell

* Fix: make_pyformat

* Fix: Changed based on comments
This commit is contained in:
Milan Bariya 2022-08-04 14:43:48 +05:30 committed by GitHub
parent dbc4e7fe36
commit 45e6c2c7d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 123 additions and 70 deletions

View File

@ -1,7 +1,6 @@
{
"tables": [
{
"id": "3cda8ecb-f4c6-4ed4-8506-abe965b54b86",
"name": "dim_address",
"description": "This dimension table contains the billing and shipping addresses of customers. You can join this table with the sales table to generate lists of the billing and shipping addresses. Customers can enter their addresses more than once, so the same address can appear in more than one row in this table. This table contains one row per customer address.",
"version": 0.1,
@ -1044,7 +1043,6 @@
}
},
{
"id": "1cda9ecb-f4c6-4ed4-8506-abe965b64c87",
"name": "dim_address_clean",
"description": "Created from dim_address after a small cleanup.",
"version": 0.1,
@ -1379,7 +1377,6 @@
}
},
{
"id": "753833e0-f526-47b6-8555-97f5bb2882d5",
"name": "dim.api/client",
"description": "This dimension table contains a row for each channel or app that your customers use to create orders. Some examples of these include Facebook and Online Store. You can join this table with the sales table to measure channel performance.",
"version": 0.1,
@ -1725,7 +1722,6 @@
}
},
{
"id": "9c96042f-311a-4826-b5b2-8658914e2ee8",
"name": "dim_customer",
"description": "The dimension table contains data about your customers. The customers table contains one row per customer. It includes historical metrics (such as the total amount that each customer has spent in your store) as well as forward-looking metrics (such as the predicted number of days between future orders and the expected order value in the next 30 days). This table also includes columns that segment customers into various categories (such as new, returning, promising, at risk, dormant, and loyal), which you can use to target marketing activities.",
"version": 0.1,
@ -4021,7 +4017,6 @@
}
},
{
"id": "e64426b9-852a-468e-ac16-1231bb01fe96",
"name": "dim_location",
"description": "The dimension table contains metrics about your Shopify POS. This table contains one row per Shopify POS location. You can use this table to generate a list of the Shopify POS locations or you can join the table with the sales table to measure sales performance.",
"version": 0.1,
@ -4349,7 +4344,6 @@
}
},
{
"id": "f888bde6-ce08-42c3-96aa-1d726e248930",
"name": "dim.product",
"description": "This dimension table contains information about each of the products in your store. This table contains one row per product. This table reflects the current state of products in your Shopify admin.",
"version": 0.1,
@ -4735,7 +4729,6 @@
}
},
{
"id": "c71690a3-0764-4791-a1d3-0c47f1e0c2ab",
"name": "dim.product.variant",
"description": "This dimension table contains current information about each of the product variants in your store. This table contains one row per product variant.",
"version": 0.1,
@ -5210,7 +5203,6 @@
}
},
{
"id": "bf6a7486-617d-474c-8d40-0f3735cc9272",
"name": "dim.shop",
"description": "This dimension table contains online shop information. This table contains one shop per row.",
"version": 0.1,
@ -5493,7 +5485,6 @@
}
},
{
"id": "7a0483c9-68c8-4faa-a553-a353e135aeff",
"name": "dim_staff",
"description": "This dimension table contains information about the staff accounts in the store. It contains one row per staff account. Use this table to generate a list of your staff accounts, or join it with the sales, API clients and locations tables to analyze staff performance at Shopify POS locations.",
"version": 0.1,
@ -5918,7 +5909,6 @@
}
},
{
"id": "ebb535b6-234a-477f-8713-b0bc7ff5897f",
"name": "fact_line_item",
"fullyQualifiedName": "sample_data.shopify.fact_line_item",
"description": "The fact table contains information about the line items in orders. Each row in the table is a line item in an order. It contains product and product variant details as they were at the time of the order. This table does not include information about returns. Join this table with the TODO fact_sales table to get the details of the product on the day it was sold. This data will match what appears on the order in your Shopify admin as well as in the Sales reports.",
@ -6631,7 +6621,6 @@
}
},
{
"id": "6bd37162-1eb2-4c22-88ef-aedb7bf35d23",
"name": "fact_order",
"fullyQualifiedName": "sample_data.shopify.fact_order",
"description": "The orders table contains information about each order in your store. Although this table is good for generating order lists and joining with the dim_customer, use the sales table instead for computing financial or other metrics.",
@ -7427,7 +7416,6 @@
}
},
{
"id": "d16bcfa1-a63b-4f06-b042-9dd39a8c429b",
"name": "fact_sale",
"fullyQualifiedName": "sample_data.shopify.fact_sale",
"description": "The fact table captures the value of products sold or returned, as well as the values of other charges such as taxes and shipping costs. The sales table contains one row per order line item, one row per returned line item, and one row per shipping charge. Use this table when you need financial metrics.",
@ -9865,7 +9853,6 @@
}
},
{
"id": "e12a4906-aaa3-47f8-960c-f19409a5e5a4",
"name": "fact_session",
"description": "This fact table contains information about the visitors to your online store. This table has one row per session, where one session can contain many page views. If you use Urchin Traffic Module (UTM) parameters in marketing campaigns, then you can use this table to track how many customers they direct to your store.",
"version": 0.1,
@ -12126,7 +12113,6 @@
}
},
{
"id": "d3e3283f-9a51-4de9-add7-c12c3ed58ba1",
"name": "raw_customer",
"description": "This is a raw customers table as represented in our online DB. This contains personal, shipping and billing addresses and details of the customer store and customer profile. This table is used to build our dimensional and fact tables",
"version": 0.1,
@ -13019,7 +13005,6 @@
}
},
{
"id": "96b4af15-3bdf-4483-a602-3014bbd8ebc6",
"name": "raw_order",
"description": "This is a raw orders table as represented in our online DB. This table contains all the orders by the customers and can be used to build our dim and fact tables",
"version": 0.1,
@ -14068,7 +14053,6 @@
}
},
{
"id": "afce49e7-8ae1-41e3-971e-8f79c254b24a",
"name": "raw_product_catalog",
"description": "This raw product catalog table contains the product listing, price, seller, etc., as represented in our online DB.",
"version": 0.1,

View File

@ -14,7 +14,6 @@ import json
import os
import sys
import traceback
import uuid
from collections import namedtuple
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List, Union
@ -23,9 +22,14 @@ from pydantic import ValidationError
from metadata.generated.schema.api.data.createChart import CreateChartRequest
from metadata.generated.schema.api.data.createDashboard import CreateDashboardRequest
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.api.data.createDatabaseSchema import (
CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createLocation import CreateLocationRequest
from metadata.generated.schema.api.data.createMlModel import CreateMlModelRequest
from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.api.data.createTopic import CreateTopicRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.api.teams.createRole import CreateRoleRequest
@ -74,6 +78,8 @@ from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.models.table_tests import OMetaTableTest
from metadata.ingestion.models.user import OMetaUserProfile
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.database_service import TableLocationLink
from metadata.utils import fqn
from metadata.utils.helpers import (
get_chart_entities_from_id,
get_standard_chart_type,
@ -409,83 +415,146 @@ class SampleDataSource(Source[Entity]):
)
yield location_ev
def ingest_glue(self) -> Iterable[OMetaDatabaseAndTable]:
db = Database(
id=uuid.uuid4(),
name=self.glue_database["name"],
description=self.glue_database["description"],
def ingest_glue(self):
db = CreateDatabaseRequest(
name=self.database["name"],
description=self.database["description"],
service=EntityReference(
id=self.glue_database_service.id,
type=self.glue_database_service.serviceType.value,
id=self.database_service.id.__root__, type="databaseService"
),
)
db_schema = DatabaseSchema(
id=uuid.uuid4(),
name=self.glue_database_schema["name"],
description=self.glue_database_schema["description"],
service=EntityReference(
id=self.glue_database_service.id,
type=self.glue_database_service.serviceType.value,
),
database=EntityReference(id=db.id, type="database"),
yield db
database_entity = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.database_service.name.__root__,
database_name=db.name.__root__,
)
database_object = self.metadata.get_by_name(
entity=Database, fqn=database_entity
)
schema = CreateDatabaseSchemaRequest(
name=self.database_schema["name"],
description=self.database_schema["description"],
database=EntityReference(id=database_object.id, type="database"),
)
yield schema
database_schema_entity = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.database_service.name.__root__,
database_name=db.name.__root__,
schema_name=schema.name.__root__,
)
database_schema_object = self.metadata.get_by_name(
entity=DatabaseSchema, fqn=database_schema_entity
)
for table in self.glue_tables["tables"]:
table["id"] = uuid.uuid4()
parameters = table.get("Parameters")
table = {key: val for key, val in table.items() if key != "Parameters"}
table_metadata = Table(**table)
location_type = LocationType.Table
if parameters:
location_type = (
location_type
if parameters.get("table_type") != "ICEBERG"
else LocationType.Iceberg
)
location_metadata = Location(
id=uuid.uuid4(),
table_request = CreateTableRequest(
name=table["name"],
path="s3://glue_bucket/dwh/schema/" + table["name"],
description=table["description"],
locationType=location_type,
columns=table["columns"],
databaseSchema=EntityReference(
id=database_schema_object.id, type="databaseSchema"
),
tableConstraints=table.get("tableConstraints"),
tableType=table["tableType"],
)
self.status.scanned("table", table_request.name.__root__)
yield table_request
location = CreateLocationRequest(
name=table["name"],
service=EntityReference(
id=self.glue_storage_service.id, type="storageService"
),
)
db_table_location = OMetaDatabaseAndTable(
database=db,
table=table_metadata,
location=location_metadata,
database_schema=db_schema,
)
self.status.scanned("table", table_metadata.name.__root__)
yield db_table_location
self.status.scanned("location", location.name)
yield location
def ingest_tables(self) -> Iterable[OMetaDatabaseAndTable]:
db = Database(
id=uuid.uuid4(),
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=self.database_service.name.__root__,
database_name=db.name.__root__,
schema_name=schema.name.__root__,
table_name=table_request.name.__root__,
)
location_fqn = fqn.build(
self.metadata,
entity_type=Location,
service_name=self.glue_storage_service.name.__root__,
location_name=location.name.__root__,
)
if table_fqn and location_fqn:
yield TableLocationLink(table_fqn=table_fqn, location_fqn=location_fqn)
def ingest_tables(self):
db = CreateDatabaseRequest(
name=self.database["name"],
description=self.database["description"],
service=EntityReference(
id=self.database_service.id, type=self.service_connection.type.value
id=self.database_service.id.__root__, type="databaseService"
),
)
schema = DatabaseSchema(
id=uuid.uuid4(),
yield db
database_entity = fqn.build(
self.metadata,
entity_type=Database,
service_name=self.database_service.name.__root__,
database_name=db.name.__root__,
)
database_object = self.metadata.get_by_name(
entity=Database, fqn=database_entity
)
schema = CreateDatabaseSchemaRequest(
name=self.database_schema["name"],
description=self.database_schema["description"],
service=EntityReference(
id=self.database_service.id, type=self.service_connection.type.value
),
database=EntityReference(id=db.id, type="database"),
database=EntityReference(id=database_object.id, type="database"),
)
yield schema
database_schema_entity = fqn.build(
self.metadata,
entity_type=DatabaseSchema,
service_name=self.database_service.name.__root__,
database_name=db.name.__root__,
schema_name=schema.name.__root__,
)
database_schema_object = self.metadata.get_by_name(
entity=DatabaseSchema, fqn=database_schema_entity
)
resp = self.metadata.list_entities(entity=User, limit=5)
self.user_entity = resp.entities
for table in self.tables["tables"]:
table_metadata = Table(**table)
table_and_db = OMetaDatabaseAndTable(
table=table_metadata, database=db, database_schema=schema
table_and_db = CreateTableRequest(
name=table["name"],
description=table["description"],
columns=table["columns"],
databaseSchema=EntityReference(
id=database_schema_object.id, type="databaseSchema"
),
tableType=table["tableType"],
tableConstraints=table.get("tableConstraints"),
tags=table["tags"],
)
self.status.scanned("table", table_metadata.name.__root__)
self.status.scanned("table", table_and_db.name)
yield table_and_db
def ingest_topics(self) -> Iterable[CreateTopicRequest]: