diff --git a/.github/actions/setup-openmetadata-test-environment/action.yml b/.github/actions/setup-openmetadata-test-environment/action.yml
index fded07a9bfb..f401eccda74 100644
--- a/.github/actions/setup-openmetadata-test-environment/action.yml
+++ b/.github/actions/setup-openmetadata-test-environment/action.yml
@@ -50,7 +50,7 @@ runs:
     - name: Start Server and Ingest Sample Data
       uses: nick-fields/retry@v2.8.3
       env:
-        INGESTION_DEPENDENCY: "mysql,elasticsearch"
+        INGESTION_DEPENDENCY: "mysql,elasticsearch,sample-data"
       with:
         timeout_minutes: 60
         max_attempts: 2
diff --git a/docker/run_local_docker.sh b/docker/run_local_docker.sh
index 06e19c6c63a..665428af74a 100755
--- a/docker/run_local_docker.sh
+++ b/docker/run_local_docker.sh
@@ -141,14 +141,10 @@ curl --location --request PATCH 'localhost:8080/api/v1/dags/extended_sample_data
 
 echo 'Validate sample data DAG...'
 sleep 5
-python -m pip install ingestion/
+# This validates the sample data DAG flow
+make install
 python docker/validate_compose.py
 
-until curl -s -f --header "Authorization: Bearer $authorizationToken" "http://localhost:8585/api/v1/tables/name/sample_data.ecommerce_db.shopify.fact_sale"; do
-  echo 'Waiting on Sample Data Ingestion to complete...\n'
-  curl -v --header "Authorization: Bearer $authorizationToken" "http://localhost:8585/api/v1/tables"
-  sleep 5
-done
 sleep 5
 curl --location --request PATCH 'localhost:8080/api/v1/dags/sample_usage' \
   --header 'Authorization: Basic YWRtaW46YWRtaW4=' \
diff --git a/docker/validate_compose.py b/docker/validate_compose.py
index 812d5e49596..ca805b86653 100644
--- a/docker/validate_compose.py
+++ b/docker/validate_compose.py
@@ -19,7 +19,6 @@ def get_last_run_info() -> Tuple[str, str]:
 
     max_retries = 15
     retries = 0
-    dag_runs = None
     while retries < max_retries:
         log_ansi_encoded_string(message="Waiting for DAG Run data...")
         time.sleep(5)
@@ -34,15 +33,13 @@
 
         retries += 1
 
     return None, None
-
-
 def print_last_run_logs() -> None:
     """
     Show the logs
     """
     logs = requests.get(
-        "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data",
+        "http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data&task_id=ingest_using_recipe",
         auth=BASIC_AUTH,
         timeout=REQUESTS_TIMEOUT
     ).text
@@ -56,14 +53,14 @@ def main():
     while retries < max_retries:
         dag_run_id, state = get_last_run_info()
         if state == "success":
-            log_ansi_encoded_string(message=f"DAG run: [{dag_run_id}, {state}]")
+            print(f"DAG run: [{dag_run_id}, {state}]")
             print_last_run_logs()
             break
         else:
