Fix query log parser (#8920)

Pere Miquel Brull 2022-11-21 13:55:55 +01:00 committed by GitHub
parent edadfba9b3
commit 83ef3315c3
7 changed files with 77 additions and 58 deletions

View File

@ -1,4 +1,4 @@
query,database_name,schema_name,user_name,start_time,end_time,aborted
select * from sales;,default,information_schema,root,2011-10-05 14:48:00,2011-10-05 14:48:01,TRUE
select * from marketing;,default,information_schema,root,2011-10-05 14:48:00,2011-10-05 14:48:01,FALSE
insert into marketing select * from sales;,default,information_schema,root,2011-10-05 14:48:00,2011-10-05 14:48:01,TRUE
query,database_name,schema_name
"select * from sales",default,information_schema
"select * from marketing",default,information_schema
"insert into marketing select * from sales",default,information_schema

View File

@ -5,7 +5,9 @@ slug: /connectors/ingestion/workflows/lineage/lineage-workflow-query-logs
# Lineage Workflow Through Query Logs
The following database connectors support the lineage workflow in OpenMetadata:
In order to extract lineage information, OpenMetadata parses the queries that have run against the database. This query
log information is available from WITHIN the database in the following services:
- [BigQuery](/connectors/database/bigquery)
- [Snowflake](/connectors/database/snowflake)
- [MSSQL](/connectors/database/mssql)
@ -14,16 +16,25 @@ The following database connectors supports lineage workflow in OpenMetadata:
- [Databricks](/connectors/database/databricks)
- [Postgres](/connectors/database/postgres)
If you are using any other database connector, direct execution of the lineage workflow is not possible. This is mainly because these database connectors do not maintain the query execution logs required for the lineage workflow. This documentation will help you learn how to execute the lineage workflow using a query log file for all database connectors.
If you are using any other database connector, direct execution of the lineage workflow is not possible.
This is mainly because these database connectors do not maintain the query execution logs required for the lineage workflow.
If you are interested in running the lineage workflow for a connector not listed above, this documentation will help
you execute the lineage workflow using a query log file. This approach works for **any** database connector.
## Query Log File
A query log file is a CSV file which contains the following information.
- **query:** This field contains the literal query that has been executed in the database.
- **user_name (optional):** Enter the database user name which has executed this query.
- **start_time (optional):** Enter the query execution start time in YYYY-MM-DD HH:MM:SS format.
- **end_time (optional):** Enter the query execution end time in YYYY-MM-DD HH:MM:SS format.
- **aborted (optional):** This field accepts `true` or `false` and indicates whether the query was aborted during execution.
A query log file is a standard CSV file that contains the following information.
<Note>
A standard CSV is comma separated, with each row on a single line of the file.
</Note>
- **query_text:** This field contains the literal query that has been executed in the database. If your query contains commas `,`, wrap the query in quotes `"<query>"` so that they do not clash with the comma used as the separator (see the sample rows after this list).
- **database_name (optional):** Enter the database name on which the query was executed.
- **schema_name (optional):** Enter the schema name to which the query is associated.
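For illustration, the sample query log shipped with this change contains rows in exactly this shape, with each query wrapped in quotes:

```csv
query,database_name,schema_name
"select * from sales",default,information_schema
"insert into marketing select * from sales",default,information_schema
```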
@ -33,46 +44,36 @@ Checkout a sample query log file [here](https://github.com/open-metadata/OpenMet
In order to run a Lineage Workflow, we need to make sure that the Metadata Ingestion Workflow for the corresponding service has already been executed. We will follow the steps to create a YAML configuration able to collect the query log file and execute the lineage workflow.
### 1. Create a configuration file using template YAML
Create a new file called `query_log_lineage.yaml` in the current directory. Note that the current directory should be the openmetadata directory.
Copy and paste the configuration template below into the `query_log_lineage.yaml` file you created.
```yaml
source:
  type: query-log-lineage
  serviceName: local_mysql
  serviceConnection:
    config:
      type: Mysql
      username: openmetadata_user
      password: openmetadata_password
      hostPort: localhost:3306
      connectionOptions: {}
      connectionArguments: {}
  serviceName: <name>
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogFilePath: <path to query log file>
processor:
  type: query-parser
sink:
  type: metadata-rest
  config: {}
stage:
  type: table-lineage
  config:
    filename: /tmp/query_log_lineage
bulkSink:
  type: metadata-lineage
  config:
    filename: /tmp/query_log_lineage
workflowConfig:
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
The `serviceName` and `serviceConnection` used in the above config have to be the same as those used during Metadata Ingestion.
The sourceConfig is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryLineagePipeline.json).
- queryLogFilePath: Enter the file path of the query log CSV file.
The `serviceName` should be a service already ingested in OpenMetadata.
- **queryLogFilePath**: Enter the file path of the query log CSV file.
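For example, a filled-in `source` block, assuming the service was previously ingested under the name `local_mysql` and the log file sits at an illustrative path, could look like:

```yaml
source:
  type: query-log-lineage
  serviceName: local_mysql          # must match the service name used during metadata ingestion
  sourceConfig:
    config:
      type: DatabaseLineage
      queryLogFilePath: /tmp/query_log.csv   # illustrative path to the CSV file described above
```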
### 2. Run with the CLI
First, we will need to save the YAML file. Afterward, and with all requirements installed, we can run:
```bash
metadata ingest -c <path-to-yaml>
```
Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration, you will be able to extract metadata from different sources.

View File

@ -5,7 +5,9 @@ slug: /connectors/ingestion/workflows/usage/usage-workflow-query-logs
# Usage Workflow Through Query Logs
The following database connectors support the usage workflow in OpenMetadata:
In order to extract usage information, OpenMetadata parses the queries that have run against the database. This query
log information is available from WITHIN the database in the following services:
- [BigQuery](/connectors/database/bigquery)
- [Snowflake](/connectors/database/snowflake)
- [MSSQL](/connectors/database/mssql)
@ -14,13 +16,25 @@ The following database connectors supports usage workflow in OpenMetadata:
- [Databricks](/connectors/database/databricks)
- [Postgres](/connectors/database/postgres)
If you are using any other database connector, direct execution of the usage workflow is not possible. This is mainly because these database connectors do not maintain the query execution logs required for the usage workflow. This documentation will help you learn how to execute the usage workflow using a query log file for all database connectors.
If you are using any other database connector, direct execution of the usage workflow is not possible.
This is mainly because these database connectors do not maintain the query execution logs required for the usage workflow.
If you are interested in running the usage workflow for a connector not listed above, this documentation will help
you execute the usage workflow using a query log file. This approach works for **any** database connector.
## Query Log File
A query log file is a CSV file which contains the following information.
- **query:** This field contains the literal query that has been executed in the database.
- **user_name (optional):** Enter the database user name which has executed this query.
A query log file is a standard CSV file that contains the following information.
<Note>
A standard CSV is comma separated, with each row on a single line of the file.
</Note>
- **query_text:** This field contains the literal query that has been executed in the database. If your query contains commas `,`, wrap the query in quotes `"<query>"` so that they do not clash with the comma used as the separator (see the sample rows after this list).
- **user_name (optional):** Enter the database user name that executed this query.
- **start_time (optional):** Enter the query execution start time in YYYY-MM-DD HH:MM:SS format.
- **end_time (optional):** Enter the query execution end time in YYYY-MM-DD HH:MM:SS format.
- **aborted (optional):** This field accepts `true` or `false` and indicates whether the query was aborted during execution.
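As a sketch, a query log that also fills in the optional columns could look like the following; the values are illustrative and follow the same quoting rule for queries:

```csv
query,database_name,schema_name,user_name,start_time,end_time,aborted
"select * from sales",default,information_schema,root,2011-10-05 14:48:00,2011-10-05 14:48:01,FALSE
"insert into marketing select * from sales",default,information_schema,root,2011-10-05 14:48:00,2011-10-05 14:48:01,TRUE
```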
@ -33,22 +47,17 @@ Checkout a sample query log file [here](https://github.com/open-metadata/OpenMet
In order to run a Usage Workflow, we need to make sure that the Metadata Ingestion Workflow for the corresponding service has already been executed. We will follow the steps to create a YAML configuration able to collect the query log file and execute the usage workflow.
### 1. Create a configuration file using template YAML
Create a new file called `query_log_usage.yaml` in the current directory. Note that the current directory should be the openmetadata directory.
Copy and paste the configuration template below into the `query_log_usage.yaml` file you created.
```yaml
source:
  type: query-log-usage
  serviceName: local_mysql
  serviceConnection:
    config:
      type: Mysql
      username: openmetadata_user
      password: openmetadata_password
      hostPort: localhost:3306
      connectionOptions: {}
      connectionArguments: {}
  serviceName: <name>
  sourceConfig:
    config:
      type: DatabaseUsage
      queryLogFilePath: <path to query log file>
processor:
  type: query-parser
@ -56,23 +65,26 @@ processor:
stage:
  type: table-usage
  config:
    filename: /tmp/query_log_usage
    filename: "/tmp/query-usage"
bulkSink:
  type: metadata-usage
  config:
    filename: /tmp/query_log_usage
    filename: "/tmp/query-usage"
workflowConfig:
  openMetadataServerConfig:
    hostPort: <OpenMetadata host and port>
    authProvider: <OpenMetadata auth provider>
```
The `serviceName` and `serviceConnection` used in the above config have to be the same as those used during Metadata Ingestion.
The sourceConfig is defined [here](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/databaseServiceQueryUsagePipeline.json).
- queryLogFilePath: Enter the file path of the query log CSV file.
The `serviceName` should be a service already ingested in OpenMetadata.
- **queryLogFilePath**: Enter the file path of the query log CSV file.
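For example, a filled-in `source` block, assuming the service was previously ingested under the name `local_mysql` and an illustrative file path, could look like:

```yaml
source:
  type: query-log-usage
  serviceName: local_mysql          # must match the service name used during metadata ingestion
  sourceConfig:
    config:
      type: DatabaseUsage
      queryLogFilePath: /tmp/query_log.csv   # illustrative path to the CSV file described above
```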
### 2. Run with the CLI
First, we will need to save the YAML file. Afterward, and with all requirements installed, we can run:
```bash
metadata ingest -c <path-to-yaml>
```
Note that from connector to connector, this recipe will always be the same. By updating the YAML configuration, you will be able to extract metadata from different sources.

View File

@ -43,6 +43,7 @@
"PinotDB",
"Datalake",
"DomoDatabase",
"QueryLog",
"CustomDatabase"
],
"javaEnums": [
@ -130,6 +131,9 @@
{
"name": "DomoDatabase"
},
{
"name": "QueryLog"
},
{
"name": "CustomDatabase"
}

View File

@ -51,7 +51,7 @@
"type": "string"
}
},
"required": ["sql", "serviceName", "tables", "databaseName"]
"required": ["sql", "serviceName", "tables"]
}
},
"properties": {

View File

@ -78,6 +78,6 @@
"type": "string"
}
},
"required": ["table", "date", "databaseName", "serviceName"],
"required": ["table", "date", "serviceName"],
"additionalProperties": false
}
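Similarly, with `databaseName` dropped from this schema's required list, a minimal valid object now only needs the remaining three fields (illustrative values):

```json
{
  "table": "sales",
  "date": "2011-10-05",
  "serviceName": "local_mysql"
}
```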

View File

@ -149,10 +149,12 @@ export const excludedService = [
  MetadataServiceType.OpenMetadata,
];
export const IGNORED_DB_SERVICES: Array<string> = ['QueryLog'];
export const serviceTypes: Record<ServiceTypes, Array<string>> = {
  databaseServices: (Object.values(DatabaseServiceType) as string[]).sort(
    customServiceComparator
  ),
  databaseServices: (Object.values(DatabaseServiceType) as string[])
    .filter((key: string) => !IGNORED_DB_SERVICES.includes(key))
    .sort(customServiceComparator),
  messagingServices: (Object.values(MessagingServiceType) as string[]).sort(
    customServiceComparator
  ),