MINOR: Initial proposal for the 'developing a connector' user guide (#15335)
* Initial proposal for the 'developing a connector' user guide
* Add small introduction to the next steps before the link at the end
@ -0,0 +1,185 @@
---
title: Apply UI Changes
slug: /developers/contribute/developing-a-new-connector/apply-ui-changes
---

# Apply UI Changes

To configure your connector from the UI, and to test it from there as well, you will need to modify a file located within [`openmetadata-ui/src/main/resources/ui/src/utils/`](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-ui/src/main/resources/ui/src/utils).

Which file you need to modify depends on the Source Type you are developing a connector for: `{source_type}ServiceUtils.ts`.

The change itself is straightforward, since you only need to add the JSON Schema connection you created.

### Example - MySQL

{% note %}
The file will be shortened and parts of it will be replaced with `...` for readability.
{% /note %}

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}

* **import mysqlConnection from ...**: Import your connection from the JSON Schema file.

{% /codeInfo %}

{% codeInfo srNumber=2 %}

* **getDatabaseConfig**: In the `switch` statement within `getDatabaseConfig`, add a new case for your new Connector.

For example, if you were developing the `myOwnConnection.json` connector, you could add the following case:

```js
case DatabaseServiceType.MyOwn: {
  schema = myOwnConnection;
  break;
}
```

where
- **MyOwn**: Would be the Service Type defined in `myOwnConnection.json`
- **myOwnConnection**: Would be the import statement

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="DatabaseServiceUtils.ts" %}

```js {% srNumber=1 %}
...
import mysqlConnection from '../jsons/connectionSchemas/connections/database/mysqlConnection.json';
```
```js {% srNumber=2 %}

export const getDatabaseConfig = (type: DatabaseServiceType) => {
  let schema = {};
  const uiSchema = { ...COMMON_UI_SCHEMA };
  switch (type as unknown as DatabaseServiceType) {
    ...
    case DatabaseServiceType.Mysql: {
      schema = mysqlConnection;

      break;
    }
    ...
    default: {
      schema = {};

      break;
    }
  }

  return cloneDeep({ schema, uiSchema });
};

```

{% /codeBlock %}
{% /codePreview %}

## UI Documentation to follow along

If you pay attention when configuring a connector through the UI, you will see that there is follow-along documentation to assist you.

In order to add this feature, you need to create a new file, `YourConnector.md`, within

[`openmetadata-ui/src/main/resources/ui/public/locales/en-US/{source_type}`](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-ui/src/main/resources/ui/public/locales/en-US), in the proper Source Type.

### Example - MySQL

{% note %}
The file will be shortened and parts of it will be replaced with `...` for readability.
{% /note %}

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=3 %}

First we give an overview of the Connector and any requirements or important information the user should know.

{% /codeInfo %}

{% codeInfo srNumber=4 %}

Within the `## Connection Details` section you can see that we use some special notation: `$(id="{something}")`.

This is used to sync the documentation here with the property that the user is configuring at a given time (the "follow along" feature, if you will).

In order to make it work properly, you need to set the ID of each section to the corresponding property of the JSON Schema.

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="MySQL.md" %}

```md {% srNumber=3 %}
# MySQL

In this section, we provide guides and references to use the MySQL connector.

## Requirements
To extract metadata, the user specified in the connection needs to have access to the `INFORMATION_SCHEMA`. By default, a user can see only the rows in the `INFORMATION_SCHEMA` that correspond to objects for which the user has the proper access privileges.

~~~SQL
-- Create user. If <hostName> is omitted, defaults to '%'
-- More details https://dev.mysql.com/doc/refman/8.0/en/create-user.html
CREATE USER '<username>'[@'<hostName>'] IDENTIFIED BY '<password>';

-- Grant select on a database
GRANT SELECT ON world.* TO '<username>';

-- Grant select on a specific object
GRANT SELECT ON world.hello TO '<username>';
~~~

$$note
OpenMetadata supports MySQL version `8.0.0` and up.
$$

### Profiler & Data Quality
Executing the profiler Workflow or data quality tests will require the user to have `SELECT` permission on the tables/schemas where the profiler/tests will be executed. The user should also be allowed to view information in `tables` for all objects in the database. More information on the profiler workflow setup can be found [here](https://docs.open-metadata.org/connectors/ingestion/workflows/profiler) and data quality tests [here](https://docs.open-metadata.org/connectors/ingestion/workflows/data-quality).

You can find further information on the MySQL connector in the [docs](https://docs.open-metadata.org/connectors/database/mysql).

```
```md {% srNumber=4 %}
## Connection Details

$$section
### Scheme $(id="scheme")
SQLAlchemy driver scheme options. If you are unsure about this setting, you can use the default value.
$$

$$section
### Username $(id="username")
Username to connect to MySQL. This user should have access to the `INFORMATION_SCHEMA` to extract metadata. Other workflows may require different permissions -- refer to the section above for more information.
$$

...
```

{% /codeBlock %}
{% /codePreview %}

## Next Step

Depending on the Connection Schema, it is possible that you will need to implement a small piece of Java code to make everything work perfectly.
You can learn more about it in the next step.

{%inlineCallout
color="violet-70"
bold="Create the Java ClassConverter"
icon="MdArrowForward"
href="/developers/contribute/developing-a-new-connector/create-java-class-converter"%}
Learn what the Java ClassConverter is and how to create it
{%/inlineCallout%}
@ -0,0 +1,178 @@
---
title: Create the Java ClassConverter
slug: /developers/contribute/developing-a-new-connector/create-java-class-converter
---

# Create the Java ClassConverter

**If and only if you had to use the `oneOf` property type on your connector's JSON Schema, you also need to implement a Java ClassConverter to be able to instantiate the correct class from the configuration.**

Without this, Java doesn't know the proper Class to instantiate, and the connector wouldn't work as expected.

{% note %}
This is necessary even if you are indirectly using a `oneOf` property by referencing another JSON Schema that uses it.
{% /note %}

## Implementing your ClassConverter

In order to implement the `ClassConverter` you need to create a new file within

[`openmetadata-service/src/main/java/org/openmetadata/service/secrets/converter`](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-service/src/main/java/org/openmetadata/service/secrets/converter)

There you should create a new public class that extends `ClassConverter`. The easiest way to achieve this is to use another `ClassConverter` as a reference.

### Example - MysqlConnectionClassConverter.java

Here we will see how to create a ClassConverter for the MysqlConnection, where we define the `authType` using the `oneOf` attribute.

{% note %}
The file will be shortened and parts of it will be replaced with `...` for readability.
{% /note %}

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}

Before anything else you need to remember to import the needed classes.

In this example we need to import the `MysqlConnection` itself and both the `IamAuthConfig` and `basicAuth`. It is important to remember that these classes are generated from the JSON Schema and can be found within `openmetadata-spec/target/classes/org/openmetadata/schema/services/connections`.

If you remember from [Define the JSON Schema](/developers/contribute/developing-a-new-connector/define-json-schema), the MysqlConnection uses `oneOf` to define the `authType` property:

```json
...
"authType": {
  "title": "Auth Configuration Type",
  "description": "Choose Auth Config Type.",
  "oneOf": [
    {
      "$ref": "./common/basicAuth.json"
    },
    {
      "$ref": "./common/iamAuthConfig.json"
    }
  ]
},
...
```

{% /codeInfo %}

{% codeInfo srNumber=2 %}

With the needed imports in place, it is now time to extend the `ClassConverter` class to create the `MysqlConnectionClassConverter`.

We are overriding the `convert` method and doing the following:
1. Creating a `MysqlConnection` instance from the JSON object received.
2. Getting the `AuthType` configuration and trying to use it to instantiate either a `basicAuth` or an `IamAuthConfig`. The first success will be returned.
3. Setting the `AuthType` to this newly instantiated class.
4. Returning the `MysqlConnection` instance.

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="MysqlConnectionClassConverter.java" %}

```java {% srNumber=1 %}
...
package org.openmetadata.service.secrets.converter;

import java.util.List;
import org.openmetadata.schema.services.connections.database.MysqlConnection;
import org.openmetadata.schema.services.connections.database.common.IamAuthConfig;
import org.openmetadata.schema.services.connections.database.common.basicAuth;
import org.openmetadata.service.util.JsonUtils;
```
```java {% srNumber=2 %}

/** Converter class to get a `MysqlConnection` object. */
public class MysqlConnectionClassConverter extends ClassConverter {

  private static final List<Class<?>> CONFIG_SOURCE_CLASSES =
      List.of(basicAuth.class, IamAuthConfig.class);

  public MysqlConnectionClassConverter() {
    super(MysqlConnection.class);
  }

  @Override
  public Object convert(Object object) {
    MysqlConnection mysqlConnection = (MysqlConnection) JsonUtils.convertValue(object, this.clazz);

    tryToConvert(mysqlConnection.getAuthType(), CONFIG_SOURCE_CLASSES)
        .ifPresent(mysqlConnection::setAuthType);

    return mysqlConnection;
  }
}
```

{% /codeBlock %}
{% /codePreview %}

## Making your ClassConverter visible

Now that your ClassConverter is implemented, you need to add it to the [`ClassConverterFactory.java`](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-service/src/main/java/org/openmetadata/service/secrets/converter/ClassConverterFactory.java) file, located in the same path.

### Example - MysqlConnectionClassConverter

{% note %}
The file will be shortened and parts of it will be replaced with `...` for readability.
{% /note %}

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=3 %}

Before anything else you need to remember to import your `ClassConverter`.

{% /codeInfo %}

{% codeInfo srNumber=4 %}

Now you just need to add a new `Map.entry` to the `converterMap`.

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="ClassConverterFactory.java" %}

```java {% srNumber=3 %}
...
import org.openmetadata.schema.services.connections.database.MysqlConnection;
...
```
```java {% srNumber=4 %}
public final class ClassConverterFactory {
  ...
  static {
    converterMap =
        Map.ofEntries(
            ...
            Map.entry(MysqlConnection.class, new MysqlConnectionClassConverter()));
  }
  ...
}
```

{% /codeBlock %}
{% /codePreview %}

## Next Step

Now that the code is ready, let's learn how to test it!

{%inlineCallout
color="violet-70"
bold="Test It"
icon="MdArrowForward"
href="/developers/contribute/developing-a-new-connector/test-it"%}
Learn how to test your new connector!
{%/inlineCallout%}
@ -0,0 +1,380 @@
---
title: Define the JSON Schema
slug: /developers/contribute/developing-a-new-connector/define-json-schema
---

# Define the JSON Schema

The first step when creating a new connector is to create the [JSON Schema](https://json-schema.org/) definition for the connection itself.

This is a JSON file that declares the properties we need for the connection to work, and it will be mapped to a `Java Class` on the Server, a `Python Class` on the Ingestion Framework and a `Typescript Class` on the UI. By using it, we can guarantee that we have the same definition everywhere.
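
To make that mapping tangible, here is a sketch of how the generated `Python Class` for MySQL can be used once the schemas are compiled (see `make generate` in the Test It step). The module path and the field values are illustrative assumptions based on the ingestion framework's generated-code layout:

```python
# Sketch: using the Pydantic class generated from mysqlConnection.json.
# The module path below is an assumption following the generated-code
# layout; adjust it to match your own connection file.
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)

# Only `hostPort` and `username` are required by the schema.
connection = MysqlConnection(
    username="openmetadata_user",
    hostPort="localhost:3306",
)
print(connection.type)  # Defaults to the "Mysql" service type
```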

These files can be found in the following path:

[`openmetadata-spec/src/main/resources/json/schema/entity/services/connections`](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections)

Here you can check what the different service connections look like and get some inspiration on how to create your own.

{% note %}

**Breathe**

It can be overwhelming doing this for the first time, trying to reuse different schemas and get everything right.

It's a good idea to start little by little and iterate while you get used to working with the definitions.

{% /note %}

## Connection File Anatomy

In order to go through the connection file anatomy, we are going to take a look at the [`mysqlConnection.json`](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json)

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=1 %}

* **$id**: Here we are basically referencing the file itself. You will need to change the path so that it points to your connection file.

{% /codeInfo %}

{% codeInfo srNumber=2 %}

* **title**: Here we need to define the name of the schema. The standard is to use the filename in upper camel case. So if you are creating a connection called `ownConnection.json`, the title would be `OwnConnection`.
* **description**: Here we also add a small description that explains what the JSON Schema is for.

{% /codeInfo %}

{% codeInfo srNumber=3 %}

* **javaType**: Here we also need to define the javaType this JSON Schema will become. Like the title, this also depends on the connection name.

{% note %}
**Info**

Please note that the javaType path is similar to the filepath for the JSON Schema, but not the same.

The standard is as follows:

`org.openmetadata.schema.services.connections.{source_type}.{title}`

where
- `{source_type}` depends on the Connector you are building (Database, Dashboard, etc)
- `{title}` is the title attribute from this JSON file.
{% /note %}

{% /codeInfo %}

{% codeInfo srNumber=4 %}

* **definitions**: Here you can place JSON Schemas that you can reference later within the `properties` attribute.
On this connector we can see two different definitions:

- **mySQLType**: This definition is a standard for all connectors, and it defines the Service Type for a given connection.

If you are creating a connection called `ownConnection.json`, you could create a definition like:
```json
"ownType": {
  "description": "Service Type.",
  "type": "string",
  "enum": ["Own"],
  "default": "Own"
}
```

- **mySQLScheme**: This definition is specific to the connections that use [SQLAlchemy](https://www.sqlalchemy.org/) underneath, and it defines the driver scheme to be used.

{% /codeInfo %}

{% codeInfo srNumber=5 %}

* **properties**: Here we actually define the attributes that our connection will have. In order to better understand what you need to define here, we are going to go through a few of the attributes.

- **type**: As mentioned in the **definitions** section, we define the Service Type. But in order to actually use it, we need to reference it in a property. This is exactly what we do here.

{% note %}
In order to reference another JSON Schema we use the `$ref` attribute. This will basically put the entire JSON Schema in place and update/add any attributes defined here.
{% /note %}

- **authType**: This property is interesting because it allows us to showcase two different features.
  - **$ref**: As explained above, this attribute is used to reference another JSON Schema. But in this case you can see it being used within the **oneOf** attribute, referencing an external JSON Schema and not a definition.

{% note %}
When referencing a definition we use the following pattern: `#/definitions/myDefinition`
When referencing an external JSON Schema we use relative paths: `../common/ownSchema.json`
{% /note %}

  - **oneOf**: This property allows us to have a list of different types that are valid. It is used when there are multiple different ways a configuration might appear.

On this example we can see it references both `./common/basicAuth.json` and `./common/iamAuthConfig.json`.
It is this way because we could authenticate to MySQL either by using `basicAuth` (Username/Password) or by using `iamAuth` if we are actually running [MySQL as an RDS instance in AWS](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/UsingWithRDS.IAMDBAuth.html).

- **supportsMetadataExtraction**: We can also see a couple of different properties that showcase the features this connector supports (**supportsMetadataExtraction**, **supportsDBTExtraction**, **supportsProfiler**, **supportsQueryComment**). They are all different features from OpenMetadata that are not necessarily supported by all connectors.

The most basic case is **supportsMetadataExtraction**, and we should always start from there.

{% note %}
Here we can also see `$ref` being used to reference a `definition` on another schema: `../connectionBasicType.json#/definitions/supportsMetadataExtraction`
{% /note %}

{% /codeInfo %}

{% codeInfo srNumber=6 %}

* **additionalProperties**: To avoid weird behavior, we always prevent additional properties from being passed to the schema by setting this parameter to false.

* **required**: Here we can define any properties that are always required; the schema would be invalid otherwise.

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="mysqlConnection.json" %}

```json
{
```
```json {% srNumber=1 %}
  "$id": "https://open-metadata.org/schema/entity/services/connections/database/mysqlConnection.json",
```
```json
  "$schema": "http://json-schema.org/draft-07/schema#",
```
```json {% srNumber=2 %}
  "title": "MysqlConnection",
  "description": "Mysql Database Connection Config",
```
```json
  "type": "object",
```
```json {% srNumber=3 %}
  "javaType": "org.openmetadata.schema.services.connections.database.MysqlConnection",
```
```json {% srNumber=4 %}
  "definitions": {
    "mySQLType": {
      "description": "Service type.",
      "type": "string",
      "enum": ["Mysql"],
      "default": "Mysql"
    },
    "mySQLScheme": {
      "description": "SQLAlchemy driver scheme options.",
      "type": "string",
      "enum": ["mysql+pymysql"],
      "default": "mysql+pymysql"
    }
  },
```
```json {% srNumber=5 %}
  "properties": {
    "type": {
      "title": "Service Type",
      "description": "Service Type",
      "$ref": "#/definitions/mySQLType",
      "default": "Mysql"
    },
    "scheme": {
      "title": "Connection Scheme",
      "description": "SQLAlchemy driver scheme options.",
      "$ref": "#/definitions/mySQLScheme",
      "default": "mysql+pymysql"
    },
    "username": {
      "title": "Username",
      "description": "Username to connect to MySQL. This user should have privileges to read all the metadata in Mysql.",
      "type": "string"
    },
    "authType": {
      "title": "Auth Configuration Type",
      "description": "Choose Auth Config Type.",
      "oneOf": [
        {
          "$ref": "./common/basicAuth.json"
        },
        {
          "$ref": "./common/iamAuthConfig.json"
        }
      ]
    },
    "hostPort": {
      "title": "Host and Port",
      "description": "Host and port of the MySQL service.",
      "type": "string"
    },
    "databaseName": {
      "title": "Database Name",
      "description": "Optional name to give to the database in OpenMetadata. If left blank, we will use default as the database name.",
      "type": "string"
    },
    "databaseSchema": {
      "title": "Database Schema",
      "description": "Database Schema of the data source. This is optional parameter, if you would like to restrict the metadata reading to a single schema. When left blank, OpenMetadata Ingestion attempts to scan all the schemas.",
      "type": "string"
    },
    "sslCA": {
      "title": "SSL CA",
      "description": "Provide the path to ssl ca file",
      "type": "string"
    },
    "sslCert": {
      "title": "SSL Client Certificate File",
      "description": "Provide the path to ssl client certificate file (ssl_cert)",
      "type": "string"
    },
    "sslKey": {
      "title": "SSL Client Key File",
      "description": "Provide the path to ssl client certificate file (ssl_key)",
      "type": "string"
    },
    "connectionOptions": {
      "title": "Connection Options",
      "$ref": "../connectionBasicType.json#/definitions/connectionOptions"
    },
    "connectionArguments": {
      "title": "Connection Arguments",
      "$ref": "../connectionBasicType.json#/definitions/connectionArguments"
    },
    "supportsMetadataExtraction": {
      "title": "Supports Metadata Extraction",
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    },
    "supportsDBTExtraction": {
      "$ref": "../connectionBasicType.json#/definitions/supportsDBTExtraction"
    },
    "supportsProfiler": {
      "title": "Supports Profiler",
      "$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
    },
    "supportsQueryComment": {
      "title": "Supports Query Comment",
      "$ref": "../connectionBasicType.json#/definitions/supportsQueryComment"
    },
    "sampleDataStorageConfig": {
      "title": "Storage Config for Sample Data",
      "$ref": "../connectionBasicType.json#/definitions/sampleDataStorageConfig"
    }
  },
```
```json {% srNumber=6 %}
  "additionalProperties": false,
  "required": ["hostPort", "username"]
```
```json
}
```

{% /codeBlock %}

{% /codePreview %}

## Making the new Connection configuration available to the Service

Once the connection file is properly created, we still need to take one extra step to make it available to the Service.

{% note %}
**Note**

The connection is part of a Service (Dashboard, Database, Messaging, etc), and this step should be done on the correct service.
{% /note %}

Following the `mysqlConnection.json` example, we now need to make it available to the `Database Service` by updating the [`databaseService.json`](https://github.com/open-metadata/OpenMetadata/blob/main/openmetadata-spec/src/main/resources/json/schema/entity/services/databaseService.json) file within [`openmetadata-spec/src/main/resources/json/schema/entity/services`](https://github.com/open-metadata/OpenMetadata/tree/main/openmetadata-spec/src/main/resources/json/schema/entity/services)

{% note %}
The file will be shortened and parts of it will be replaced with `...` for readability.
{% /note %}

{% codePreview %}

{% codeInfoContainer %}

{% codeInfo srNumber=7 %}

* **databaseServiceType**: Here we need to add our connector type to the `enum` and `javaEnums` properties. It should be the same value as the `type` property that we defined on the JSON Schema.

{% /codeInfo %}

{% codeInfo srNumber=8 %}

* **databaseConnection**: Here we need to point to our JSON Schema within the `config` property by adding it to the `oneOf` list.

{% /codeInfo %}

{% /codeInfoContainer %}

{% codeBlock fileName="databaseService.json" %}

```json
{
  "$id": "https://open-metadata.org/schema/entity/services/databaseService.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Database Service",
  "description": "This schema defines the `Database Service` is a service such as MySQL, BigQuery, Redshift, Postgres, or Snowflake. Alternative terms such as Database Cluster, Database Server instance are also used for database service.",
  "type": "object",
  "javaType": "org.openmetadata.schema.entity.services.DatabaseService",
  "javaInterfaces": [
    "org.openmetadata.schema.EntityInterface",
    "org.openmetadata.schema.ServiceEntityInterface"
  ],
  "definitions": {
```
```json {% srNumber=7 %}
    "databaseServiceType": {
      "description": "Type of database service such as MySQL, BigQuery, Snowflake, Redshift, Postgres...",
      "javaInterfaces": ["org.openmetadata.schema.EnumInterface"],
      "type": "string",
      "enum": [
        ...
        "Mysql"
      ],
      "javaEnums": [
        ...
        {
          "name": "Mysql"
        }
      ]
    },
```
```json {% srNumber=8 %}
    "databaseConnection": {
      "type": "object",
      "description": "Database Connection.",
      "javaInterfaces": [
        "org.openmetadata.schema.ServiceConnectionEntityInterface"
      ],
      "properties": {
        "config": {
          "mask": true,
          "oneOf": [
            ...
            {
              "$ref": "./connections/database/mysqlConnection.json"
            }
          ]
        }
      },
      "additionalProperties": false
    }
```
```json
  },
  ...
}
```
{% /codeBlock %}

{% /codePreview %}

## Next Step

Now that you have your Connection defined in the JSON Schema, we can proceed to actually implement the Python code to perform the Ingestion.

{%inlineCallout
color="violet-70"
bold="Develop the Ingestion Code"
icon="MdArrowForward"
href="/developers/contribute/developing-a-new-connector/develop-ingestion-code"%}
Learn what you need to implement for the Connector's logic
{%/inlineCallout%}
@ -0,0 +1,560 @@
---
title: Develop the Ingestion Code
slug: /developers/contribute/developing-a-new-connector/develop-ingestion-code
---

# Develop the Ingestion Code

We recommend taking some time to understand how the Ingestion Framework works by reading [this small article](https://blog.open-metadata.org/how-we-built-the-ingestion-framework-1af0b6ff5c81).

The main takeaways for developing a new connector are:
- To understand that each of our Source Types (Databases, Dashboards, etc) has a Topology attached.
- To understand that the process flow is implemented as a generator chain, going through each step (as illustrated in the sketch below).
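
To make the generator-chain idea concrete, here is a conceptual sketch. It is not the framework's actual API -- the `source`, `processor` and `sink` names are purely illustrative -- but it shows how each step consumes the previous step's output lazily, one record at a time:

```python
# Conceptual sketch of a generator chain: each step consumes the previous
# step's records lazily instead of materializing everything in memory.
def source():
    # Produces raw records (e.g. table names fetched from the source system)
    yield from ["table_a", "table_b"]

def processor(records):
    # Transforms each record into the shape the next step expects
    for record in records:
        yield record.upper()

def sink(records):
    # Final step: persists each processed record
    for record in records:
        print(f"persisted {record}")

sink(processor(source()))
```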

## Service Topology

The Topology defines a series of Nodes and Stages that get executed in a hierarchical way and describe how we extract the needed data from the sources.

Starting from the Root node, we process the entities in a depth-first approach, following the topology tree through the node's children.

From the Service Topology you can understand what methods you need to implement:
- **producer**: Methods that will fetch the entities we need to process
- **processor**: Methods that will `yield` a given `Entity`
- **post_process**: Methods that will `yield` a given `Entity`, but run after all entities from that node have been processed.

### Example - DatabaseServiceTopology

Can be found in [`ingestion/src/metadata/ingestion/source/database/database_service.py`](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/source/database/database_service.py)

```python
class DatabaseServiceTopology(ServiceTopology):
    """
    Defines the hierarchy in Database Services.
    service -> db -> schema -> table.

    We could have a topology validator. We can only consume
    data that has been produced by any parent node.
    """

    root = TopologyNode(
        producer="get_services",
        stages=[
            NodeStage(
                type_=DatabaseService,
                context="database_service",
                processor="yield_create_request_database_service",
                overwrite=False,
                must_return=True,
                cache_entities=True,
            ),
        ],
        children=["database"],
        # Note how we have `yield_view_lineage` and `yield_stored_procedure_lineage`
        # as post_processed. This is because we cannot ensure proper lineage processing
        # until we have finished ingesting all the metadata from the source.
        post_process=["yield_view_lineage", "yield_procedure_lineage_and_queries"],
    )
    database = TopologyNode(
        producer="get_database_names",
        stages=[
            NodeStage(
                type_=OMetaTagAndClassification,
                context="tags",
                processor="yield_database_tag_details",
                nullable=True,
                store_all_in_context=True,
            ),
            NodeStage(
                type_=Database,
                context="database",
                processor="yield_database",
                consumer=["database_service"],
                cache_entities=True,
                use_cache=True,
            ),
        ],
        children=["databaseSchema"],
    )
    databaseSchema = TopologyNode(
        producer="get_database_schema_names",
        stages=[
            NodeStage(
                type_=OMetaTagAndClassification,
                context="tags",
                processor="yield_database_schema_tag_details",
                nullable=True,
                store_all_in_context=True,
            ),
            NodeStage(
                type_=DatabaseSchema,
                context="database_schema",
                processor="yield_database_schema",
                consumer=["database_service", "database"],
                cache_entities=True,
                use_cache=True,
            ),
        ],
        children=["table", "stored_procedure"],
        post_process=["mark_tables_as_deleted", "mark_stored_procedures_as_deleted"],
    )
    table = TopologyNode(
        producer="get_tables_name_and_type",
        stages=[
            NodeStage(
                type_=OMetaTagAndClassification,
                context="tags",
                processor="yield_table_tag_details",
                nullable=True,
                store_all_in_context=True,
            ),
            NodeStage(
                type_=Table,
                context="table",
                processor="yield_table",
                consumer=["database_service", "database", "database_schema"],
                use_cache=True,
            ),
            NodeStage(
                type_=OMetaLifeCycleData,
                processor="yield_life_cycle_data",
                nullable=True,
            ),
        ],
    )
    stored_procedure = TopologyNode(
        producer="get_stored_procedures",
        stages=[
            NodeStage(
                type_=StoredProcedure,
                context="stored_procedures",
                processor="yield_stored_procedure",
                consumer=["database_service", "database", "database_schema"],
                store_all_in_context=True,
                store_fqn=True,
                use_cache=True,
            ),
        ],
    )
```

## Service Source

Now that you understand how the Ingestion Process works, you need to understand the Service Source.

A Service Source is an abstract class that is the base for any Connector of that Source Type.
They tend to have a lot of methods and can be pretty overwhelming at first glance, but you don't need to worry: you just need to check which abstract methods you have to implement in your connector.

{% note %}
**Hint**

You can start slow, yielding nothing with `yield from []`, to see the whole flow running, and then slowly implement the features you want (see the sketch after the example below).

On this note, also remember that you don't need to implement everything. You could contribute by starting with just the Metadata Extraction features, without extracting Owners/Tags or dealing with Lineage.
{% /note %}

### Example - DatabaseServiceSource

Can be found in [`ingestion/src/metadata/ingestion/source/database/database_service.py`](https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/src/metadata/ingestion/source/database/database_service.py)
```python
class DatabaseServiceSource(
    TopologyRunnerMixin, Source, ABC
):  # pylint: disable=too-many-public-methods
    """
    Base class for Database Services.
    It implements the topology and context.
    """

    source_config: DatabaseServiceMetadataPipeline
    config: WorkflowSource
    database_source_state: Set = set()
    stored_procedure_source_state: Set = set()
    # Big union of types we want to fetch dynamically
    service_connection: DatabaseConnection.__fields__["config"].type_

    # When processing the database, the source will update the inspector if needed
    inspector: Inspector

    topology = DatabaseServiceTopology()
    context = TopologyContext.create(topology)

    @property
    def name(self) -> str:
        return self.service_connection.type.name

    def prepare(self):
        """By default, there is no preparation needed"""

    def get_services(self) -> Iterable[WorkflowSource]:
        yield self.config

    def yield_create_request_database_service(
        self, config: WorkflowSource
    ) -> Iterable[Either[CreateDatabaseServiceRequest]]:
        yield Either(
            right=self.metadata.get_create_service_from_source(
                entity=DatabaseService, config=config
            )
        )

    @abstractmethod
    def get_database_names(self) -> Iterable[str]:
        """
        Prepares the database name to be sent to stage.
        Filtering happens here.
        """

    @abstractmethod
    def get_database_schema_names(self) -> Iterable[str]:
        """
        Prepares the database schema name to be sent to stage.
        Filtering happens here.
        """

    @abstractmethod
    def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
        """
        Prepares the table name to be sent to stage.
        Filtering happens here.
        """

    @abstractmethod
    def yield_database(
        self, database_name: str
    ) -> Iterable[Either[CreateDatabaseRequest]]:
        """
        From topology.
        Prepare a database request and pass it to the sink.

        Also, update the self.inspector value to the current db.
        """

    @abstractmethod
    def yield_database_schema(
        self, schema_name: str
    ) -> Iterable[Either[CreateDatabaseSchemaRequest]]:
        """
        From topology.
        Prepare a database request and pass it to the sink.

        Also, update the self.inspector value to the current db.
        """

    @abstractmethod
    def yield_tag(
        self, schema_name: str
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        """
        From topology. To be run for each schema
        """

    def yield_database_tag(
        self, database_name: str
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        """
        From topology. To be run for each database
        """

    def yield_table_tags(
        self, table_name_and_type: Tuple[str, TableType]
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        """
        From topology. To be run for each table
        """

    def yield_table_tag_details(
        self, table_name_and_type: str
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        """
        From topology. To be run for each table
        """
        if self.source_config.includeTags:
            yield from self.yield_table_tags(table_name_and_type) or []

    def yield_database_schema_tag_details(
        self, schema_name: str
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        """
        From topology. To be run for each schema
        """
        if self.source_config.includeTags:
            yield from self.yield_tag(schema_name) or []

    def yield_database_tag_details(
        self, database_name: str
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        """
        From topology. To be run for each database
        """
        if self.source_config.includeTags:
            yield from self.yield_database_tag(database_name) or []

    @abstractmethod
    def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
        """
        From topology.
        Parses view definition to get lineage information
        """

    def update_table_constraints(
        self, table_constraints: List[TableConstraint], foreign_columns: []
    ) -> List[TableConstraint]:
        """
        process the table constraints of all tables
        transform SQLAlchemy returned foreign_columns into list of TableConstraint.
        """

    @abstractmethod
    def yield_table(
        self, table_name_and_type: Tuple[str, TableType]
    ) -> Iterable[Either[CreateTableRequest]]:
        """
        From topology.
        Prepare a table request and pass it to the sink.

        Also, update the self.inspector value to the current db.
        """

    @abstractmethod
    def get_stored_procedures(self) -> Iterable[Any]:
        """List stored procedures to process"""

    @abstractmethod
    def yield_stored_procedure(
        self, stored_procedure: Any
    ) -> Iterable[Either[CreateStoredProcedureRequest]]:
        """Process the stored procedure information"""

    @abstractmethod
    def yield_procedure_lineage_and_queries(
        self,
    ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
        """Extracts the lineage information from Stored Procedures"""

    def get_raw_database_schema_names(self) -> Iterable[str]:
        """
        fetch all schema names without any filtering.
        """
        yield from self.get_database_schema_names()

    def get_tag_by_fqn(self, entity_fqn: str) -> Optional[List[TagLabel]]:
        """
        Pick up the tags registered in the context
        searching by entity FQN
        """

        tag_labels = []
        for tag_and_category in self.context.tags or []:
            if tag_and_category.fqn and tag_and_category.fqn.__root__ == entity_fqn:
                tag_label = get_tag_label(
                    metadata=self.metadata,
                    tag_name=tag_and_category.tag_request.name.__root__,
                    classification_name=tag_and_category.classification_request.name.__root__,
                )
                if tag_label:
                    tag_labels.append(tag_label)
        return tag_labels or None

    def get_database_tag_labels(self, database_name: str) -> Optional[List[TagLabel]]:
        """
        Method to get database tags
        This will only get executed if the tags context
        is properly informed
        """
        database_fqn = fqn.build(
            self.metadata,
            entity_type=Database,
            service_name=self.context.database_service,
            database_name=database_name,
        )
        return self.get_tag_by_fqn(entity_fqn=database_fqn)

    def get_schema_tag_labels(self, schema_name: str) -> Optional[List[TagLabel]]:
        """
        Method to get schema tags
        This will only get executed if the tags context
        is properly informed
        """
        schema_fqn = fqn.build(
            self.metadata,
            entity_type=DatabaseSchema,
            service_name=self.context.database_service,
            database_name=self.context.database,
            schema_name=schema_name,
        )
        return self.get_tag_by_fqn(entity_fqn=schema_fqn)

    def get_tag_labels(self, table_name: str) -> Optional[List[TagLabel]]:
        """
        This will only get executed if the tags context
        is properly informed
        """
        table_fqn = fqn.build(
            self.metadata,
            entity_type=Table,
            service_name=self.context.database_service,
            database_name=self.context.database,
            schema_name=self.context.database_schema,
            table_name=table_name,
            skip_es_search=True,
        )
        return self.get_tag_by_fqn(entity_fqn=table_fqn)

    def get_column_tag_labels(
        self, table_name: str, column: dict
    ) -> Optional[List[TagLabel]]:
        """
        This will only get executed if the tags context
        is properly informed
        """
        col_fqn = fqn.build(
            self.metadata,
            entity_type=Column,
            service_name=self.context.database_service,
            database_name=self.context.database,
            schema_name=self.context.database_schema,
            table_name=table_name,
            column_name=column["name"],
        )
        return self.get_tag_by_fqn(entity_fqn=col_fqn)

    def register_record(self, table_request: CreateTableRequest) -> None:
        """
        Mark the table record as scanned and update the database_source_state
        """
        table_fqn = fqn.build(
            self.metadata,
            entity_type=Table,
            service_name=self.context.database_service,
            database_name=self.context.database,
            schema_name=self.context.database_schema,
            table_name=table_request.name.__root__,
            skip_es_search=True,
        )

        self.database_source_state.add(table_fqn)

    def register_record_stored_proc_request(
        self, stored_proc_request: CreateStoredProcedureRequest
    ) -> None:
        """
        Mark the stored procedure record as scanned and update the stored_procedure_source_state
        """
        table_fqn = fqn.build(
            self.metadata,
            entity_type=StoredProcedure,
            service_name=self.context.database_service,
            database_name=self.context.database,
            schema_name=self.context.database_schema,
            procedure_name=stored_proc_request.name.__root__,
        )

        self.stored_procedure_source_state.add(table_fqn)

    def _get_filtered_schema_names(
        self, return_fqn: bool = False, add_to_status: bool = True
    ) -> Iterable[str]:
        for schema_name in self.get_raw_database_schema_names():
            schema_fqn = fqn.build(
                self.metadata,
                entity_type=DatabaseSchema,
                service_name=self.context.database_service,
                database_name=self.context.database,
                schema_name=schema_name,
            )
            if filter_by_schema(
                self.source_config.schemaFilterPattern,
                schema_fqn if self.source_config.useFqnForFiltering else schema_name,
            ):
                if add_to_status:
                    self.status.filter(schema_fqn, "Schema Filtered Out")
                continue
            yield schema_fqn if return_fqn else schema_name

    def get_owner_ref(self, table_name: str) -> Optional[EntityReference]:
        """
        Method to process the table owners
        """
        try:
            if self.source_config.includeOwners:
                owner_name = self.inspector.get_table_owner(
                    connection=self.connection,  # pylint: disable=no-member
                    table_name=table_name,
                    schema=self.context.database_schema,
                )
                owner_ref = self.metadata.get_reference_by_name(name=owner_name)
                return owner_ref
        except Exception as exc:
            logger.debug(traceback.format_exc())
            logger.warning(f"Error processing owner for table {table_name}: {exc}")
        return None

    def mark_tables_as_deleted(self):
        """
        Use the current inspector to mark tables as deleted
        """
        if not self.context.__dict__.get("database"):
            raise ValueError(
                "No Database found in the context. We cannot run the table deletion."
            )

        if self.source_config.markDeletedTables:
            logger.info(
                f"Mark Deleted Tables set to True. Processing database [{self.context.database}]"
            )
            schema_fqn_list = self._get_filtered_schema_names(
                return_fqn=True, add_to_status=False
            )

            for schema_fqn in schema_fqn_list:
                yield from delete_entity_from_source(
                    metadata=self.metadata,
                    entity_type=Table,
                    entity_source_state=self.database_source_state,
                    mark_deleted_entity=self.source_config.markDeletedTables,
                    params={"database": schema_fqn},
                )

    def mark_stored_procedures_as_deleted(self):
        """
        Use the current inspector to mark Stored Procedures as deleted
        """
        if self.source_config.markDeletedStoredProcedures:
            logger.info(
                f"Mark Deleted Stored Procedures Processing database [{self.context.database}]"
            )

            schema_fqn_list = self._get_filtered_schema_names(
                return_fqn=True, add_to_status=False
            )

            for schema_fqn in schema_fqn_list:
                yield from delete_entity_from_source(
                    metadata=self.metadata,
                    entity_type=StoredProcedure,
                    entity_source_state=self.stored_procedure_source_state,
                    mark_deleted_entity=self.source_config.markDeletedStoredProcedures,
                    params={"databaseSchema": schema_fqn},
                )

    def yield_life_cycle_data(self, _) -> Iterable[Either[OMetaLifeCycleData]]:
        """
        Get the life cycle data of the table
        """

    def test_connection(self) -> None:
        test_connection_fn = get_test_connection_fn(self.service_connection)
        test_connection_fn(self.metadata, self.connection_obj, self.service_connection)
```
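
Putting the hint above into practice, a first iteration of a new source can simply yield nothing for every abstract method. The sketch below is illustrative only: `MyOwnSource` is a hypothetical name, and the method signatures are copied from the abstract methods of the base class shown above.

```python
# Hypothetical first-pass skeleton: every abstract method yields nothing,
# so the whole topology can run end to end before any real logic exists.
class MyOwnSource(DatabaseServiceSource):
    def get_database_names(self) -> Iterable[str]:
        yield from []

    def get_database_schema_names(self) -> Iterable[str]:
        yield from []

    def get_tables_name_and_type(self) -> Optional[Iterable[Tuple[str, str]]]:
        yield from []

    def yield_database(
        self, database_name: str
    ) -> Iterable[Either[CreateDatabaseRequest]]:
        yield from []

    def yield_database_schema(
        self, schema_name: str
    ) -> Iterable[Either[CreateDatabaseSchemaRequest]]:
        yield from []

    def yield_tag(
        self, schema_name: str
    ) -> Iterable[Either[OMetaTagAndClassification]]:
        yield from []

    def yield_table(
        self, table_name_and_type: Tuple[str, TableType]
    ) -> Iterable[Either[CreateTableRequest]]:
        yield from []

    def yield_view_lineage(self) -> Iterable[Either[AddLineageRequest]]:
        yield from []

    def get_stored_procedures(self) -> Iterable[Any]:
        yield from []

    def yield_stored_procedure(
        self, stored_procedure: Any
    ) -> Iterable[Either[CreateStoredProcedureRequest]]:
        yield from []

    def yield_procedure_lineage_and_queries(
        self,
    ) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
        yield from []
```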

## Next Step

With the code ready to go, we can now proceed to make a small change in the UI so that the Connector can be configured properly from there.

{%inlineCallout
color="violet-70"
bold="Apply the UI Changes"
icon="MdArrowForward"
href="/developers/contribute/developing-a-new-connector/apply-ui-changes"%}
Learn what you need to do to be able to see the Connector properly in the UI
{%/inlineCallout%}
@ -0,0 +1,46 @@
---
title: Developing a new Connector
slug: /developers/contribute/developing-a-new-connector
---

# Developing a New Connector

## Overview

There is an ever-increasing number of metadata sources, and it is possible that OpenMetadata hasn't implemented the one you need yet.

In this guide we will go through the steps needed to contribute by developing a new connector yourself, using the already implemented MySQL connector as an example.

{% note %}

**Developing a new Connector vs Custom Connectors**

Developing a new connector makes sense if it could be used by many other users. **If you are dealing with a custom solution used only in your case, it makes more sense to create your own Custom Connector by following [this guide](/connectors/custom-connectors)**

{% /note %}

## Prerequisite

Before you start developing your own connector, you need to have the development environment properly set up and OpenMetadata up and running locally for testing purposes.

Please follow the instructions [here](/developers/contribute/build-code-and-run-tests)

## Steps

1. [Define the JSON Schema](/developers/contribute/developing-a-new-connector/define-json-schema)
2. [Develop the Ingestion Code](/developers/contribute/developing-a-new-connector/develop-ingestion-code)
3. [Apply UI Changes](/developers/contribute/developing-a-new-connector/apply-ui-changes)
4. [Create the Java ClassConverter](/developers/contribute/developing-a-new-connector/create-java-class-converter)
5. [Test it](/developers/contribute/developing-a-new-connector/test-it)
6. [Update the Documentation](/developers/contribute/developing-a-new-connector/update-documentation)

## References

### Previous Webinars

There are some interesting webinars that cover an overview of the ingestion framework as well as a hands-on workshop for creating a new connector.

Note that while specific technicalities might be outdated (patterns for naming files or functions), the overall structure, flow and abstractions described in the videos are still relevant. For any nuances, you can look at the current state of the ingestion code to see what the best practice is.

1. [How does the Ingestion Framework Work?](https://youtu.be/i7DhG_gZMmE)
2. [How to Create your Own OpenMetadata Connector](https://youtu.be/ZvA4wuvINFA)
@ -0,0 +1,49 @@
---
title: Test It
slug: /developers/contribute/developing-a-new-connector/test-it
---

# Test It

In order to test your new connector, you need to run `make generate` from the project's root to generate the proper Python Classes from the JSON Schemas you created and modified.

## Unit Tests

If you want to test the whole package, you can always run the following commands from the project's root:

```bash
make install_test
make coverage
```

This can be slow, so in order to iterate faster you can run just the tests you created for your connector with `pytest {path_to_your_tests}`.
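
As a starting point, a connector unit test can be as simple as instantiating your generated connection class and asserting on its defaults. This is a hypothetical sketch: the test name, module path and assertion are illustrative assumptions about the generated code.

```python
# Hypothetical minimal unit test for a new connector (illustrative names).
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
    MysqlConnection,
)


def test_connection_defaults():
    connection = MysqlConnection(
        username="test_user",
        hostPort="localhost:3306",
    )
    # The service type should default to the value defined in the JSON Schema
    assert connection.type.value == "Mysql"
```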

## Run the Connector from the CLI

In order to test the connector using the CLI, you first need to have the OpenMetadata stack running locally.
The easiest way to do so is explained [here](/developers/contribute/build-code-and-run-tests).

With it up and running, you can install the ingestion package locally and use the CLI directly:

```bash
metadata ingest -c {your_yaml_file}
```
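
For reference, a minimal ingestion YAML for the MySQL connector looks roughly like the sketch below. Treat it as an assumption-laden example: the credentials, host and JWT token are placeholders, and your own connector's `serviceConnection` block will mirror the JSON Schema you defined.

```yaml
# Sketch of a minimal ingestion config (placeholder credentials and token).
source:
  type: mysql
  serviceName: local_mysql
  serviceConnection:
    config:
      type: Mysql
      username: openmetadata_user
      authType:
        password: openmetadata_password
      hostPort: localhost:3306
  sourceConfig:
    config:
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      jwtToken: "{bot_jwt_token}"
```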

## Run the Connector from the UI

In order to test the connector using the UI, you first need to have the OpenMetadata stack running locally.
The easiest way to do so is explained [here](/developers/contribute/build-code-and-run-tests).

With it up and running, you can configure the connector from the UI itself.

## Next Step

Now that it's all working correctly, let's learn how to update the documentation for everyone else that will use the connector!

{%inlineCallout
color="violet-70"
bold="Update the Documentation"
icon="MdArrowForward"
href="/developers/contribute/developing-a-new-connector/update-documentation"%}
Learn how to create the documentation for your new Connector
{%/inlineCallout%}
@ -0,0 +1,38 @@
---
title: Update the Documentation
slug: /developers/contribute/developing-a-new-connector/update-documentation
---

# Update the Documentation

One important part of developing a new connector is to document how it works once it is done.

The documentation for connectors can be found within `openmetadata-docs/content/v{version}-SNAPSHOT/connectors`,
where `{version}` depends on the time of reading.

There you need to create a new folder within the proper Source Type you are building a connector for (Database, Dashboard, MLModel, etc) and create two files:

- **index.md**: It explains how to configure the connector using the UI.
- **yaml.md**: It explains how to configure the connector using a YAML file.

Again, the best way to create the documentation is to use another connector's documentation as a base, since they all follow the same structure.

Once the documentation is done, it's important to add it to the proper indexes and menus:

- `openmetadata-docs/content/v{version}-SNAPSHOT/menu.md`
- `openmetadata-docs/content/v{version}-SNAPSHOT/connectors/index.md`
- `openmetadata-docs/content/v{version}-SNAPSHOT/connectors/{source_type}/index.md`

This will guarantee that the connector is shown in the menus.

## How to test the Documentation

You can check your changes to the documentation by building it locally using `make docker-docs`. This will pull the OpenMetadata documentation Docker images and mount the project as a volume.

You should be able to see the documentation page at `http://localhost:3000`.

{% note %}
**Attention**

Beware that any version that is suffixed with `-SNAPSHOT` is not shown. So in order to check it out, you will need to remove the suffix and add it back afterwards.
{% /note %}
@ -451,7 +451,7 @@ site_menu:
  - category: Connectors / Database / BigTable / Run Externally
    url: /connectors/database/bigtable/yaml
  - category: Connectors / Database / BigTable / Roles
    url: /connectors/database/bigtable/roles
  - category: Connectors / Database / Clickhouse
    url: /connectors/database/clickhouse
  - category: Connectors / Database / Clickhouse / Run Externally
@ -1807,6 +1807,20 @@ site_menu:
    url: /developers/contribute/build-code-and-run-tests/openmetadata-ui
  - category: Developers / Contribute / Build the Code and Run Tests / Cypress Integration Tests
    url: /developers/contribute/build-code-and-run-tests/cypress-integration-tests
  - category: Developers / Contribute / Developing a New Connector
    url: /developers/contribute/developing-a-new-connector
  - category: Developers / Contribute / Developing a New Connector / 1. Define the JSON Schema
    url: /developers/contribute/developing-a-new-connector/define-json-schema
  - category: Developers / Contribute / Developing a New Connector / 2. Develop the Ingestion Code
    url: /developers/contribute/developing-a-new-connector/develop-ingestion-code
  - category: Developers / Contribute / Developing a New Connector / 3. Apply UI Changes
    url: /developers/contribute/developing-a-new-connector/apply-ui-changes
  - category: Developers / Contribute / Developing a New Connector / 4. Create the Java ClassConverter
    url: /developers/contribute/developing-a-new-connector/create-java-class-converter
  - category: Developers / Contribute / Developing a New Connector / 5. Test it
    url: /developers/contribute/developing-a-new-connector/test-it
  - category: Developers / Contribute / Developing a New Connector / 6. Update the Documentation
    url: /developers/contribute/developing-a-new-connector/update-documentation
  - category: Developers / Contribute / UX Style Guide
    url: /developers/contribute/ux-style-guide
  - category: Developers / Webhooks