mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-06 16:49:03 +00:00

Co-authored-by: ksrinath <ksrinath@users.noreply.github.com> Co-authored-by: Chakravarthy Racharla <chakru.racharla@acryl.io>
109 lines
3.2 KiB
Python
109 lines
3.2 KiB
Python
from datetime import datetime
|
|
|
|
import pyarrow as pa
|
|
import pyiceberg
|
|
from constants import namespace, table_name, warehouse
|
|
from pyiceberg.catalog import load_catalog
|
|
from pyiceberg.schema import Schema
|
|
from pyiceberg.types import LongType, NestedField, StringType, TimestampType
|
|
|
|
from datahub.ingestion.graph.client import get_default_graph
|
|
|
|
# Iceberg schema for the ski resort snow-report table.
# Each mapping below holds the keyword arguments for one NestedField, listed
# in column order (field ids 1 through 5). The first two columns are required;
# the remaining three are optional. `initial_default` is passed explicitly for
# every field, and only `daily_snowfall` sets it to a non-null value (0).
schema = Schema(
    *[
        NestedField(**field_spec)
        for field_spec in (
            {
                "field_id": 1,
                "name": "resort_id",
                "field_type": LongType(),
                "required": True,
                "doc": "Unique identifier for each ski resort",
                "initial_default": None,
            },
            {
                "field_id": 2,
                "name": "resort_name",
                "field_type": StringType(),
                "required": True,
                "doc": "Official name of the ski resort",
                "initial_default": None,
            },
            {
                "field_id": 3,
                "name": "daily_snowfall",
                "field_type": LongType(),
                "required": False,
                "doc": "Amount of new snow in inches during the last 24 hours. Null if no measurement available",
                "initial_default": 0,
            },
            {
                "field_id": 4,
                "name": "conditions",
                "field_type": StringType(),
                "required": False,
                "doc": "Current snow conditions description (e.g., 'Powder', 'Packed Powder', 'Groomed'). Null if not reported",
                "initial_default": None,
            },
            {
                "field_id": 5,
                "name": "last_updated",
                "field_type": TimestampType(),
                "required": False,
                "doc": "Timestamp of when the snow report was last updated",
                "initial_default": None,
            },
        )
    ]
)
|
|
|
|
# Obtain the default DataHub graph client; its configured token is reused
# below as the credential for the Iceberg catalog.
graph = get_default_graph()

# Load the "local_datahub" catalog, pointing it at the warehouse name taken
# from the shared constants module.
catalog = load_catalog(
    "local_datahub",
    warehouse=warehouse,
    token=graph.config.token,
)
|
|
|
|
# Create namespace (database).
# Re-running the script against an existing namespace is expected, so that case
# is reported on its own, mirroring how table creation reports an existing
# table. Any other failure is still only reported (best effort, as before) so
# the script keeps going and later steps surface the underlying problem.
try:
    catalog.create_namespace(namespace)
except pyiceberg.exceptions.NamespaceAlreadyExistsError:
    print(f"Namespace {namespace} already exists")
except Exception as exc:
    print(f"Namespace creation error (might already exist): {exc}")
|
|
|
|
# Fully qualified identifier in "<namespace>.<table_name>" form.
full_table_name = f"{namespace}.{table_name}"

# Create the table with the Iceberg schema defined earlier in this script.
# An already-existing table is reported and then reused rather than treated
# as a failure.
try:
    catalog.create_table(identifier=full_table_name, schema=schema)
except pyiceberg.exceptions.TableAlreadyExistsError:
    print(f"Table {full_table_name} already exists")
|
|
|
|
# Explicit PyArrow schema for the sample data.
# Nullability is spelled out per column so the two required Iceberg fields
# (resort_id, resort_name) are declared non-nullable here as well.
pa_schema = pa.schema(
    [
        pa.field("resort_id", pa.int64(), nullable=False),
        pa.field("resort_name", pa.string(), nullable=False),
        pa.field("daily_snowfall", pa.int64(), nullable=True),
        pa.field("conditions", pa.string(), nullable=True),
        pa.field("last_updated", pa.timestamp("us"), nullable=True),
    ]
)
|
|
# Three sample resort rows, built column by column and typed with the
# explicit PyArrow schema defined above.
sample_data = pa.Table.from_pydict(
    {
        "resort_id": [1, 2, 3],
        "resort_name": ["Snowpeak Resort", "Alpine Valley", "Glacier Heights"],
        "daily_snowfall": [12, 8, 15],
        "conditions": ["Powder", "Packed", "Fresh Powder"],
        # One timestamp per row; each is read from the clock separately as
        # the list is built.
        "last_updated": [pa.scalar(datetime.now()) for _ in range(3)],
    },
    schema=pa_schema,
)
|
|
|
|
# Write the sample rows to the table via overwrite, then refresh the local
# table object before reading back.
table = catalog.load_table(full_table_name)
table.overwrite(sample_data)
table.refresh()

# Read and verify: hand a scan of the table to DuckDB under the bare table
# name, then print every row it returns.
scan = table.scan()
con = scan.to_duckdb(table_name=f"{table_name}")

print("\nResort Metrics Data:")
print("-" * 50)

result = con.execute(f"SELECT * FROM {table_name}")
for row in result.fetchall():
    print(row)
|