mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-14 04:31:02 +00:00
76 lines
2.1 KiB
Python
76 lines
2.1 KiB
Python
from datetime import datetime
|
|
|
|
from airflow import DAG
|
|
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
|
|
|
|
CONN_ID = "my_sqlite"
|
|
|
|
COST_TABLE = "costs"
|
|
PROCESSED_TABLE = "processed_costs"
|
|
|
|
with DAG(
|
|
"sqlite_operator",
|
|
start_date=datetime(2023, 1, 1),
|
|
schedule_interval=None,
|
|
catchup=False,
|
|
) as dag:
|
|
create_cost_table = SqliteOperator(
|
|
sqlite_conn_id=CONN_ID,
|
|
task_id="create_cost_table",
|
|
sql="""
|
|
CREATE TABLE IF NOT EXISTS {{ params.table_name }} (
|
|
id INTEGER PRIMARY KEY,
|
|
month TEXT NOT NULL,
|
|
total_cost REAL NOT NULL,
|
|
area REAL NOT NULL
|
|
)
|
|
""",
|
|
params={"table_name": COST_TABLE},
|
|
)
|
|
|
|
populate_cost_table = SqliteOperator(
|
|
sqlite_conn_id=CONN_ID,
|
|
task_id="populate_cost_table",
|
|
sql="""
|
|
INSERT INTO {{ params.table_name }} (id, month, total_cost, area)
|
|
VALUES
|
|
(1, '2021-01', 100, 10),
|
|
(2, '2021-02', 200, 20),
|
|
(3, '2021-03', 300, 30)
|
|
""",
|
|
params={"table_name": COST_TABLE},
|
|
)
|
|
|
|
transform_cost_table = SqliteOperator(
|
|
sqlite_conn_id=CONN_ID,
|
|
task_id="transform_cost_table",
|
|
sql="""
|
|
CREATE TABLE IF NOT EXISTS {{ params.out_table_name }} AS
|
|
SELECT
|
|
id,
|
|
month,
|
|
total_cost,
|
|
area,
|
|
total_cost / area as cost_per_area
|
|
FROM {{ params.in_table_name }}
|
|
""",
|
|
params={
|
|
"in_table_name": COST_TABLE,
|
|
"out_table_name": PROCESSED_TABLE,
|
|
},
|
|
)
|
|
|
|
cleanup_tables = []
|
|
for table_name in [COST_TABLE, PROCESSED_TABLE]:
|
|
cleanup_table = SqliteOperator(
|
|
sqlite_conn_id=CONN_ID,
|
|
task_id=f"cleanup_{table_name}",
|
|
sql="""
|
|
DROP TABLE {{ params.table_name }}
|
|
""",
|
|
params={"table_name": table_name},
|
|
)
|
|
cleanup_tables.append(cleanup_table)
|
|
|
|
create_cost_table >> populate_cost_table >> transform_cost_table >> cleanup_tables
|