diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py index 9ab9c76c30..7a2dbda8b4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py @@ -50,6 +50,7 @@ from datahub.ingestion.source.sql.sql_config import ( BasicSQLAlchemyConfig, make_sqlalchemy_uri, ) +from datahub.ingestion.source.sql.sql_report import SQLSourceReport from datahub.metadata.schema_classes import ( BooleanTypeClass, NumberTypeClass, @@ -78,6 +79,11 @@ class SQLServerConfig(BasicSQLAlchemyConfig): include_stored_procedures_code: bool = Field( default=True, description="Include information about object code." ) + procedure_pattern: AllowDenyPattern = Field( + default=AllowDenyPattern.allow_all(), + description="Regex patterns for stored procedures to filter in ingestion." + "Specify regex to match the entire procedure name in database.schema.procedure_name format. e.g. to match all procedures starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'", + ) include_jobs: bool = Field( default=True, description="Include ingest of MSSQL Jobs. Requires access to the 'msdb' and 'sys' schema.", @@ -164,6 +170,8 @@ class SQLServerSource(SQLAlchemySource): If you do use pyodbc, make sure to change the source type from `mssql` to `mssql-odbc` so that we pull in the right set of dependencies. This will be needed in most cases where encryption is required, such as managed SQL Server services in Azure. """ + report: SQLSourceReport + def __init__(self, config: SQLServerConfig, ctx: PipelineContext): super().__init__(config, ctx, "mssql") # Cache the table and column descriptions @@ -416,10 +424,16 @@ class SQLServerSource(SQLAlchemySource): data_flow = MSSQLDataFlow(entity=mssql_default_job) with inspector.engine.connect() as conn: procedures_data_list = self._get_stored_procedures(conn, db_name, schema) - procedures = [ - StoredProcedure(flow=mssql_default_job, **procedure_data) - for procedure_data in procedures_data_list - ] + procedures: List[StoredProcedure] = [] + for procedure_data in procedures_data_list: + procedure_full_name = f"{db_name}.{schema}.{procedure_data['name']}" + if not self.config.procedure_pattern.allowed(procedure_full_name): + self.report.report_dropped(procedure_full_name) + continue + procedures.append( + StoredProcedure(flow=mssql_default_job, **procedure_data) + ) + if procedures: yield from self.construct_flow_workunits(data_flow=data_flow) for procedure in procedures: diff --git a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json index 3836e587ef..1d702214fe 100644 --- a/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json +++ b/metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json @@ -113,11 +113,11 @@ "aspect": { "json": { "customProperties": { - "job_id": "4130c37d-146c-43da-a671-dd9a413a44b3", + "job_id": "2a055367-5e6a-4162-b3a9-dd60f52c79a8", "job_name": "Weekly Demo Data Backup", "description": "No description available.", - "date_created": "2024-11-22 12:58:03.260000", - "date_modified": "2024-11-22 12:58:03.440000", + "date_created": "2024-11-26 07:22:19.640000", + "date_modified": "2024-11-26 07:22:19.773000", "step_id": "1", "step_name": "Set database to read only", "subsystem": "TSQL", @@ -2282,8 +2282,8 @@ "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "input parameters": "['@ID']", "parameter @ID": "{'type': 'int'}", - "date_created": "2024-11-22 12:58:03.137000", - "date_modified": "2024-11-22 12:58:03.137000" + "date_created": "2024-11-26 07:22:19.510000", + "date_modified": "2024-11-26 07:22:19.510000" }, "externalUrl": "", "name": "DemoData.Foo.Proc.With.SpecialChar", @@ -2298,34 +2298,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "procedure_depends_on": "{'DemoData.Foo.age_dist': 'USER_TABLE', 'DemoData.Foo.Items': 'USER_TABLE', 'DemoData.Foo.Persons': 'USER_TABLE', 'DemoData.Foo.SalesReason': 'USER_TABLE'}", - "depending_on_procedure": "{}", - "code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n", - "input parameters": "[]", - "date_created": "2024-11-22 12:58:03.140000", - "date_modified": "2024-11-22 12:58:03.140000" - }, - "externalUrl": "", - "name": "DemoData.Foo.NewProc", - "type": { - "string": "MSSQL_STORED_PROCEDURE" - } - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "mssql-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "container", "entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975", @@ -2713,22 +2685,6 @@ "lastRunId": "no-run-id-provided" } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)", - "changeType": "UPSERT", - "aspectName": "status", - "aspect": { - "json": { - "removed": false - } - }, - "systemMetadata": { - "lastObserved": 1615443388097, - "runId": "mssql-test", - "lastRunId": "no-run-id-provided" - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)", diff --git a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml index 3749499074..703f60b277 100644 --- a/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml +++ b/metadata-ingestion/tests/integration/sql_server/source_files/mssql_no_db_with_filter.yml @@ -9,6 +9,9 @@ source: database_pattern: deny: - NewData + procedure_pattern: + deny: + - DemoData.Foo.NewProc sink: type: file