feat(airflow): Circuit breaker and python api for Assertion and Operation (#5196)

This commit is contained in:
Tamas Nemeth 2022-07-13 19:17:38 +02:00 committed by GitHub
parent 9ec4fbae86
commit 4334248953
24 changed files with 1779 additions and 0 deletions

View File

@ -0,0 +1,38 @@
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datahub.api.graphql.operation import Operation
from datahub_provider.entities import Dataset
from datahub_provider.hooks.datahub import DatahubRestHook
dag = DAG(
dag_id="snowflake_load",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)
# Operation push
# The number of affected rows is hardcoded in this example; in normal operation it should come from the actual load
def report_operation(context):
hook: DatahubRestHook = DatahubRestHook("datahub_longtail")
host, password, timeout_sec = hook._get_config()
reporter = Operation(datahub_host=host, datahub_token=password, timeout=timeout_sec)
task = context["ti"].task
for outlet in task._outlets:
print(f"Reporting insert operation for {outlet.urn}")
reporter.report_operation(
urn=outlet.urn, operation_type="INSERT", num_affected_rows=123
)
pet_profiles_load = BashOperator(
task_id="load_s3_adoption_pet_profiles",
dag=dag,
inlets=[Dataset("s3", "longtail-core-data/mongo/adoption/pet_profiles")],
outlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
on_success_callback=report_operation,
)

View File

@ -0,0 +1,36 @@
import datetime
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datahub_provider.entities import Dataset
from datahub_provider.operators.datahub_operation_sensor import (
DataHubOperationCircuitBreakerSensor,
)
dag = DAG(
dag_id="marketing-send_emails",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)
# New DataHub Operation Circuit Breaker Sensor
pet_profiles_operation_sensor = DataHubOperationCircuitBreakerSensor(
task_id="pet_profiles_operation_sensor",
datahub_rest_conn_id="datahub_longtail",
urn=[
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
],
time_delta=datetime.timedelta(minutes=10),
)
send_email = BashOperator(
task_id="send_emails",
dag=dag,
inlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
)
pet_profiles_operation_sensor.set_downstream(send_email)

View File

@ -0,0 +1,42 @@
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datahub.api.graphql.operation import Operation
from datahub_provider.entities import Dataset
from datahub_provider.hooks.datahub import DatahubRestHook
dag = DAG(
dag_id="snowflake_load",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)
def report_operation(context):
hook: DatahubRestHook = DatahubRestHook("datahub_longtail")
host, password, timeout_sec = hook._get_config()
reporter = Operation(datahub_host=host, datahub_token=password, timeout=timeout_sec)
task = context["ti"].task
    for outlet in task._outlets:
        reporter.report_operation(urn=outlet.urn, operation_type="INSERT")
pet_profiles_load = BashOperator(
task_id="load_s3_adoption_pet_profiles",
dag=dag,
inlets=[Dataset("s3", "longtail-core-data/mongo/adoption/pet_profiles")],
outlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
on_success_callback=report_operation,
)
# Simple bash command as an example stand-in for running the Great Expectations checkpoint
run_ge_tests = BashOperator(
task_id="pet_profiles_ge_tests_run",
inlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo /usr/local/airflow/.local/bin/great_expectations checkpoint run pet_profiles",
)
pet_profiles_load.set_downstream(run_ge_tests)

View File

@ -0,0 +1,52 @@
import datetime
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datahub_provider.entities import Dataset
from datahub_provider.operators.datahub_assertion_operator import (
DataHubAssertionOperator,
)
from datahub_provider.operators.datahub_operation_sensor import (
DataHubOperationCircuitBreakerSensor,
)
dag = DAG(
dag_id="marketing-send_emails",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule_interval="0 0 * * *",
catchup=False,
)
items_operation_sensor = DataHubOperationCircuitBreakerSensor(
dag=dag,
task_id="pet_profiles_operation_sensor",
datahub_rest_conn_id="datahub_longtail",
urn=[
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
],
time_delta=datetime.timedelta(days=1),
)
# Assertion circuit breaker to check if there are passing assertions for the specified URNs.
# check_last_assertion_time is enabled, which means the timestamp of the latest operation on the
# dataset is used as the cutoff: only assertion runs after that point are accepted.
assertion_circuit_breaker = DataHubAssertionOperator(
task_id="pet_profiles_assertion_circuit_breaker",
datahub_rest_conn_id="datahub_longtail",
urn=[
"urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
],
check_last_assertion_time=True,
)
send_email = BashOperator(
task_id="send_emails",
dag=dag,
inlets=[Dataset("snowflake", "long_tail_companions.adoption.pet_profiles")],
bash_command="echo Dummy Task",
)
items_operation_sensor.set_downstream(assertion_circuit_breaker)
assertion_circuit_breaker.set_downstream(send_email)

View File

@ -192,6 +192,10 @@ plugins: Dict[str, Set[str]] = {
"airflow": {
"apache-airflow >= 1.10.2",
},
"circuit-breaker": {
"gql>=3.3.0",
"gql[requests]>=3.3.0",
},
"great-expectations": sql_common | {"sqllineage==1.3.5"},
# Source plugins
# PyAthena is pinned with exact version because we use private method in PyAthena
@ -422,6 +426,7 @@ full_test_dev_requirements = {
*list(
dependency
for plugin in [
"circuit-breaker",
"clickhouse",
"druid",
"feast-legacy",

View File

@ -0,0 +1,8 @@
from datahub.api.circuit_breaker.assertion_circuit_breaker import (
AssertionCircuitBreaker,
AssertionCircuitBreakerConfig,
)
from datahub.api.circuit_breaker.operation_circuit_breaker import (
OperationCircuitBreaker,
OperationCircuitBreakerConfig,
)

View File

@ -0,0 +1,137 @@
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from pydantic import Field
from datahub.api.circuit_breaker.circuit_breaker import (
AbstractCircuitBreaker,
CircuitBreakerConfig,
)
from datahub.api.graphql import Assertion, Operation
logger: logging.Logger = logging.getLogger(__name__)
class AssertionCircuitBreakerConfig(CircuitBreakerConfig):
verify_after_last_update: bool = Field(
default=True,
description="Whether to check if assertion happened after the dataset was last updated.",
)
time_delta: timedelta = Field(
default=(timedelta(days=1)),
        description="How far back an assertion result is accepted when the dataset has no last-updated operation timestamp",
)
class AssertionCircuitBreaker(AbstractCircuitBreaker):
r"""
DataHub Assertion Circuit Breaker
    The circuit breaker checks whether there are passing assertions on the dataset.
"""
config: AssertionCircuitBreakerConfig
def __init__(self, config: AssertionCircuitBreakerConfig):
super().__init__(config.datahub_host, config.datahub_token, config.timeout)
self.config = config
self.assertion_api = Assertion(
datahub_host=config.datahub_host,
datahub_token=config.datahub_token,
timeout=config.timeout,
)
def get_last_updated(self, urn: str) -> Optional[datetime]:
operation_api: Operation = Operation(transport=self.assertion_api.transport)
operations = operation_api.query_operations(urn=urn)
if not operations:
return None
else:
return datetime.fromtimestamp(operations[0]["lastUpdatedTimestamp"] / 1000)
def _check_if_assertion_failed(
self, assertions: List[Dict[str, Any]], last_updated: Optional[datetime] = None
) -> bool:
@dataclass
class AssertionResult:
time: int
state: str
run_event: Any
        # If last_updated is set we expect at least one successful assertion
if not assertions and last_updated:
return True
result: bool = True
assertion_last_states: Dict[str, AssertionResult] = {}
for assertion in assertions:
if "runEvents" in assertion and "runEvents" in assertion["runEvents"]:
for run_event in assertion["runEvents"]["runEvents"]:
assertion_time = run_event["timestampMillis"]
assertion_state = run_event["result"]["type"]
assertion_urn = run_event["assertionUrn"]
if (
assertion_urn not in assertion_last_states
or assertion_last_states[assertion_urn].time < assertion_time
):
assertion_last_states[assertion_urn] = AssertionResult(
time=assertion_time,
state=assertion_state,
run_event=run_event,
)
for assertion_urn, last_assertion in assertion_last_states.items():
if last_assertion.state == "FAILURE":
logger.debug(f"Runevent: {last_assertion.run_event}")
                logger.info(
                    f"Assertion {assertion_urn} failed on the dataset. Breaking the circuit"
                )
return True
elif last_assertion.state == "SUCCESS":
logger.info(f"Found successful assertion: {assertion_urn}")
result = False
if last_updated is not None:
last_run = datetime.fromtimestamp(last_assertion.time / 1000)
if last_updated > last_run:
logger.error(
f"Missing assertion run for {assertion_urn}. The dataset was updated on {last_updated} but the last assertion run was at {last_run}"
)
return True
return result
def is_circuit_breaker_active(self, urn: str) -> bool:
r"""
Checks if the circuit breaker is active
:param urn: The DataHub dataset unique identifier.
"""
last_updated: Optional[datetime] = None
if self.config.verify_after_last_update:
last_updated = self.get_last_updated(urn)
logger.info(
f"The dataset {urn} was last updated at {last_updated}, using this as min assertion date."
)
if not last_updated:
last_updated = datetime.now() - self.config.time_delta
            logger.info(
                f"Dataset {urn} has no last-updated operation or verify_after_last_update is disabled, using calculated min assertion date {last_updated}"
            )
assertions = self.assertion_api.query_assertion(
urn,
start_time_millis=int(last_updated.timestamp() * 1000),
status="COMPLETE",
)
if self._check_if_assertion_failed(
assertions,
last_updated if self.config.verify_after_last_update is True else None,
):
            logger.info(f"Dataset {urn} has failing or missing assertions.")
return True
return False
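A minimal sketch (not part of this diff) of using the assertion circuit breaker directly from Python; the host below is a placeholder for a reachable DataHub GMS endpoint:

from datetime import timedelta

from datahub.api.circuit_breaker import (
    AssertionCircuitBreaker,
    AssertionCircuitBreakerConfig,
)

config = AssertionCircuitBreakerConfig(
    datahub_host="http://localhost:8080",  # placeholder GMS endpoint
    datahub_token=None,
    verify_after_last_update=True,
    time_delta=timedelta(days=1),
)
circuit_breaker = AssertionCircuitBreaker(config)
# Trip the pipeline if the dataset has failing or missing assertions
if circuit_breaker.is_circuit_breaker_active(
    urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
):
    raise Exception("Dataset is not in a consumable state")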

View File

@ -0,0 +1,50 @@
import logging
from abc import abstractmethod
from typing import Optional
from gql import Client
from gql.transport.requests import RequestsHTTPTransport
from pydantic import Field
from datahub.configuration import ConfigModel
logger = logging.getLogger(__name__)
class CircuitBreakerConfig(ConfigModel):
    datahub_host: str = Field(description="URL of the DataHub instance")
    datahub_token: Optional[str] = Field(default=None, description="The DataHub access token")
timeout: Optional[int] = Field(
default=None,
description="The number of seconds to wait for your client to establish a connection to a remote machine",
)
class AbstractCircuitBreaker:
client: Client
def __init__(
self,
datahub_host: str,
datahub_token: Optional[str] = None,
timeout: Optional[int] = None,
):
# logging.basicConfig(level=logging.DEBUG)
# Select your transport with a defined url endpoint
self.transport = RequestsHTTPTransport(
url=datahub_host + "/api/graphql",
headers={"Authorization": "Bearer " + datahub_token}
if datahub_token is not None
else None,
method="POST",
timeout=timeout,
)
self.client = Client(
transport=self.transport,
fetch_schema_from_transport=True,
)
@abstractmethod
def is_circuit_breaker_active(self, urn: str) -> bool:
pass
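To illustrate the contract of the base class, a hedged sketch of a hypothetical custom circuit breaker built on AbstractCircuitBreaker; only is_circuit_breaker_active needs to be provided:

from datahub.api.circuit_breaker.circuit_breaker import (
    AbstractCircuitBreaker,
    CircuitBreakerConfig,
)


class NoopCircuitBreakerConfig(CircuitBreakerConfig):
    """Hypothetical config; inherits datahub_host, datahub_token and timeout."""


class NoopCircuitBreaker(AbstractCircuitBreaker):
    """Hypothetical breaker that never blocks; a template for custom checks."""

    def __init__(self, config: NoopCircuitBreakerConfig):
        # Sets up self.transport and self.client against <datahub_host>/api/graphql
        super().__init__(config.datahub_host, config.datahub_token, config.timeout)

    def is_circuit_breaker_active(self, urn: str) -> bool:
        # A real implementation would run a GraphQL query through self.client here
        return False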

View File

@ -0,0 +1,81 @@
import logging
from datetime import datetime, timedelta
from typing import Optional
from pydantic import Field
from datahub.api.circuit_breaker.circuit_breaker import (
AbstractCircuitBreaker,
CircuitBreakerConfig,
)
from datahub.api.graphql import Operation
logger: logging.Logger = logging.getLogger(__name__)
class OperationCircuitBreakerConfig(CircuitBreakerConfig):
time_delta: timedelta = Field(
default=(timedelta(days=1)),
        description="How far back an operation is accepted as evidence that the dataset was updated",
)
class OperationCircuitBreaker(AbstractCircuitBreaker):
r"""
DataHub Operation Circuit Breaker
    The circuit breaker checks whether operation metadata exists for the dataset.
    If there is no valid Operation metadata, the circuit breaker trips and stays active.
"""
config: OperationCircuitBreakerConfig
operation_api: Operation
def __init__(self, config: OperationCircuitBreakerConfig):
super().__init__(config.datahub_host, config.datahub_token, config.timeout)
self.config = config
self.operation_api = Operation(
datahub_host=config.datahub_host,
datahub_token=config.datahub_token,
timeout=config.timeout,
)
def is_circuit_breaker_active(
self,
urn: str,
partition: Optional[str] = None,
source_type: Optional[str] = None,
operation_type: Optional[str] = None,
) -> bool:
r"""
Checks if the circuit breaker is active
        :param urn: The DataHub dataset unique identifier.
        :param partition: The partition to check the operation for.
        :param source_type: The source type to filter on. If not set, any source type is accepted.
            See valid types here: https://datahubproject.io/docs/graphql/enums#operationsourcetype
        :param operation_type: The operation type to filter on. If not set, any operation type is accepted.
            See valid types here: https://datahubproject.io/docs/graphql/enums/#operationtype
"""
start_time_millis: int = int(
(datetime.now() - self.config.time_delta).timestamp() * 1000
)
operations = self.operation_api.query_operations(
urn,
start_time_millis=start_time_millis,
partition=partition,
source_type=source_type,
operation_type=operation_type,
)
logger.info(f"Operations: {operations}")
for operation in operations:
if (
operation.get("lastUpdatedTimestamp")
and operation["lastUpdatedTimestamp"] >= start_time_millis
):
return False
return True
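A comparable sketch (not part of this diff) for the operation circuit breaker; it stays active until an operation newer than now minus time_delta is found, optionally narrowed by partition, source type or operation type:

from datetime import timedelta

from datahub.api.circuit_breaker import (
    OperationCircuitBreaker,
    OperationCircuitBreakerConfig,
)

config = OperationCircuitBreakerConfig(
    datahub_host="http://localhost:8080",  # placeholder GMS endpoint
    time_delta=timedelta(hours=6),
)
circuit_breaker = OperationCircuitBreaker(config)
active = circuit_breaker.is_circuit_breaker_active(
    urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)",
    operation_type="INSERT",
)
if active:
    raise Exception("No fresh INSERT operation found for the dataset")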

View File

@ -0,0 +1,2 @@
from datahub.api.graphql.assertion import Assertion
from datahub.api.graphql.operation import Operation

View File

@ -0,0 +1,91 @@
import logging
from typing import Any, Dict, List, Optional
from gql import gql
from datahub.api.graphql.base import BaseApi
logger = logging.getLogger(__name__)
class Assertion(BaseApi):
ASSERTION_QUERY = """
query dataset($urn: String!, $start: Int, $count: Int, $status: AssertionRunStatus,$limit: Int, $startTimeMillis:Long, $endTimeMillis:Long, $filter:FilterInput) {
dataset(urn: $urn) {
assertions(start: $start, count: $count){
__typename
total
assertions{
__typename
runEvents(status: $status, limit: $limit, startTimeMillis: $startTimeMillis, endTimeMillis: $endTimeMillis, filter: $filter) {
total
failed
succeeded
runEvents {
__typename
timestampMillis
partitionSpec {
__typename
type
partition
timePartition {
startTimeMillis
durationMillis
}
}
result {
__typename
type
rowCount
missingCount
unexpectedCount
actualAggValue
externalUrl
}
assertionUrn
}
}
}
}
}
}
"""
def query_assertion(
self,
urn: str,
status: Optional[str] = None,
start_time_millis: Optional[int] = None,
end_time_millis: Optional[int] = None,
limit: Optional[int] = None,
filter: Optional[Dict[str, Optional[str]]] = None,
) -> List[Dict[Any, Any]]:
r"""
Query assertions for a dataset.
:param urn: The DataHub dataset unique identifier.
        :param status: The assertion run status to filter for. Every status is accepted if it is not set.
            See valid statuses at https://datahubproject.io/docs/graphql/enums#assertionrunstatus
        :param start_time_millis: The start time in milliseconds from which assertion run events are queried.
        :param end_time_millis: The end time in milliseconds up to which assertion run events are queried.
        :param limit: The maximum number of run events to return per assertion.
        :param filter: Additional key/value filters which are combined as an AND query.
"""
result = self.client.execute(
gql(Assertion.ASSERTION_QUERY),
variable_values={
"urn": urn,
"filter": self.gen_filter(filter) if filter else None,
"limit": limit,
"status": status,
"startTimeMillis": start_time_millis,
"endTimeMillis": end_time_millis,
},
)
assertions = []
if "dataset" in result and "assertions" in result["dataset"]:
assertions = result["dataset"]["assertions"]["assertions"]
return assertions
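A short, hedged sketch of calling the assertion query API directly; the URN matches the test fixtures in this commit and the host is a placeholder:

from datahub.api.graphql import Assertion

assertion_api = Assertion(datahub_host="http://localhost:8080")  # placeholder GMS endpoint
assertions = assertion_api.query_assertion(
    urn="urn:li:dataset:(urn:li:dataPlatform:postgres,postgres1.postgres.public.foo1,PROD)",
    status="COMPLETE",
    start_time_millis=1640692800000,
)
for assertion in assertions:
    for run_event in assertion["runEvents"]["runEvents"]:
        print(run_event["assertionUrn"], run_event["result"]["type"])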

View File

@ -0,0 +1,52 @@
from typing import Dict, List, Optional
from gql import Client
from gql.transport.requests import RequestsHTTPTransport
class BaseApi:
client: Client
def __init__(
self,
datahub_host: Optional[str] = None,
datahub_token: Optional[str] = None,
timeout: Optional[int] = None,
transport: Optional[RequestsHTTPTransport] = None,
):
# logging.basicConfig(level=logging.DEBUG)
if transport:
self.transport = transport
else:
assert datahub_host is not None
# Select your transport with a defined url endpoint
self.transport = RequestsHTTPTransport(
url=datahub_host + "/api/graphql",
headers={"Authorization": "Bearer " + datahub_token}
if datahub_token is not None
else None,
method="POST",
timeout=timeout,
)
self.client = Client(
transport=self.transport,
fetch_schema_from_transport=True,
)
def gen_filter(
self, filters: Dict[str, Optional[str]]
) -> Optional[Dict[str, List[Dict[str, str]]]]:
filter_expression: Optional[Dict[str, List[Dict[str, str]]]] = None
if not filters:
return None
filter = []
for key, value in filters.items():
if value is None:
continue
filter.append({"field": key, "value": value})
filter_expression = {"and": filter}
return filter_expression
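As a quick illustration of the helper above, gen_filter drops None values and wraps the remaining pairs into the and-list shape expected by the GraphQL FilterInput; the host is a placeholder and the gql client only connects lazily:

from datahub.api.graphql.base import BaseApi

api = BaseApi(datahub_host="http://localhost:8080")  # placeholder; no request is sent here
filter_expression = api.gen_filter(
    {"operationType": "INSERT", "sourceType": None, "partition": None}
)
# filter_expression == {"and": [{"field": "operationType", "value": "INSERT"}]}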

View File

@ -0,0 +1,140 @@
import logging
from typing import Any, Dict, List, Optional
from gql import gql
from datahub.api.graphql.base import BaseApi
logger = logging.getLogger(__name__)
class Operation(BaseApi):
REPORT_OPERATION_MUTATION: str = """
mutation reportOperation($urn: String!, $sourceType: OperationSourceType!, $operationType: OperationType!, $partition: String, $numAffectedRows: Long, $customProperties: [StringMapEntryInput!]) {
reportOperation(input: {
urn: $urn
sourceType: $sourceType
operationType: $operationType
partition: $partition
numAffectedRows: $numAffectedRows
customProperties: $customProperties
})
}"""
QUERY_OPERATIONS: str = """
query dataset($urn: String!, $startTimeMillis: Long, $endTimeMillis: Long, $limit: Int, $filter:FilterInput) {
dataset(urn: $urn) {
urn
operations (startTimeMillis: $startTimeMillis, endTimeMillis: $endTimeMillis, limit: $limit, filter: $filter) {
__typename
actor
operationType
sourceType
numAffectedRows
partition
timestampMillis
lastUpdatedTimestamp
customProperties {
key
value
}
}
}
}"""
def report_operation(
self,
urn: str,
source_type: str = "DATA_PROCESS",
operation_type: str = "UPDATE",
partition: Optional[str] = None,
num_affected_rows: int = 0,
custom_properties: Optional[Dict[str, str]] = None,
) -> str:
r"""
Report operation metadata for a dataset.
        :param urn: The DataHub dataset unique identifier.
        :param source_type: The source type of the operation.
            Default value: DATA_PROCESS
            See valid types here: https://datahubproject.io/docs/graphql/enums#operationsourcetype
        :param operation_type: The operation type to report.
            Default value: "UPDATE"
            See valid types here: https://datahubproject.io/docs/graphql/enums/#operationtype
        :param partition: The partition the operation applies to.
        :param num_affected_rows: The number of rows affected by this operation.
        :param custom_properties: Key/value pairs of custom properties.
"""
variable_values = {
"urn": urn,
"sourceType": source_type,
"operationType": operation_type,
"numAffectedRows": num_affected_rows,
}
if partition is not None:
variable_values["partition"] = partition
if custom_properties is not None:
variable_values["customProperties"] = custom_properties
result = self.client.execute(
gql(Operation.REPORT_OPERATION_MUTATION), variable_values
)
return result["reportOperation"]
def query_operations(
self,
urn: str,
start_time_millis: Optional[int] = None,
end_time_millis: Optional[int] = None,
limit: Optional[int] = None,
source_type: Optional[str] = None,
operation_type: Optional[str] = None,
partition: Optional[str] = None,
) -> List[Dict[Any, Any]]:
r"""
Query operations for a dataset.
:param urn: The DataHub dataset unique identifier.
        :param start_time_millis: The start time in milliseconds from which operations are queried.
        :param end_time_millis: The end time in milliseconds up to which operations are queried.
        :param limit: The maximum number of items to return.
        :param source_type: The source type to filter on. If not set, any source type is accepted.
            See valid types here: https://datahubproject.io/docs/graphql/enums#operationsourcetype
        :param operation_type: The operation type to filter on. If not set, any operation type is accepted.
            See valid types here: https://datahubproject.io/docs/graphql/enums/#operationtype
        :param partition: The partition to filter on.
"""
        result = self.client.execute(
            gql(Operation.QUERY_OPERATIONS),
            variable_values={
                "urn": urn,
                "startTimeMillis": start_time_millis,
                "endTimeMillis": end_time_millis,
                "limit": limit,
                "filter": self.gen_filter(
                    {
                        "sourceType": source_type,
                        "operationType": operation_type,
                        "partition": partition,
                    }
                    if (source_type or operation_type or partition)
                    else None
                ),
            },
        )
if "dataset" in result and "operations" in result["dataset"]:
operations = []
if source_type is not None:
for operation in result["dataset"]["operations"]:
if operation["sourceType"] == source_type:
operations.append(operation)
else:
operations = result["dataset"]["operations"]
return operations
return []
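A combined sketch (not part of this diff) of the two Operation calls; the host is a placeholder and the URN is reused from the example DAGs:

from datahub.api.graphql import Operation

operation_api = Operation(datahub_host="http://localhost:8080")  # placeholder GMS endpoint
urn = "urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)"
# Push an operation aspect for the dataset...
operation_api.report_operation(urn=urn, operation_type="INSERT", num_affected_rows=123)
# ...and read recent operations back, optionally filtered by operation type
operations = operation_api.query_operations(urn=urn, operation_type="INSERT", limit=10)
for operation in operations:
    print(operation["operationType"], operation.get("lastUpdatedTimestamp"))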

View File

@ -0,0 +1,78 @@
import datetime
from typing import Any, List, Optional, Sequence, Union
from airflow.models import BaseOperator
from datahub.api.circuit_breaker import (
AssertionCircuitBreaker,
AssertionCircuitBreakerConfig,
)
from datahub_provider.hooks.datahub import DatahubRestHook
class DataHubAssertionOperator(BaseOperator):
r"""
DataHub Assertion Circuit Breaker Operator.
:param urn: The DataHub dataset unique identifier. (templated)
    :param datahub_rest_conn_id: The REST DataHub connection id used to communicate with DataHub,
        configured as an Airflow connection.
    :param check_last_assertion_time: If enabled, only assertion runs after the last operation
        reported on the dataset are accepted. Defaults to True.
    :param time_delta: If check_last_assertion_time is False, assertion runs within this time delta are accepted.
"""
template_fields: Sequence[str] = ("urn",)
circuit_breaker: AssertionCircuitBreaker
urn: Union[List[str], str]
def __init__( # type: ignore[no-untyped-def]
self,
*,
urn: Union[List[str], str],
datahub_rest_conn_id: Optional[str] = None,
check_last_assertion_time: bool = True,
time_delta: Optional[datetime.timedelta] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
hook: DatahubRestHook
if datahub_rest_conn_id is not None:
hook = DatahubRestHook(datahub_rest_conn_id=datahub_rest_conn_id)
else:
hook = DatahubRestHook()
host, password, timeout_sec = hook._get_config()
self.urn = urn
config: AssertionCircuitBreakerConfig = AssertionCircuitBreakerConfig(
datahub_host=host,
datahub_token=password,
timeout=timeout_sec,
verify_after_last_update=check_last_assertion_time,
time_delta=time_delta if time_delta else datetime.timedelta(days=1),
)
self.circuit_breaker = AssertionCircuitBreaker(config=config)
def execute(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
        if isinstance(self.urn, str):
            urns = [self.urn]
        elif isinstance(self.urn, list):
            urns = self.urn
        else:
            raise Exception(f"urn parameter has invalid type {type(self.urn)}")
        for urn in urns:
            self.log.info(f"Checking if dataset {urn} is ready to be consumed")
            if self.circuit_breaker.is_circuit_breaker_active(urn=urn):
                raise Exception(f"Dataset {urn} is not in consumable state")
return True
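The datahub_silence_circuit_breakers escape hatch checked above can be set per run; a hedged sketch, assuming an Airflow environment where the assertion_circuit_breaker task from the example DAG is instantiable:

from types import SimpleNamespace

# Triggering the DAG with, for example,
#   airflow dags trigger -c '{"datahub_silence_circuit_breakers": true}' marketing-send_emails
# places the key into dag_run.conf, and execute() returns True without querying DataHub.
silenced_context = {"dag_run": SimpleNamespace(conf={"datahub_silence_circuit_breakers": True})}
assert assertion_circuit_breaker.execute(silenced_context) is True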

View File

@ -0,0 +1,78 @@
import datetime
from typing import Any, List, Optional, Sequence, Union
from airflow.sensors.base import BaseSensorOperator
from datahub.api.circuit_breaker import (
AssertionCircuitBreaker,
AssertionCircuitBreakerConfig,
)
from datahub_provider.hooks.datahub import DatahubRestHook
class DataHubAssertionSensor(BaseSensorOperator):
r"""
DataHub Assertion Circuit Breaker Sensor.
:param urn: The DataHub dataset unique identifier. (templated)
    :param datahub_rest_conn_id: The REST DataHub connection id used to communicate with DataHub,
        configured as an Airflow connection.
    :param check_last_assertion_time: If enabled, only assertion runs after the last operation
        reported on the dataset are accepted. Defaults to True.
    :param time_delta: If check_last_assertion_time is False, assertion runs within this time delta are accepted.
"""
template_fields: Sequence[str] = ("urn",)
circuit_breaker: AssertionCircuitBreaker
urn: Union[List[str], str]
def __init__( # type: ignore[no-untyped-def]
self,
*,
urn: Union[List[str], str],
datahub_rest_conn_id: Optional[str] = None,
check_last_assertion_time: bool = True,
time_delta: datetime.timedelta = datetime.timedelta(days=1),
**kwargs,
) -> None:
super().__init__(**kwargs)
hook: DatahubRestHook
if datahub_rest_conn_id is not None:
hook = DatahubRestHook(datahub_rest_conn_id=datahub_rest_conn_id)
else:
hook = DatahubRestHook()
host, password, timeout_sec = hook._get_config()
self.urn = urn
config: AssertionCircuitBreakerConfig = AssertionCircuitBreakerConfig(
datahub_host=host,
datahub_token=password,
timeout=timeout_sec,
verify_after_last_update=check_last_assertion_time,
time_delta=time_delta,
)
self.circuit_breaker = AssertionCircuitBreaker(config=config)
def poke(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
        if isinstance(self.urn, str):
            urns = [self.urn]
        elif isinstance(self.urn, list):
            urns = self.urn
        else:
            raise Exception(f"urn parameter has invalid type {type(self.urn)}")
        for urn in urns:
            self.log.info(f"Checking if dataset {urn} is ready to be consumed")
            if self.circuit_breaker.is_circuit_breaker_active(urn=urn):
                self.log.info(f"Dataset {urn} is not in consumable state")
                return False
return True
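The sensor variant is not exercised in the example DAGs; a minimal sketch of placing it in front of a consumer task (module path assumed by analogy with datahub_operation_sensor, connection and URN reused from the examples):

import datetime

import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from datahub_provider.operators.datahub_assertion_sensor import DataHubAssertionSensor

dag = DAG(
    dag_id="assertion_sensor_example",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule_interval="0 0 * * *",
    catchup=False,
)
# Keeps poking (returning False) until the dataset has passing assertions
assertion_sensor = DataHubAssertionSensor(
    dag=dag,
    task_id="pet_profiles_assertion_sensor",
    datahub_rest_conn_id="datahub_longtail",
    urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)",
    time_delta=datetime.timedelta(days=1),
)
consume = BashOperator(task_id="consume_pet_profiles", dag=dag, bash_command="echo Dummy Task")
assertion_sensor.set_downstream(consume)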

View File

@ -0,0 +1,98 @@
import datetime
from typing import Any, List, Optional, Sequence, Union
from airflow.sensors.base import BaseSensorOperator
from datahub.api.circuit_breaker import (
OperationCircuitBreaker,
OperationCircuitBreakerConfig,
)
from datahub_provider.hooks.datahub import DatahubRestHook
class DataHubOperationCircuitBreakerOperator(BaseSensorOperator):
r"""
DataHub Operation Circuit Breaker Operator.
:param urn: The DataHub dataset unique identifier. (templated)
    :param datahub_rest_conn_id: The REST DataHub connection id used to communicate with DataHub,
        configured as an Airflow connection.
    :param partition: The partition to check the operation for.
    :param source_type: The source type to filter on. If not set, any source type is accepted.
        See valid types here: https://datahubproject.io/docs/graphql/enums#operationsourcetype
    :param operation_type: The operation type to filter on. If not set, any operation type is accepted.
        See valid types here: https://datahubproject.io/docs/graphql/enums/#operationtype
"""
template_fields: Sequence[str] = (
"urn",
"partition",
"source_type",
"operation_type",
)
circuit_breaker: OperationCircuitBreaker
urn: Union[List[str], str]
partition: Optional[str]
source_type: Optional[str]
operation_type: Optional[str]
def __init__( # type: ignore[no-untyped-def]
self,
*,
urn: Union[List[str], str],
datahub_rest_conn_id: Optional[str] = None,
time_delta: Optional[datetime.timedelta] = datetime.timedelta(days=1),
partition: Optional[str] = None,
source_type: Optional[str] = None,
operation_type: Optional[str] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
hook: DatahubRestHook
if datahub_rest_conn_id is not None:
hook = DatahubRestHook(datahub_rest_conn_id=datahub_rest_conn_id)
else:
hook = DatahubRestHook()
host, password, timeout_sec = hook._get_config()
self.urn = urn
self.partition = partition
self.operation_type = operation_type
self.source_type = source_type
config: OperationCircuitBreakerConfig = OperationCircuitBreakerConfig(
datahub_host=host,
datahub_token=password,
timeout=timeout_sec,
time_delta=time_delta,
)
self.circuit_breaker = OperationCircuitBreaker(config=config)
def execute(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
        if isinstance(self.urn, str):
            urns = [self.urn]
        elif isinstance(self.urn, list):
            urns = self.urn
        else:
            raise Exception(f"urn parameter has invalid type {type(self.urn)}")
        for urn in urns:
            self.log.info(f"Checking if dataset {urn} is ready to be consumed")
            ret = self.circuit_breaker.is_circuit_breaker_active(
                urn=urn,
                partition=self.partition,
                operation_type=self.operation_type,
                source_type=self.source_type,
            )
            if ret:
                raise Exception(f"Dataset {urn} is not in consumable state")
return True
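A similar sketch for the operator variant, narrowing the check to a partition and operation type; the module path and partition value are assumptions, chosen by analogy with the other operators:

from datahub_provider.operators.datahub_operation_operator import (
    DataHubOperationCircuitBreakerOperator,
)

# Raises (fails the task) if no matching INSERT operation was reported within time_delta;
# attach to a DAG via dag=... as in the example DAGs above.
pet_profiles_operation_gate = DataHubOperationCircuitBreakerOperator(
    task_id="pet_profiles_operation_gate",
    datahub_rest_conn_id="datahub_longtail",
    urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,long_tail_companions.adoption.pet_profiles,PROD)",
    partition="20220701",  # hypothetical partition value
    operation_type="INSERT",
)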

View File

@ -0,0 +1,100 @@
import datetime
from typing import Any, List, Optional, Sequence, Union
from airflow.sensors.base import BaseSensorOperator
from datahub.api.circuit_breaker import (
OperationCircuitBreaker,
OperationCircuitBreakerConfig,
)
from datahub_provider.hooks.datahub import DatahubRestHook
class DataHubOperationCircuitBreakerSensor(BaseSensorOperator):
r"""
DataHub Operation Circuit Breaker Sensor.
:param urn: The DataHub dataset unique identifier. (templated)
    :param datahub_rest_conn_id: The REST DataHub connection id used to communicate with DataHub,
        configured as an Airflow connection.
    :param partition: The partition to check the operation for.
    :param source_type: The source type to filter on. If not set, any source type is accepted.
        See valid values at: https://datahubproject.io/docs/graphql/enums#operationsourcetype
    :param operation_type: The operation type to filter on. If not set, any operation type is accepted.
        See valid values at: https://datahubproject.io/docs/graphql/enums/#operationtype
"""
template_fields: Sequence[str] = (
"urn",
"partition",
"source_type",
"operation_type",
)
circuit_breaker: OperationCircuitBreaker
urn: Union[List[str], str]
partition: Optional[str]
source_type: Optional[str]
operation_type: Optional[str]
def __init__( # type: ignore[no-untyped-def]
self,
*,
urn: Union[List[str], str],
datahub_rest_conn_id: Optional[str] = None,
time_delta: Optional[datetime.timedelta] = datetime.timedelta(days=1),
partition: Optional[str] = None,
source_type: Optional[str] = None,
operation_type: Optional[str] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
hook: DatahubRestHook
if datahub_rest_conn_id is not None:
hook = DatahubRestHook(datahub_rest_conn_id=datahub_rest_conn_id)
else:
hook = DatahubRestHook()
host, password, timeout_sec = hook._get_config()
self.urn = urn
self.partition = partition
self.operation_type = operation_type
self.source_type = source_type
config: OperationCircuitBreakerConfig = OperationCircuitBreakerConfig(
datahub_host=host,
datahub_token=password,
timeout=timeout_sec,
time_delta=time_delta,
)
self.circuit_breaker = OperationCircuitBreaker(config=config)
def poke(self, context: Any) -> bool:
if "datahub_silence_circuit_breakers" in context["dag_run"].conf:
self.log.info(
"Circuit breaker is silenced because datahub_silence_circuit_breakers config is set"
)
return True
        if isinstance(self.urn, str):
            urns = [self.urn]
        elif isinstance(self.urn, list):
            urns = self.urn
        else:
            raise Exception(f"urn parameter has invalid type {type(self.urn)}")
        for urn in urns:
            self.log.info(f"Checking if dataset {urn} is ready to be consumed")
            ret = self.circuit_breaker.is_circuit_breaker_active(
                urn=urn,
                partition=self.partition,
                operation_type=self.operation_type,
                source_type=self.source_type,
            )
            if ret:
                self.log.info(f"Dataset {urn} is not in consumable state")
                return False
return True

View File

@ -0,0 +1,9 @@
{
"dataset": {
"assertions": {
"__typename": "EntityAssertionsResult",
"total": 0,
"assertions": []
}
}
}

View File

@ -0,0 +1,244 @@
{
"dataset": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres1.postgres.public.foo1,PROD)",
"operations": [],
"incidents": {
"incidents": []
},
"assertions": {
"total": 5,
"assertions": [
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:3d1699164901675df774ab34fd16f4f3"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:3d1699164901675df774ab34fd16f4f3"
}
]
}
},
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 2,
"succeeded": 0,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "FAILURE",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:358c683782c93c2fc2bd4bdd4fdb0153"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "FAILURE",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:358c683782c93c2fc2bd4bdd4fdb0153"
}
]
}
},
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": 3,
"missingCount": null,
"unexpectedCount": 0,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:9729dfafea4bb2c2f114bc80e513a7ec"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": 2,
"missingCount": null,
"unexpectedCount": 0,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:9729dfafea4bb2c2f114bc80e513a7ec"
}
]
}
},
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 1,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:16d6f586b2febda7f2b53faec6bb9035"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 3,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:16d6f586b2febda7f2b53faec6bb9035"
}
]
}
},
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 3,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:4cf76385ccf614cc6cbb9daa551c3c74"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 2,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:4cf76385ccf614cc6cbb9daa551c3c74"
}
]
}
}
]
}
}
}

View File

@ -0,0 +1,198 @@
{
"dataset": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres1.postgres.public.foo1,PROD)",
"operations": [],
"incidents": {
"incidents": []
},
"assertions": {
"total": 4,
"assertions": [
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:3d1699164901675df774ab34fd16f4f3"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:3d1699164901675df774ab34fd16f4f3"
}
]
}
},
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": 3,
"missingCount": null,
"unexpectedCount": 0,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:9729dfafea4bb2c2f114bc80e513a7ec"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": 2,
"missingCount": null,
"unexpectedCount": 0,
"actualAggValue": null,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:9729dfafea4bb2c2f114bc80e513a7ec"
}
]
}
},
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 1,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:16d6f586b2febda7f2b53faec6bb9035"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 3,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:16d6f586b2febda7f2b53faec6bb9035"
}
]
}
},
{
"__typename": "Assertion",
"runEvents": {
"total": 2,
"failed": 0,
"succeeded": 2,
"runEvents": [
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catA\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 3,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:4cf76385ccf614cc6cbb9daa551c3c74"
},
{
"__typename": "AssertionRunEvent",
"timestampMillis": 1640692800000,
"partitionSpec": {
"type": "PARTITION",
"partition": "{\"category\": \"catB\"}",
"timePartition": null
},
"result": {
"type": "SUCCESS",
"rowCount": null,
"missingCount": null,
"unexpectedCount": null,
"actualAggValue": 2,
"externalUrl": null
},
"assertionUrn": "urn:li:assertion:4cf76385ccf614cc6cbb9daa551c3c74"
}
]
}
}
]
}
}
}

View File

@ -0,0 +1,6 @@
{
"dataset": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
"operations": []
}
}

View File

@ -0,0 +1,79 @@
{
"dataset": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.jaffle_shop.customers,PROD)",
"operations": [
{
"__typename": "Operation",
"actor": "urn:li:corpuser:bq-usage",
"operationType": "CUSTOM",
"sourceType": null,
"numAffectedRows": 1,
"partition": "FULL_TABLE_SNAPSHOT",
"timestampMillis": 1655769674092,
"lastUpdatedTimestamp": 1655700504432,
"customProperties": [
{
"key": "millisecondsTaken",
"value": "222"
},
{
"key": "text",
"value": "/* {\"app\": \"dbt\", \"dbt_version\": \"1.1.0\", \"profile_name\": \"jaffle_shop\", \"target_name\": \"dev\", \"node_id\": \"test.jaffle_shop.unique_customers_customer_id.c5af1ff4b1\"} */\nselect\n count(*) as failures,\n count(*) != 0 as should_warn,\n count(*) != 0 as should_error\n from (\n \n \n \n\nwith dbt_test__target as (\n\n select customer_id as unique_field\n from `my_project`.`jaffle_shop`.`customers`\n where customer_id is not null\n\n)\n\nselect\n unique_field,\n count(*) as n_records\n\nfrom dbt_test__target\ngroup by unique_field\nhaving count(*) > 1\n\n\n\n \n ) dbt_internal_test"
},
{
"key": "sessionId",
"value": "projects/my_project/jobs/b68487dc-61db-4f01-abd7-c5f7d931a46c"
},
{
"key": "fieldsRead",
"value": "customer_id"
},
{
"key": "readReason",
"value": "JOB"
},
{
"key": "bytesProcessed",
"value": "10485760"
}
]
},
{
"__typename": "Operation",
"actor": "urn:li:corpuser:bq-usage",
"operationType": "CUSTOM",
"sourceType": null,
"numAffectedRows": 1,
"partition": "FULL_TABLE_SNAPSHOT",
"timestampMillis": 1655769674090,
"lastUpdatedTimestamp": 1655700503898,
"customProperties": [
{
"key": "millisecondsTaken",
"value": "234"
},
{
"key": "text",
"value": "/* {\"app\": \"dbt\", \"dbt_version\": \"1.1.0\", \"profile_name\": \"jaffle_shop\", \"target_name\": \"dev\", \"node_id\": \"test.jaffle_shop.relationships_orders_customer_id__customer_id__ref_customers_.c6ec7f58f2\"} */\nselect\n count(*) as failures,\n count(*) != 0 as should_warn,\n count(*) != 0 as should_error\n from (\n \n \n \n\nwith child as (\n select customer_id as from_field\n from `my_project`.`jaffle_shop`.`orders`\n where customer_id is not null\n),\n\nparent as (\n select customer_id as to_field\n from `my_project`.`jaffle_shop`.`customers`\n)\n\nselect\n from_field\n\nfrom child\nleft join parent\n on child.from_field = parent.to_field\n\nwhere parent.to_field is null\n\n\n\n \n ) dbt_internal_test"
},
{
"key": "sessionId",
"value": "projects/my_project/jobs/4b6ae0b9-b7d3-43d4-aaae-1baf91be3553"
},
{
"key": "fieldsRead",
"value": "customer_id"
},
{
"key": "readReason",
"value": "JOB"
},
{
"key": "bytesProcessed",
"value": "20971520"
}
]
}
]
}
}

View File

@ -0,0 +1,155 @@
import json
from unittest.mock import patch
import pytest
from freezegun import freeze_time
try:
from datahub.api.circuit_breaker import (
AssertionCircuitBreaker,
AssertionCircuitBreakerConfig,
OperationCircuitBreaker,
OperationCircuitBreakerConfig,
)
    # Imports are only available when we are running integration tests
except ImportError:
pass
lastUpdatedResponseBeforeLastAssertion = {
"dataset": {"operations": [{"lastUpdatedTimestamp": 1640685600000}]}
}
lastUpdatedResponseAfterLastAssertion = {
"dataset": {"operations": [{"lastUpdatedTimestamp": 1652450039000}]}
}
@pytest.mark.integration
def test_operation_circuit_breaker_with_empty_response(pytestconfig):
with patch("gql.client.Client.execute") as mock_gql_client:
test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
f = open(
f"{test_resources_dir}/operation_gql_empty_response.json",
)
data = json.load(f)
mock_gql_client.side_effect = [data]
config = OperationCircuitBreakerConfig(datahub_host="dummy")
cb = OperationCircuitBreaker(config)
result = cb.is_circuit_breaker_active(
            urn="urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"
)
assert result is True
@freeze_time("2022-06-20 05:00:00")
@pytest.mark.integration
def test_operation_circuit_breaker_with_valid_response(pytestconfig):
with patch("gql.client.Client.execute") as mock_gql_client:
test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
f = open(
f"{test_resources_dir}/operation_gql_response.json",
)
data = json.load(f)
mock_gql_client.side_effect = [data]
config = OperationCircuitBreakerConfig(datahub_host="dummy")
cb = OperationCircuitBreaker(config)
result = cb.is_circuit_breaker_active(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.jaffle_shop.customers,PROD)"
)
assert result is False
@freeze_time("2022-06-21 07:00:00")
@pytest.mark.integration
def test_operation_circuit_breaker_with_not_recent_operation(pytestconfig):
with patch("gql.client.Client.execute") as mock_gql_client:
test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
f = open(
f"{test_resources_dir}/operation_gql_response.json",
)
data = json.load(f)
mock_gql_client.side_effect = [data]
config = OperationCircuitBreakerConfig(datahub_host="dummy")
cb = OperationCircuitBreaker(config)
result = cb.is_circuit_breaker_active(
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.jaffle_shop.customers,PROD)"
)
assert result is True
@pytest.mark.integration
def test_assertion_circuit_breaker_with_empty_response(pytestconfig):
with patch("gql.client.Client.execute") as mock_gql_client:
test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
f = open(
f"{test_resources_dir}/assertion_gql_empty_response.json",
)
data = json.load(f)
mock_gql_client.side_effect = [lastUpdatedResponseBeforeLastAssertion, data]
config = AssertionCircuitBreakerConfig(datahub_host="dummy")
cb = AssertionCircuitBreaker(config)
result = cb.is_circuit_breaker_active(
urn="urn:li:dataset:(urn:li:dataPlatform:postgres,postgres1.postgres.public.foo1,PROD)"
)
assert result is True
@pytest.mark.integration
def test_assertion_circuit_breaker_with_no_error(pytestconfig):
with patch("gql.client.Client.execute") as mock_gql_client:
test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
f = open(
f"{test_resources_dir}/assertion_gql_response_with_no_error.json",
)
data = json.load(f)
mock_gql_client.side_effect = [lastUpdatedResponseBeforeLastAssertion, data]
config = AssertionCircuitBreakerConfig(datahub_host="dummy")
cb = AssertionCircuitBreaker(config)
result = cb.is_circuit_breaker_active(
urn="urn:li:dataset:(urn:li:dataPlatform:postgres,postgres1.postgres.public.foo1,PROD)"
)
assert result is False
@pytest.mark.integration
def test_assertion_circuit_breaker_updated_at_after_last_assertion(pytestconfig):
with patch("gql.client.Client.execute") as mock_gql_client:
test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
f = open(
f"{test_resources_dir}/assertion_gql_response_with_no_error.json",
)
data = json.load(f)
mock_gql_client.side_effect = [lastUpdatedResponseAfterLastAssertion, data]
config = AssertionCircuitBreakerConfig(datahub_host="dummy")
cb = AssertionCircuitBreaker(config)
result = cb.is_circuit_breaker_active(
urn="urn:li:dataset:(urn:li:dataPlatform:postgres,postgres1.postgres.public.foo1,PROD)"
)
assert result is True
@pytest.mark.integration
def test_assertion_circuit_breaker_assertion_with_active_assertion(pytestconfig):
test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
with patch("gql.client.Client.execute") as mock_gql_client:
f = open(
f"{test_resources_dir}/assertion_gql_response.json",
)
data = json.load(f)
mock_gql_client.side_effect = [lastUpdatedResponseBeforeLastAssertion, data]
config = AssertionCircuitBreakerConfig(datahub_host="dummy")
cb = AssertionCircuitBreaker(config)
result = cb.is_circuit_breaker_active(
urn="urn:li:dataset:(urn:li:dataPlatform:postgres,postgres1.postgres.public.foo1,PROD)"
)
        assert result is True
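One more hedged test sketch that the same mocking pattern supports (hypothetical, not part of this commit): the operations in operation_gql_response.json carry a null sourceType, so filtering on a concrete source type leaves no match and the breaker stays active:

@freeze_time("2022-06-20 05:00:00")
@pytest.mark.integration
def test_operation_circuit_breaker_with_source_type_filter(pytestconfig):
    with patch("gql.client.Client.execute") as mock_gql_client:
        test_resources_dir = pytestconfig.rootpath / "tests/integration/circuit_breaker"
        with open(f"{test_resources_dir}/operation_gql_response.json") as f:
            data = json.load(f)
        mock_gql_client.side_effect = [data]

        config = OperationCircuitBreakerConfig(datahub_host="dummy")
        cb = OperationCircuitBreaker(config)
        # The client-side source_type filter drops the null-sourceType operations
        result = cb.is_circuit_breaker_active(
            urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,my_project.jaffle_shop.customers,PROD)",
            source_type="DATA_PROCESS",
        )
        assert result is True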