Claudio Benfatto aeefde4fa1
feat(ingestion): Kafka stateful ingestion (#4028)
* test: test stateful ingestion for kafka

test: some more advancement

test: some improvements

refactoring

* refactor: remove some linter modifications

* tests: add unit tests for kafka state

* refactor: minor changes

* tests: improve test coverage

* fix: fix naming

* style: fix format with black

* fix: fix broken test

* revert: revert smoke tests to master

* feat: add reporting to kafka source

* tests: add smoke tests for kafka reporting

* revert: revert changes to the smoke tests

* test: add kafka integration test for stateful ingestion

* docs: update documentation on kafka source

* fix: return empty string when no platform instance

* revert: remove unwanted file

* fix: solve problem with platform instance

* chore: use console sink instead of file

* fix: disable complexity check for _extract_record

* fix: remove if condition in get_platform_instance_id

* chore: remove unneeded integration test

* test: test platform instance in kafka source unit tests
2022-02-15 07:18:36 -08:00


import logging
import os
import time

import pytest

from tests.test_helpers.docker_helpers import docker_compose_runner  # noqa: F401
from tests.test_helpers.state_helpers import mock_datahub_graph  # noqa: F401

try:
    # See https://github.com/spulec/freezegun/issues/98#issuecomment-590553475.
    import pandas  # noqa: F401
except ImportError:
    pass

# Enable debug logging.
logging.getLogger().setLevel(logging.DEBUG)
os.putenv("DATAHUB_DEBUG", "1")

# Disable telemetry
os.putenv("DATAHUB_TELEMETRY_ENABLED", "false")


@pytest.fixture
def mock_time(monkeypatch):
    def fake_time():
        return 1615443388.0975091

    monkeypatch.setattr(time, "time", fake_time)

    yield


def pytest_addoption(parser):
    parser.addoption(
        "--update-golden-files",
        action="store_true",
        default=False,
    )