from datetime import datetime
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
# Snowflake table names referenced by the transform task below.
# `costs` is read; `processed_costs` is created/replaced by the query.
SNOWFLAKE_COST_TABLE = "costs"
SNOWFLAKE_PROCESSED_TABLE = "processed_costs"
def _fake_snowflake_execute(*args, **kwargs):
|
|
|
|
raise ValueError("mocked snowflake execute to not run queries")
# Demo DAG: never scheduled (schedule_interval=None), runs only on manual
# trigger, and does not backfill past runs (catchup=False).
with DAG(
    "snowflake_operator",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # HACK: monkey-patch the operator's execute method so no real requests
    # are ever sent to Snowflake when a task in this DAG is run.
    SnowflakeOperator.execute = _fake_snowflake_execute  # type: ignore

    # The SQL is a Jinja template; Airflow substitutes the `params` mapping
    # at render time, so the table names come from the module constants.
    transform_cost_table = SnowflakeOperator(
        task_id="transform_cost_table",
        snowflake_conn_id="my_snowflake",
        sql="""
CREATE OR REPLACE TABLE {{ params.out_table_name }} AS
SELECT
    id,
    month,
    total_cost,
    area,
    total_cost / area as cost_per_area
FROM {{ params.in_table_name }}
""",
        params={
            "in_table_name": SNOWFLAKE_COST_TABLE,
            "out_table_name": SNOWFLAKE_PROCESSED_TABLE,
        },
    )