-            log_ansi_encoded_string(
-                message="Waiting for sample data ingestion to be a success. We'll show some logs along the way.",
+            print(
+                "Waiting for sample data ingestion to be a success. We'll show some logs along the way.",
             )
-            log_ansi_encoded_string(message=f"DAG run: [{dag_run_id}, {state}]")
+            print(f"DAG run: [{dag_run_id}, {state}]")
             print_last_run_logs()
             time.sleep(10)
             retries += 1
diff --git a/ingestion/setup.py b/ingestion/setup.py
index 03555dbbf98..42b9ff0c2f4 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -42,7 +42,6 @@ VERSIONS = {
     "azure-identity": "azure-identity~=1.12",
     "sqlalchemy-databricks": "sqlalchemy-databricks~=0.1",
     "databricks-sdk": "databricks-sdk>=0.18.0,<0.20.0",
-    "google": "google>=3.0.0",
     "trino": "trino[sqlalchemy]",
     "spacy": "spacy==3.5.0",
     "looker-sdk": "looker-sdk>=22.20.0",
@@ -59,6 +58,7 @@ VERSIONS = {
 
 COMMONS = {
     "datalake": {
+        VERSIONS["avro"],
         VERSIONS["boto3"],
         VERSIONS["pandas"],
         VERSIONS["pyarrow"],
@@ -83,38 +83,24 @@ COMMONS = {
     },
 }
 
-# required library for pii tagging
-pii_requirements = {
-    VERSIONS["spacy"],
-    VERSIONS["pandas"],
-    "presidio-analyzer==2.2.32",
-}
 
 base_requirements = {
     "antlr4-python3-runtime==4.9.2",
     VERSIONS["azure-identity"],
     "azure-keyvault-secrets",  # Azure Key Vault SM
-    VERSIONS["avro"],  # Used in sample data
     VERSIONS["boto3"],  # Required in base for the secrets manager
-    "cached-property==1.5.2",
-    "chardet==4.0.0",
-    "croniter~=1.3.0",
+    "cached-property==1.5.2",  # LineageParser
+    "chardet==4.0.0",  # Used in the profiler
     "cryptography",
-    "email-validator>=1.0.3",
-    VERSIONS["google"],
-    "google-auth>=1.33.0",
-    VERSIONS["grpc-tools"],  # Used in sample data
-    "idna<3,>=2.5",
+    "email-validator>=1.0.3",  # For the pydantic generated models for Email
     "importlib-metadata>=4.13.0",  # From airflow constraints
     "Jinja2>=2.11.3",
     "jsonpatch<2.0, >=1.24",
-    "jsonschema",
     "memory-profiler",
     "mypy_extensions>=0.4.3",
     VERSIONS["pydantic"],
     VERSIONS["pymysql"],
     "python-dateutil>=2.8.1",
-    "python-jose~=3.3",
     "PyYAML~=6.0",
     "requests>=2.23",
     "requests-aws4auth~=1.1",  # Only depends on requests as external package. Leaving as base.
@@ -123,7 +109,6 @@ base_requirements = {
     "collate-sqllineage~=1.3.0",
     "tabulate==0.9.0",
     "typing-inspect",
-    "wheel~=0.38.4",
 }
 
 
@@ -202,7 +187,7 @@ plugins: Dict[str, Set[str]] = {
         "impyla~=0.18.0",
     },
     "iceberg": {
-        "pyiceberg",
+        "pyiceberg<1",
         # Forcing the version of a few packages so it plays nicely with other requirements.
         VERSIONS["pydantic"],
         VERSIONS["adlfs"],
@@ -258,6 +243,7 @@ plugins: Dict[str, Set[str]] = {
     },
     "sagemaker": {VERSIONS["boto3"]},
     "salesforce": {"simple_salesforce==1.11.4"},
+    "sample-data": {VERSIONS["avro"], VERSIONS["grpc-tools"]},
     "sap-hana": {"hdbcli", "sqlalchemy-hana"},
     "sas": {},
     "singlestore": {VERSIONS["pymysql"]},
@@ -267,7 +253,11 @@ plugins: Dict[str, Set[str]] = {
     "tableau": {VERSIONS["tableau"], VERSIONS["validators"], VERSIONS["packaging"]},
     "trino": {VERSIONS["trino"]},
     "vertica": {"sqlalchemy-vertica[vertica-python]>=0.0.5"},
-    "pii-processor": pii_requirements,
+    "pii-processor": {
+        VERSIONS["spacy"],
+        VERSIONS["pandas"],
+        "presidio-analyzer==2.2.32",
+    },
 }
 
 dev = {
@@ -297,7 +287,6 @@ test = {
     "dbt-artifacts-parser",
     VERSIONS["sqlalchemy-databricks"],
     VERSIONS["databricks-sdk"],
-    VERSIONS["google"],
     VERSIONS["scikit-learn"],
     VERSIONS["pyarrow"],
     VERSIONS["trino"],
@@ -312,6 +301,8 @@ test = {
     VERSIONS["snowflake"],
     VERSIONS["elasticsearch8"],
     VERSIONS["giturlparse"],
+    VERSIONS["avro"],  # Sample Data
+    VERSIONS["grpc-tools"],
     "testcontainers==3.7.1",
 }
 
diff --git a/ingestion/src/metadata/readers/dataframe/avro.py b/ingestion/src/metadata/readers/dataframe/avro.py
index 3775594bac7..bff8dfce208 100644
--- a/ingestion/src/metadata/readers/dataframe/avro.py
+++ b/ingestion/src/metadata/readers/dataframe/avro.py
@@ -14,13 +14,8 @@ Avro DataFrame reader
 """
 import io
 
-from avro.datafile import DataFileReader
-from avro.errors import InvalidAvroBinaryEncoding
-from avro.io import DatumReader
-
 from metadata.generated.schema.entity.data.table import Column
 from metadata.generated.schema.type.schema import DataTypeTopic
-from metadata.parsers.avro_parser import parse_avro_schema
 from metadata.readers.dataframe.base import DataFrameReader
 from metadata.readers.dataframe.common import dataframe_to_chunks
 from metadata.readers.dataframe.models import DatalakeColumnWrapper
@@ -51,8 +46,13 @@ class AvroDataFrameReader(DataFrameReader):
         Method to parse the avro data from storage sources
         """
         # pylint: disable=import-outside-toplevel
+        from avro.datafile import DataFileReader
+        from avro.errors import InvalidAvroBinaryEncoding
+        from avro.io import DatumReader
         from pandas import DataFrame, Series
 
+        from metadata.parsers.avro_parser import parse_avro_schema
+
         try:
             elements = DataFileReader(io.BytesIO(avro_text), DatumReader())
             if elements.meta.get(AVRO_SCHEMA):
diff --git a/ingestion/tests/integration/data_insight/producer/test_producers.py b/ingestion/tests/integration/data_insight/producer/test_producers.py
index d62948906e1..af9bffae03d 100644
--- a/ingestion/tests/integration/data_insight/producer/test_producers.py
+++ b/ingestion/tests/integration/data_insight/producer/test_producers.py
@@ -9,7 +9,7 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-"""Validate data insight producer class for entitites."""
+"""Validate data insight producer class for entities."""
 
 import random
 import uuid
@@ -62,7 +62,7 @@ data_insight_config = {
 
 
 class TestEntityProducer(TestCase):
-    """test entity producer"""
+    """test entity producer. Note that this test requires the sample data to be ingested."""
 
     @classmethod
     def setUpClass(cls):