2023-10-04 16:23:15 +05:30

76 lines
2.1 KiB
Python

from datetime import datetime
from airflow import DAG
from airflow.providers.sqlite.operators.sqlite import SqliteOperator
CONN_ID = "my_sqlite"
COST_TABLE = "costs"
PROCESSED_TABLE = "processed_costs"
with DAG(
"sqlite_operator",
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False,
) as dag:
create_cost_table = SqliteOperator(
sqlite_conn_id=CONN_ID,
task_id="create_cost_table",
sql="""
CREATE TABLE IF NOT EXISTS {{ params.table_name }} (
id INTEGER PRIMARY KEY,
month TEXT NOT NULL,
total_cost REAL NOT NULL,
area REAL NOT NULL
)
""",
params={"table_name": COST_TABLE},
)
populate_cost_table = SqliteOperator(
sqlite_conn_id=CONN_ID,
task_id="populate_cost_table",
sql="""
INSERT INTO {{ params.table_name }} (id, month, total_cost, area)
VALUES
(1, '2021-01', 100, 10),
(2, '2021-02', 200, 20),
(3, '2021-03', 300, 30)
""",
params={"table_name": COST_TABLE},
)
transform_cost_table = SqliteOperator(
sqlite_conn_id=CONN_ID,
task_id="transform_cost_table",
sql="""
CREATE TABLE IF NOT EXISTS {{ params.out_table_name }} AS
SELECT
id,
month,
total_cost,
area,
total_cost / area as cost_per_area
FROM {{ params.in_table_name }}
""",
params={
"in_table_name": COST_TABLE,
"out_table_name": PROCESSED_TABLE,
},
)
cleanup_tables = []
for table_name in [COST_TABLE, PROCESSED_TABLE]:
cleanup_table = SqliteOperator(
sqlite_conn_id=CONN_ID,
task_id=f"cleanup_{table_name}",
sql="""
DROP TABLE {{ params.table_name }}
""",
params={"table_name": table_name},
)
cleanup_tables.append(cleanup_table)
create_cost_table >> populate_cost_table >> transform_cost_table >> cleanup_tables