feat(ingest/mssql): allow filtering by procedure_pattern (#11953)

This commit is contained in:
Mayuri Nehate 2024-11-26 21:46:42 +05:30 committed by GitHub
parent 07033a7015
commit 2e3b4294bb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 26 additions and 53 deletions

View File

@ -50,6 +50,7 @@ from datahub.ingestion.source.sql.sql_config import (
BasicSQLAlchemyConfig, BasicSQLAlchemyConfig,
make_sqlalchemy_uri, make_sqlalchemy_uri,
) )
from datahub.ingestion.source.sql.sql_report import SQLSourceReport
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
BooleanTypeClass, BooleanTypeClass,
NumberTypeClass, NumberTypeClass,
@ -78,6 +79,11 @@ class SQLServerConfig(BasicSQLAlchemyConfig):
include_stored_procedures_code: bool = Field( include_stored_procedures_code: bool = Field(
default=True, description="Include information about object code." default=True, description="Include information about object code."
) )
procedure_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description="Regex patterns for stored procedures to filter in ingestion."
"Specify regex to match the entire procedure name in database.schema.procedure_name format. e.g. to match all procedures starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
)
include_jobs: bool = Field( include_jobs: bool = Field(
default=True, default=True,
description="Include ingest of MSSQL Jobs. Requires access to the 'msdb' and 'sys' schema.", description="Include ingest of MSSQL Jobs. Requires access to the 'msdb' and 'sys' schema.",
@ -164,6 +170,8 @@ class SQLServerSource(SQLAlchemySource):
If you do use pyodbc, make sure to change the source type from `mssql` to `mssql-odbc` so that we pull in the right set of dependencies. This will be needed in most cases where encryption is required, such as managed SQL Server services in Azure. If you do use pyodbc, make sure to change the source type from `mssql` to `mssql-odbc` so that we pull in the right set of dependencies. This will be needed in most cases where encryption is required, such as managed SQL Server services in Azure.
""" """
report: SQLSourceReport
def __init__(self, config: SQLServerConfig, ctx: PipelineContext): def __init__(self, config: SQLServerConfig, ctx: PipelineContext):
super().__init__(config, ctx, "mssql") super().__init__(config, ctx, "mssql")
# Cache the table and column descriptions # Cache the table and column descriptions
@ -416,10 +424,16 @@ class SQLServerSource(SQLAlchemySource):
data_flow = MSSQLDataFlow(entity=mssql_default_job) data_flow = MSSQLDataFlow(entity=mssql_default_job)
with inspector.engine.connect() as conn: with inspector.engine.connect() as conn:
procedures_data_list = self._get_stored_procedures(conn, db_name, schema) procedures_data_list = self._get_stored_procedures(conn, db_name, schema)
procedures = [ procedures: List[StoredProcedure] = []
StoredProcedure(flow=mssql_default_job, **procedure_data) for procedure_data in procedures_data_list:
for procedure_data in procedures_data_list procedure_full_name = f"{db_name}.{schema}.{procedure_data['name']}"
] if not self.config.procedure_pattern.allowed(procedure_full_name):
self.report.report_dropped(procedure_full_name)
continue
procedures.append(
StoredProcedure(flow=mssql_default_job, **procedure_data)
)
if procedures: if procedures:
yield from self.construct_flow_workunits(data_flow=data_flow) yield from self.construct_flow_workunits(data_flow=data_flow)
for procedure in procedures: for procedure in procedures:

View File

@ -113,11 +113,11 @@
"aspect": { "aspect": {
"json": { "json": {
"customProperties": { "customProperties": {
"job_id": "4130c37d-146c-43da-a671-dd9a413a44b3", "job_id": "2a055367-5e6a-4162-b3a9-dd60f52c79a8",
"job_name": "Weekly Demo Data Backup", "job_name": "Weekly Demo Data Backup",
"description": "No description available.", "description": "No description available.",
"date_created": "2024-11-22 12:58:03.260000", "date_created": "2024-11-26 07:22:19.640000",
"date_modified": "2024-11-22 12:58:03.440000", "date_modified": "2024-11-26 07:22:19.773000",
"step_id": "1", "step_id": "1",
"step_name": "Set database to read only", "step_name": "Set database to read only",
"subsystem": "TSQL", "subsystem": "TSQL",
@ -2282,8 +2282,8 @@
"code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n", "code": "CREATE PROCEDURE [Foo].[Proc.With.SpecialChar] @ID INT\nAS\n SELECT @ID AS ThatDB;\n",
"input parameters": "['@ID']", "input parameters": "['@ID']",
"parameter @ID": "{'type': 'int'}", "parameter @ID": "{'type': 'int'}",
"date_created": "2024-11-22 12:58:03.137000", "date_created": "2024-11-26 07:22:19.510000",
"date_modified": "2024-11-22 12:58:03.137000" "date_modified": "2024-11-26 07:22:19.510000"
}, },
"externalUrl": "", "externalUrl": "",
"name": "DemoData.Foo.Proc.With.SpecialChar", "name": "DemoData.Foo.Proc.With.SpecialChar",
@ -2298,34 +2298,6 @@
"lastRunId": "no-run-id-provided" "lastRunId": "no-run-id-provided"
} }
}, },
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "dataJobInfo",
"aspect": {
"json": {
"customProperties": {
"procedure_depends_on": "{'DemoData.Foo.age_dist': 'USER_TABLE', 'DemoData.Foo.Items': 'USER_TABLE', 'DemoData.Foo.Persons': 'USER_TABLE', 'DemoData.Foo.SalesReason': 'USER_TABLE'}",
"depending_on_procedure": "{}",
"code": "CREATE PROCEDURE [Foo].[NewProc]\n AS\n BEGIN\n --insert into items table from salesreason table\n insert into Foo.Items (ID, ItemName)\n SELECT TempID, Name\n FROM Foo.SalesReason;\n\n\n IF OBJECT_ID('Foo.age_dist', 'U') IS NULL\n BEGIN\n -- Create and populate if table doesn't exist\n SELECT Age, COUNT(*) as Count\n INTO Foo.age_dist\n FROM Foo.Persons\n GROUP BY Age\n END\n ELSE\n BEGIN\n -- Update existing table\n TRUNCATE TABLE Foo.age_dist;\n\n INSERT INTO Foo.age_dist (Age, Count)\n SELECT Age, COUNT(*) as Count\n FROM Foo.Persons\n GROUP BY Age\n END\n\n SELECT ID, Age INTO #TEMPTABLE FROM NewData.FooNew.PersonsNew\n \n UPDATE DemoData.Foo.Persons\n SET Age = t.Age\n FROM DemoData.Foo.Persons p\n JOIN #TEMPTABLE t ON p.ID = t.ID\n\n END\n",
"input parameters": "[]",
"date_created": "2024-11-22 12:58:03.140000",
"date_modified": "2024-11-22 12:58:03.140000"
},
"externalUrl": "",
"name": "DemoData.Foo.NewProc",
"type": {
"string": "MSSQL_STORED_PROCEDURE"
}
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{ {
"entityType": "container", "entityType": "container",
"entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975", "entityUrn": "urn:li:container:250ce23f940485303fa5e5d4f5194975",
@ -2713,22 +2685,6 @@
"lastRunId": "no-run-id-provided" "lastRunId": "no-run-id-provided"
} }
}, },
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),NewProc)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1615443388097,
"runId": "mssql-test",
"lastRunId": "no-run-id-provided"
}
},
{ {
"entityType": "dataJob", "entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(mssql,DemoData.Foo.stored_procedures,PROD),Proc.With.SpecialChar)",

View File

@ -9,6 +9,9 @@ source:
database_pattern: database_pattern:
deny: deny:
- NewData - NewData
procedure_pattern:
deny:
- DemoData.Foo.NewProc
sink: sink:
type: file type: file