MINOR - Clean ingestion dependencies (#15679)

* WIP - MINOR - Clean ingestion dependencies

* test

* test

* Clean imports

* add pyiceberg for test

* Revert "add pyiceberg for test"

This reverts commit ab26942736586f089a57a644ffd727aca200db62.

* add pyiceberg for test

* Remove docker dep

* clean local docker sh

* MINOR - AKS Airflow troubleshooting docs

* Fix action

* clean local docker sh
Pere Miquel Brull, 2024-04-11 14:30:40 +02:00, committed by GitHub
parent 6ee491dd00
commit a1404e6b4a
6 changed files with 28 additions and 44 deletions

View File

@@ -50,7 +50,7 @@ runs:
- name: Start Server and Ingest Sample Data
uses: nick-fields/retry@v2.8.3
env:
INGESTION_DEPENDENCY: "mysql,elasticsearch"
INGESTION_DEPENDENCY: "mysql,elasticsearch,sample-data"
with:
timeout_minutes: 60
max_attempts: 2

View File

@@ -141,14 +141,10 @@ curl --location --request PATCH 'localhost:8080/api/v1/dags/extended_sample_data
echo 'Validate sample data DAG...'
sleep 5
python -m pip install ingestion/
# This validates the sample data DAG flow
make install
python docker/validate_compose.py
until curl -s -f --header "Authorization: Bearer $authorizationToken" "http://localhost:8585/api/v1/tables/name/sample_data.ecommerce_db.shopify.fact_sale"; do
echo 'Waiting on Sample Data Ingestion to complete...\n'
curl -v --header "Authorization: Bearer $authorizationToken" "http://localhost:8585/api/v1/tables"
sleep 5
done
sleep 5
curl --location --request PATCH 'localhost:8080/api/v1/dags/sample_usage' \
--header 'Authorization: Basic YWRtaW46YWRtaW4=' \

View File

@@ -19,7 +19,6 @@ def get_last_run_info() -> Tuple[str, str]:
max_retries = 15
retries = 0
dag_runs = None
while retries < max_retries:
log_ansi_encoded_string(message="Waiting for DAG Run data...")
time.sleep(5)
@@ -34,15 +33,13 @@ def get_last_run_info() -> Tuple[str, str]:
retries += 1
return None, None
def print_last_run_logs() -> None:
"""
Show the logs
"""
logs = requests.get(
"http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data",
"http://localhost:8080/api/v1/openmetadata/last_dag_logs?dag_id=sample_data&task_id=ingest_using_recipe",
auth=BASIC_AUTH,
timeout=REQUESTS_TIMEOUT
).text
@@ -56,14 +53,14 @@ def main():
while retries < max_retries:
dag_run_id, state = get_last_run_info()
if state == "success":
log_ansi_encoded_string(message=f"DAG run: [{dag_run_id}, {state}]")
print(f"DAG run: [{dag_run_id}, {state}]")
print_last_run_logs()
break
else:
log_ansi_encoded_string(
message="Waiting for sample data ingestion to be a success. We'll show some logs along the way.",
print(
"Waiting for sample data ingestion to be a success. We'll show some logs along the way.",
)
log_ansi_encoded_string(message=f"DAG run: [{dag_run_id}, {state}]")
print(f"DAG run: [{dag_run_id}, {state}]")
print_last_run_logs()
time.sleep(10)
retries += 1
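
The diff above swaps the log_ansi_encoded_string helper for plain print, presumably so docker/validate_compose.py runs with only requests and the standard library. Below is a minimal standalone sketch of the same polling pattern against Airflow's stable REST API; the endpoint, credentials, and DAG id mirror the script, while the function name is illustrative.

# Sketch: poll the latest DAG run until it succeeds, using only requests
# and the standard library. Values mirror the script above; the helper
# name is illustrative, not part of the repository.
import time

import requests

BASIC_AUTH = ("admin", "admin")
REQUESTS_TIMEOUT = 30


def wait_for_dag_success(dag_id: str = "sample_data", max_retries: int = 15) -> bool:
    for _ in range(max_retries):
        resp = requests.get(
            f"http://localhost:8080/api/v1/dags/{dag_id}/dagRuns",
            auth=BASIC_AUTH,
            timeout=REQUESTS_TIMEOUT,
        )
        runs = resp.json().get("dag_runs", [])
        if runs and runs[-1].get("state") == "success":
            print(f"DAG run: [{runs[-1]['dag_run_id']}, success]")
            return True
        print("Waiting for sample data ingestion to be a success...")
        time.sleep(10)
    return False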

View File

@@ -42,7 +42,6 @@ VERSIONS = {
"azure-identity": "azure-identity~=1.12",
"sqlalchemy-databricks": "sqlalchemy-databricks~=0.1",
"databricks-sdk": "databricks-sdk>=0.18.0,<0.20.0",
"google": "google>=3.0.0",
"trino": "trino[sqlalchemy]",
"spacy": "spacy==3.5.0",
"looker-sdk": "looker-sdk>=22.20.0",
@@ -59,6 +58,7 @@ VERSIONS = {
COMMONS = {
"datalake": {
VERSIONS["avro"],
VERSIONS["boto3"],
VERSIONS["pandas"],
VERSIONS["pyarrow"],
@@ -83,38 +83,24 @@ COMMONS = {
},
}
# required library for pii tagging
pii_requirements = {
VERSIONS["spacy"],
VERSIONS["pandas"],
"presidio-analyzer==2.2.32",
}
base_requirements = {
"antlr4-python3-runtime==4.9.2",
VERSIONS["azure-identity"],
"azure-keyvault-secrets", # Azure Key Vault SM
VERSIONS["avro"], # Used in sample data
VERSIONS["boto3"], # Required in base for the secrets manager
"cached-property==1.5.2",
"chardet==4.0.0",
"croniter~=1.3.0",
"cached-property==1.5.2", # LineageParser
"chardet==4.0.0", # Used in the profiler
"cryptography",
"email-validator>=1.0.3",
VERSIONS["google"],
"google-auth>=1.33.0",
VERSIONS["grpc-tools"], # Used in sample data
"idna<3,>=2.5",
"email-validator>=1.0.3", # For the pydantic generated models for Email
"importlib-metadata>=4.13.0", # From airflow constraints
"Jinja2>=2.11.3",
"jsonpatch<2.0, >=1.24",
"jsonschema",
"memory-profiler",
"mypy_extensions>=0.4.3",
VERSIONS["pydantic"],
VERSIONS["pymysql"],
"python-dateutil>=2.8.1",
"python-jose~=3.3",
"PyYAML~=6.0",
"requests>=2.23",
"requests-aws4auth~=1.1", # Only depends on requests as external package. Leaving as base.
@@ -123,7 +109,6 @@ base_requirements = {
"collate-sqllineage~=1.3.0",
"tabulate==0.9.0",
"typing-inspect",
"wheel~=0.38.4",
}
@@ -202,7 +187,7 @@ plugins: Dict[str, Set[str]] = {
"impyla~=0.18.0",
},
"iceberg": {
"pyiceberg",
"pyiceberg<1",
# Forcing the version of a few packages so it plays nicely with other requirements.
VERSIONS["pydantic"],
VERSIONS["adlfs"],
@@ -258,6 +243,7 @@ plugins: Dict[str, Set[str]] = {
},
"sagemaker": {VERSIONS["boto3"]},
"salesforce": {"simple_salesforce==1.11.4"},
"sample-data": {VERSIONS["avro"], VERSIONS["grpc-tools"]},
"sap-hana": {"hdbcli", "sqlalchemy-hana"},
"sas": {},
"singlestore": {VERSIONS["pymysql"]},
@@ -267,7 +253,11 @@ plugins: Dict[str, Set[str]] = {
"tableau": {VERSIONS["tableau"], VERSIONS["validators"], VERSIONS["packaging"]},
"trino": {VERSIONS["trino"]},
"vertica": {"sqlalchemy-vertica[vertica-python]>=0.0.5"},
"pii-processor": pii_requirements,
"pii-processor": {
VERSIONS["spacy"],
VERSIONS["pandas"],
"presidio-analyzer==2.2.32",
},
}
dev = {
@@ -297,7 +287,6 @@ test = {
"dbt-artifacts-parser",
VERSIONS["sqlalchemy-databricks"],
VERSIONS["databricks-sdk"],
VERSIONS["google"],
VERSIONS["scikit-learn"],
VERSIONS["pyarrow"],
VERSIONS["trino"],
@@ -312,6 +301,8 @@ test = {
VERSIONS["snowflake"],
VERSIONS["elasticsearch8"],
VERSIONS["giturlparse"],
VERSIONS["avro"], # Sample Data
VERSIONS["grpc-tools"],
"testcontainers==3.7.1",
}
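
With sample-data split into its own extra (avro plus grpc-tools), pii-processor inlined, and the google pin dropped from base and test, these dependency sets presumably reach users as pip extras. Here is a hedged sketch of that wiring, assuming the conventional setuptools pattern; the actual setup() call sits outside this hunk, and the package name is the one published on PyPI.

# Hypothetical wiring: how the sets defined above (base_requirements, plugins,
# dev, test) typically feed setuptools extras. Not part of this diff.
from setuptools import setup

setup(
    name="openmetadata-ingestion",
    install_requires=list(base_requirements),
    extras_require={
        **{plugin: list(deps) for plugin, deps in plugins.items()},
        "dev": list(dev),
        "test": list(test),
    },
)

A consumer that needs the sample data tooling would then request it explicitly, for example pip install "openmetadata-ingestion[mysql,elasticsearch,sample-data]", which lines up with the INGESTION_DEPENDENCY change in the first hunk.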

View File

@@ -14,13 +14,8 @@ Avro DataFrame reader
"""
import io
from avro.datafile import DataFileReader
from avro.errors import InvalidAvroBinaryEncoding
from avro.io import DatumReader
from metadata.generated.schema.entity.data.table import Column
from metadata.generated.schema.type.schema import DataTypeTopic
from metadata.parsers.avro_parser import parse_avro_schema
from metadata.readers.dataframe.base import DataFrameReader
from metadata.readers.dataframe.common import dataframe_to_chunks
from metadata.readers.dataframe.models import DatalakeColumnWrapper
@@ -51,8 +46,13 @@ class AvroDataFrameReader(DataFrameReader):
Method to parse the avro data from storage sources
"""
# pylint: disable=import-outside-toplevel
from avro.datafile import DataFileReader
from avro.errors import InvalidAvroBinaryEncoding
from avro.io import DatumReader
from pandas import DataFrame, Series
from metadata.parsers.avro_parser import parse_avro_schema
try:
elements = DataFileReader(io.BytesIO(avro_text), DatumReader())
if elements.meta.get(AVRO_SCHEMA):
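
Pulling the avro, pandas, and avro-parser imports into the method body defers them until an Avro file is actually parsed, so a minimal install can import the readers package without those optional packages present. A generic sketch of this deferred-import pattern follows; the helper name and error message are illustrative, not part of the OpenMetadata API.

# Generic deferred-import sketch: the optional dependency is only imported
# when the function runs, and a missing package produces a clear hint.
import io
from typing import List


def read_avro_records(avro_bytes: bytes) -> List[dict]:
    """Decode an Avro container without importing avro at module load time."""
    # pylint: disable=import-outside-toplevel
    try:
        from avro.datafile import DataFileReader
        from avro.io import DatumReader
    except ImportError as exc:
        raise ImportError(
            "Avro support is optional: install an extra that bundles the "
            "avro package (for example datalake or sample-data)."
        ) from exc

    with DataFileReader(io.BytesIO(avro_bytes), DatumReader()) as reader:
        return list(reader)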

View File

@@ -9,7 +9,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""Validate data insight producer class for entitites."""
"""Validate data insight producer class for entities."""
import random
import uuid
@@ -62,7 +62,7 @@ data_insight_config = {
class TestEntityProducer(TestCase):
"""test entity producer"""
"""test entity producer. Note that this test requires the sample data to be ingested."""
@classmethod
def setUpClass(cls):