mirror of
https://github.com/datahub-project/datahub.git
synced 2025-06-27 05:03:31 +00:00
feat: enable SCSI for datasets (#1986)
* enable SCSI for datasets * Update scsi-onboarding-guide.md
This commit is contained in:
parent
3407dab090
commit
70ddb09d29
1
docker/datahub-gms/env/docker.env
vendored
1
docker/datahub-gms/env/docker.env
vendored
@ -1,3 +1,4 @@
|
||||
DATASET_ENABLE_SCSI=false
|
||||
EBEAN_DATASOURCE_USERNAME=datahub
|
||||
EBEAN_DATASOURCE_PASSWORD=datahub
|
||||
EBEAN_DATASOURCE_HOST=mysql:3306
|
||||
|
@ -25,4 +25,19 @@ insert into metadata_aspect (urn, aspect, version, metadata, createdon, createdb
|
||||
'{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web/packages/data-portal/public/assets/images/default_avatar.png"}',
|
||||
now(),
|
||||
'urn:li:principal:datahub'
|
||||
);
|
||||
);
|
||||
|
||||
-- create metadata index table
|
||||
CREATE TABLE metadata_index (
|
||||
`id` BIGINT NOT NULL AUTO_INCREMENT,
|
||||
`urn` VARCHAR(200) NOT NULL,
|
||||
`aspect` VARCHAR(150) NOT NULL,
|
||||
`path` VARCHAR(150) NOT NULL,
|
||||
`longVal` BIGINT,
|
||||
`stringVal` VARCHAR(200),
|
||||
`doubleVal` DOUBLE,
|
||||
CONSTRAINT id_pk PRIMARY KEY (id),
|
||||
INDEX longIndex (`urn`,`aspect`,`path`,`longVal`),
|
||||
INDEX stringIndex (`urn`,`aspect`,`path`,`stringVal`),
|
||||
INDEX doubleIndex (`urn`,`aspect`,`path`,`doubleVal`)
|
||||
);
|
||||
|
276
docs/how/scsi-onboarding-guide.md
Normal file
276
docs/how/scsi-onboarding-guide.md
Normal file
@ -0,0 +1,276 @@
|
||||
# How to onboard to Strongly Consistent Secondary Index (SCSI)?
|
||||
|
||||
## 1. Create urn path extractor for your entity
|
||||
This is to provide the parts of urn that need to be indexed as well as the logic to obtain the same from the urn. Refer to [DatasetUrnPathExtractor](https://github.com/linkedin/datahub/tree/master/gms/impl/src/main/java/com/linkedin/metadata/urn/dataset/DatasetUrnPathExtractor.java) as an example.
|
||||
|
||||
## 2. Add appropriate docker environment variable to enable SCSI for your entity
|
||||
Enable SCSI by adding your variable in docker environment [file](https://github.com/linkedin/datahub/tree/master/docker/datahub-gms/env/docker.env) of datahub-gms. Each entity has it's own environment variable. If corresponding variable of your entity is already defined in the docker environment file, then make sure it is set (in order to enable SCSI).
|
||||
|
||||
## 3. Enable SCSI in local DAO
|
||||
Import the docker environment variable in your local DAO factory to enable SCSI. Refer to [DatasetDaoFactory](https://github.com/linkedin/datahub/tree/master/gms/factories/src/main/java/com/linkedin/gms/factory/dataset/DatasetDaoFactory.java) as an example.
|
||||
|
||||
## 4. Define Storage Config and use while instantiating your DAO
|
||||
Other than the urn parts, you may want to index certain fields of an aspect. The indexable fields of aspects of a given entity are configured in a file in JSON format which must be provided during your local DAO instantiation. Refer to the storage config for [dataset](https://github.com/linkedin/datahub/tree/master/gms/factories/src/main/resources/datasetStorageConfig.json).
|
||||
|
||||
## 5. Bootstrap index table for existing urns
|
||||
If you have already enabled SCSI then the write path will ensure that every new urn inserted into the primary document store (i.e. `metadata_aspect` table), also gets inserted into the index table. However for urns that already exist in the `metadata_aspect` table, you will need to bootstrap the index table. Refer to the bootstrap [script](https://github.com/linkedin/datahub/tree/master/datahub/gms/database/scripts/index/dataset-bootstrap.sql) for datasets as an example.
|
||||
|
||||
## 6. Add finder method at the resource level
|
||||
[BaseEntityResource](https://github.com/linkedin/datahub-gma/blob/master/restli-resources/src/main/java/com/linkedin/metadata/restli/BaseEntityResource.java) currently exposes Finder resource method called filter that returns a list of entities that satisfy the filter conditions specified in query parameters. Please refer to [Datasets](https://github.com/linkedin/datahub/blob/master/gms/impl/src/main/java/com/linkedin/metadata/resources/dataset/Datasets.java) resource to understand how to override the filter method.
|
||||
Once you have the resource method defined, you could as well expose client methods that take different input arguments. Please refer to listUrnsFromIndex and filter methods in [Datasets](https://github.com/linkedin/datahub/blob/master/gms/client/src/main/java/com/linkedin/dataset/client/Datasets.java) client for reference.
|
||||
|
||||
Once you have onboarded to SCSI for your entity, you can test the changes as described below
|
||||
|
||||
## Testing your changes with some sample API calls
|
||||
|
||||
For the steps below, we assume you have already enabled SCSI by following the steps mentioned above.
|
||||
|
||||
Run the ingestion script if you haven't already using
|
||||
```
|
||||
./docker/ingestion/ingestion.sh
|
||||
```
|
||||
Connect to the MySQL server and you should be able to see the records.
|
||||
```
|
||||
mysql> select * from metadata_index;
|
||||
+----+--------------------------------------------------------------------+------------------------------------+------------------------+---------+---------------------------+-----------+
|
||||
| id | urn | aspect | path | longVal | stringVal | doubleVal |
|
||||
+----+--------------------------------------------------------------------+------------------------------------+------------------------+---------+---------------------------+-----------+
|
||||
| 1 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform/platformName | NULL | kafka | NULL |
|
||||
| 2 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /origin | NULL | PROD | NULL |
|
||||
| 3 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /datasetName | NULL | SampleKafkaDataset | NULL |
|
||||
| 4 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform | NULL | urn:li:dataPlatform:kafka | NULL |
|
||||
| 5 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform/platformName | NULL | hdfs | NULL |
|
||||
| 6 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /origin | NULL | PROD | NULL |
|
||||
| 7 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /datasetName | NULL | SampleHdfsDataset | NULL |
|
||||
| 8 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform | NULL | urn:li:dataPlatform:hdfs | NULL |
|
||||
| 9 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform/platformName | NULL | hive | NULL |
|
||||
| 10 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /origin | NULL | PROD | NULL |
|
||||
| 11 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /datasetName | NULL | SampleHiveDataset | NULL |
|
||||
| 12 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform | NULL | urn:li:dataPlatform:hive | NULL |
|
||||
+----+--------------------------------------------------------------------+------------------------------------+------------------------+---------+---------------------------+-----------+
|
||||
12 rows in set (0.01 sec)
|
||||
```
|
||||
|
||||
In the following section we will try some API calls, now that the urn parts are ingested
|
||||
|
||||
### Get list of dataset urns
|
||||
Note that the results are paginated
|
||||
|
||||
```
|
||||
curl "http://localhost:8080/datasets?q=filter&aspects=List()" -X GET -H 'X-RestLi-Protocol-Version: 2.0.0' -H 'X-RestLi-Method: finder' | jq
|
||||
|
||||
{
|
||||
"elements": [
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)",
|
||||
"origin": "PROD",
|
||||
"name": "SampleHdfsDataset",
|
||||
"platform": "urn:li:dataPlatform:hdfs"
|
||||
},
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
|
||||
"origin": "PROD",
|
||||
"name": "SampleHiveDataset",
|
||||
"platform": "urn:li:dataPlatform:hive"
|
||||
},
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)",
|
||||
"origin": "PROD",
|
||||
"name": "SampleKafkaDataset",
|
||||
"platform": "urn:li:dataPlatform:kafka"
|
||||
}
|
||||
],
|
||||
"paging": {
|
||||
"count": 10,
|
||||
"start": 0,
|
||||
"links": []
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Get list of dataset urns after a given urn
|
||||
|
||||
```
|
||||
curl "http://localhost:8080/datasets?q=filter&aspects=List()&urn=urn%3Ali%3Adataset%3A%28urn%3Ali%3AdataPlatform%3Ahdfs%2CSampleHdfsDataset%2CPROD%29" -X GET -H 'X-RestLi-Protocol-Version: 2.0.0' -H 'X-RestLi-Method: finder' | jq
|
||||
|
||||
{
|
||||
"elements": [
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
|
||||
"origin": "PROD",
|
||||
"name": "SampleHiveDataset",
|
||||
"platform": "urn:li:dataPlatform:hive"
|
||||
},
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)",
|
||||
"origin": "PROD",
|
||||
"name": "SampleKafkaDataset",
|
||||
"platform": "urn:li:dataPlatform:kafka"
|
||||
}
|
||||
],
|
||||
"paging": {
|
||||
"count": 10,
|
||||
"start": 0,
|
||||
"links": []
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Get all datasets along with aspects `Status` and `Ownership` (if they exist)
|
||||
|
||||
```
|
||||
curl "http://localhost:8080/datasets?q=filter&aspects=List(com.linkedin.common.Status,com.linkedin.common.Ownership)" -X GET -H 'X-RestLi-Protocol-Version: 2.0.0' -H 'X-RestLi-Method: finder' | jq
|
||||
|
||||
{
|
||||
"elements": [
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)",
|
||||
"ownership": {
|
||||
"owners": [
|
||||
{
|
||||
"owner": "urn:li:corpuser:jdoe",
|
||||
"type": "DATAOWNER"
|
||||
},
|
||||
{
|
||||
"owner": "urn:li:corpuser:datahub",
|
||||
"type": "DATAOWNER"
|
||||
}
|
||||
],
|
||||
"lastModified": {
|
||||
"actor": "urn:li:corpuser:jdoe",
|
||||
"time": 1581407189000
|
||||
}
|
||||
},
|
||||
"origin": "PROD",
|
||||
"name": "SampleHdfsDataset",
|
||||
"platform": "urn:li:dataPlatform:hdfs"
|
||||
},
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
|
||||
"ownership": {
|
||||
"owners": [
|
||||
{
|
||||
"owner": "urn:li:corpuser:jdoe",
|
||||
"type": "DATAOWNER"
|
||||
},
|
||||
{
|
||||
"owner": "urn:li:corpuser:datahub",
|
||||
"type": "DATAOWNER"
|
||||
}
|
||||
],
|
||||
"lastModified": {
|
||||
"actor": "urn:li:corpuser:jdoe",
|
||||
"time": 1581407189000
|
||||
}
|
||||
},
|
||||
"origin": "PROD",
|
||||
"name": "SampleHiveDataset",
|
||||
"platform": "urn:li:dataPlatform:hive"
|
||||
},
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)",
|
||||
"ownership": {
|
||||
"owners": [
|
||||
{
|
||||
"owner": "urn:li:corpuser:jdoe",
|
||||
"type": "DATAOWNER"
|
||||
},
|
||||
{
|
||||
"owner": "urn:li:corpuser:datahub",
|
||||
"type": "DATAOWNER"
|
||||
}
|
||||
],
|
||||
"lastModified": {
|
||||
"actor": "urn:li:corpuser:jdoe",
|
||||
"time": 1581407189000
|
||||
}
|
||||
},
|
||||
"origin": "PROD",
|
||||
"name": "SampleKafkaDataset",
|
||||
"platform": "urn:li:dataPlatform:kafka"
|
||||
}
|
||||
],
|
||||
"paging": {
|
||||
"count": 10,
|
||||
"start": 0,
|
||||
"links": []
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Testing the Storage Config:
|
||||
|
||||
The storage config for datasets looks like the following:
|
||||
```
|
||||
{
|
||||
"aspectStorageConfigMap": {
|
||||
"com.linkedin.common.Status": {
|
||||
"pathStorageConfigMap": {
|
||||
"/removed": {
|
||||
"strongConsistentSecondaryIndex": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
which means that the `removed` field of `Status` aspect should be indexed in SCSI.
|
||||
None of the dataset urns ingested so far, has a `Status` aspect. Let us try to ingest a new dataset, with several metadata aspects including the `Status` aspect
|
||||
|
||||
```
|
||||
curl 'http://localhost:8080/datasets?action=ingest' -X POST -H 'X-RestLi-Protocol-Version:2.0.0' --data '{"snapshot": {"aspects":[{"com.linkedin.common.Ownership":{"owners":[{"owner":"urn:li:corpuser:fbar","type":"DATAOWNER"}],"lastModified":{"time":0,"actor":"urn:li:corpuser:fbar"}}},{"com.linkedin.common.Status":{"removed":false}},{"com.linkedin.schema.SchemaMetadata":{"schemaName":"FooEvent","platform":"urn:li:dataPlatform:foo","version":0,"created":{"time":0,"actor":"urn:li:corpuser:fbar"},"lastModified":{"time":0,"actor":"urn:li:corpuser:fbar"},"hash":"","platformSchema":{"com.linkedin.schema.KafkaSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"MetadataChangeEvent\",\"namespace\":\"com.linkedin.mxe\",\"doc\":\"Kafka event for proposing a metadata change for an entity.\",\"fields\":[{\"name\":\"auditHeader\",\"type\":{\"type\":\"record\",\"name\":\"KafkaAuditHeader\",\"namespace\":\"com.linkedin.avro2pegasus.events\",\"doc\":\"Header\"}}]}"}},"fields":[{"fieldPath":"foo","description":"Bar","nativeDataType":"string","type":{"type":{"com.linkedin.schema.StringType":{}}}}]}}],"urn":"urn:li:dataset:(urn:li:dataPlatform:presto,SamplePrestoDataset,PROD)"}}'
|
||||
```
|
||||
|
||||
You should be able to see the urn parts of the newly ingested urn in the index table, along with the `removed` field of `Status` aspect.
|
||||
|
||||
```
|
||||
mysql> select * from metadata_index;
|
||||
+----+----------------------------------------------------------------------+------------------------------------+------------------------+---------+----------------------------+-----------+
|
||||
| id | urn | aspect | path | longVal | stringVal | doubleVal |
|
||||
+----+----------------------------------------------------------------------+------------------------------------+------------------------+---------+----------------------------+-----------+
|
||||
| 1 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform/platformName | NULL | kafka | NULL |
|
||||
| 2 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /origin | NULL | PROD | NULL |
|
||||
| 3 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /datasetName | NULL | SampleKafkaDataset | NULL |
|
||||
| 4 | urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform | NULL | urn:li:dataPlatform:kafka | NULL |
|
||||
| 5 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform/platformName | NULL | hdfs | NULL |
|
||||
| 6 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /origin | NULL | PROD | NULL |
|
||||
| 7 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /datasetName | NULL | SampleHdfsDataset | NULL |
|
||||
| 8 | urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform | NULL | urn:li:dataPlatform:hdfs | NULL |
|
||||
| 9 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform/platformName | NULL | hive | NULL |
|
||||
| 10 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /origin | NULL | PROD | NULL |
|
||||
| 11 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /datasetName | NULL | SampleHiveDataset | NULL |
|
||||
| 12 | urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform | NULL | urn:li:dataPlatform:hive | NULL |
|
||||
| 13 | urn:li:dataset:(urn:li:dataPlatform:presto,SamplePrestoDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform/platformName | NULL | presto | NULL |
|
||||
| 14 | urn:li:dataset:(urn:li:dataPlatform:presto,SamplePrestoDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /origin | NULL | PROD | NULL |
|
||||
| 15 | urn:li:dataset:(urn:li:dataPlatform:presto,SamplePrestoDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /datasetName | NULL | SamplePrestoDataset | NULL |
|
||||
| 16 | urn:li:dataset:(urn:li:dataPlatform:presto,SamplePrestoDataset,PROD) | com.linkedin.common.urn.DatasetUrn | /platform | NULL | urn:li:dataPlatform:presto | NULL |
|
||||
| 17 | urn:li:dataset:(urn:li:dataPlatform:presto,SamplePrestoDataset,PROD) | com.linkedin.common.Status | /removed | NULL | false | NULL |
|
||||
+----+----------------------------------------------------------------------+------------------------------------+------------------------+---------+----------------------------+-----------+
|
||||
17 rows in set (0.00 sec)
|
||||
```
|
||||
|
||||
Next, let's try some API calls to test the filter conditions.
|
||||
|
||||
### Get all dataset urns that are non-removed i.e. `removed=false`
|
||||
|
||||
```
|
||||
curl "http://localhost:8080/datasets?q=filter&aspects=List()&filter=(criteria:List((aspect:com.linkedin.common.Status,pathParams:(path:%2Fremoved,value:("boolean":false)))))" -X GET -H 'X-RestLi-Protocol-Version: 2.0.0' -H 'X-RestLi-Method: finder' | jq
|
||||
|
||||
{
|
||||
"elements": [
|
||||
{
|
||||
"urn": "urn:li:dataset:(urn:li:dataPlatform:presto,SamplePrestoDataset,PROD)",
|
||||
"origin": "PROD",
|
||||
"name": "SamplePrestoDataset",
|
||||
"platform": "urn:li:dataPlatform:presto"
|
||||
}
|
||||
],
|
||||
"paging": {
|
||||
"count": 10,
|
||||
"start": 0,
|
||||
"links": []
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
You can try similar API calls to return metadata aspects of urns that meet the filter criteria.
|
@ -27,6 +27,26 @@
|
||||
} ]
|
||||
} ],
|
||||
"finders" : [ {
|
||||
"name" : "filter",
|
||||
"doc" : "Retrieves the values for multiple entities obtained after filtering urns from local secondary index. Here the value is\n made up of latest versions of specified aspects. If no aspects are provided, value model will not contain any metadata aspect.\n\n <p>If no filter conditions are provided, then it returns values of given entity type.",
|
||||
"parameters" : [ {
|
||||
"name" : "filter",
|
||||
"type" : "com.linkedin.metadata.query.IndexFilter",
|
||||
"optional" : true,
|
||||
"doc" : "{@link IndexFilter} that defines the filter conditions"
|
||||
}, {
|
||||
"name" : "aspects",
|
||||
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }",
|
||||
"optional" : true,
|
||||
"doc" : "list of aspects to be returned in the VALUE model"
|
||||
}, {
|
||||
"name" : "urn",
|
||||
"type" : "string",
|
||||
"optional" : true,
|
||||
"doc" : "last urn of the previous fetched page. For the first page, this should be set as NULL"
|
||||
} ],
|
||||
"pagingSupported" : true
|
||||
}, {
|
||||
"name" : "search",
|
||||
"parameters" : [ {
|
||||
"name" : "input",
|
||||
|
@ -1129,6 +1129,57 @@
|
||||
"doc" : "A list of criteria the filter applies to the query"
|
||||
} ]
|
||||
}, {
|
||||
"type" : "record",
|
||||
"name" : "IndexCriterion",
|
||||
"namespace" : "com.linkedin.metadata.query",
|
||||
"doc" : "A criterion for matching a field with given value",
|
||||
"fields" : [ {
|
||||
"name" : "aspect",
|
||||
"type" : "string",
|
||||
"doc" : "FQCN of the aspect class in the index table that this criterion refers to e.g. com.linkedin.common.Status"
|
||||
}, {
|
||||
"name" : "pathParams",
|
||||
"type" : {
|
||||
"type" : "record",
|
||||
"name" : "IndexPathParams",
|
||||
"doc" : "Model combining index path, value and the condition for the criterion to be satisfied",
|
||||
"fields" : [ {
|
||||
"name" : "path",
|
||||
"type" : "string",
|
||||
"doc" : "Corresponding path column of the index table that this criterion refers to e.g. /removed (corresponding to field \"removed\" of com.linkedin.common.Status aspect)"
|
||||
}, {
|
||||
"name" : "value",
|
||||
"type" : {
|
||||
"type" : "typeref",
|
||||
"name" : "IndexValue",
|
||||
"doc" : "A union of all supported value types in the index table",
|
||||
"ref" : [ "boolean", "double", "float", "int", "long", "string" ]
|
||||
},
|
||||
"doc" : "Value of the corresponding path of the aspect"
|
||||
}, {
|
||||
"name" : "condition",
|
||||
"type" : "Condition",
|
||||
"doc" : "Condition for the criterion to be satisfied e.g. EQUAL, START_WITH",
|
||||
"default" : "EQUAL"
|
||||
} ]
|
||||
},
|
||||
"doc" : "Corresponding path, value and condition that this criterion refers to",
|
||||
"optional" : true
|
||||
} ]
|
||||
}, {
|
||||
"type" : "record",
|
||||
"name" : "IndexFilter",
|
||||
"namespace" : "com.linkedin.metadata.query",
|
||||
"doc" : "Filters for finding records in the index table",
|
||||
"fields" : [ {
|
||||
"name" : "criteria",
|
||||
"type" : {
|
||||
"type" : "array",
|
||||
"items" : "IndexCriterion"
|
||||
},
|
||||
"doc" : "A list of criteria to filter records from the index table, AND being the logical operator"
|
||||
} ]
|
||||
}, "com.linkedin.metadata.query.IndexPathParams", "com.linkedin.metadata.query.IndexValue", {
|
||||
"type" : "record",
|
||||
"name" : "SearchResultMetadata",
|
||||
"namespace" : "com.linkedin.metadata.query",
|
||||
@ -1255,6 +1306,26 @@
|
||||
} ]
|
||||
} ],
|
||||
"finders" : [ {
|
||||
"name" : "filter",
|
||||
"doc" : "Retrieves the values for multiple entities obtained after filtering urns from local secondary index. Here the value is\n made up of latest versions of specified aspects. If no aspects are provided, value model will not contain any metadata aspect.\n\n <p>If no filter conditions are provided, then it returns values of given entity type.",
|
||||
"parameters" : [ {
|
||||
"name" : "filter",
|
||||
"type" : "com.linkedin.metadata.query.IndexFilter",
|
||||
"optional" : true,
|
||||
"doc" : "{@link IndexFilter} that defines the filter conditions"
|
||||
}, {
|
||||
"name" : "aspects",
|
||||
"type" : "{ \"type\" : \"array\", \"items\" : \"string\" }",
|
||||
"optional" : true,
|
||||
"doc" : "list of aspects to be returned in the VALUE model"
|
||||
}, {
|
||||
"name" : "urn",
|
||||
"type" : "string",
|
||||
"optional" : true,
|
||||
"doc" : "last urn of the previous fetched page. For the first page, this should be set as NULL"
|
||||
} ],
|
||||
"pagingSupported" : true
|
||||
}, {
|
||||
"name" : "search",
|
||||
"parameters" : [ {
|
||||
"name" : "input",
|
||||
|
@ -1,6 +1,7 @@
|
||||
package com.linkedin.dataset.client;
|
||||
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.common.urn.Urn;
|
||||
import com.linkedin.data.template.StringArray;
|
||||
import com.linkedin.dataset.Dataset;
|
||||
import com.linkedin.dataset.DatasetKey;
|
||||
@ -8,10 +9,12 @@ import com.linkedin.dataset.DatasetsDoAutocompleteRequestBuilder;
|
||||
import com.linkedin.dataset.DatasetsDoBrowseRequestBuilder;
|
||||
import com.linkedin.dataset.DatasetsDoGetBrowsePathsRequestBuilder;
|
||||
import com.linkedin.dataset.DatasetsDoGetSnapshotRequestBuilder;
|
||||
import com.linkedin.dataset.DatasetsFindByFilterRequestBuilder;
|
||||
import com.linkedin.dataset.DatasetsFindBySearchRequestBuilder;
|
||||
import com.linkedin.dataset.DatasetsRequestBuilders;
|
||||
import com.linkedin.metadata.query.AutoCompleteResult;
|
||||
import com.linkedin.metadata.query.BrowseResult;
|
||||
import com.linkedin.metadata.query.IndexFilter;
|
||||
import com.linkedin.metadata.query.SortCriterion;
|
||||
import com.linkedin.metadata.restli.BaseBrowsableClient;
|
||||
import com.linkedin.metadata.snapshot.DatasetSnapshot;
|
||||
@ -22,6 +25,8 @@ import com.linkedin.restli.client.GetRequest;
|
||||
import com.linkedin.restli.common.CollectionResponse;
|
||||
import com.linkedin.restli.common.ComplexResourceKey;
|
||||
import com.linkedin.restli.common.EmptyRecord;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
@ -132,6 +137,60 @@ public class Datasets extends BaseBrowsableClient<Dataset, DatasetUrn> {
|
||||
return _client.sendRequest(requestBuilder.build()).getResponse().getEntity();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets list of dataset urns from strongly consistent local secondary index
|
||||
*
|
||||
* @param lastUrn last dataset urn of the previous fetched page. For the first page, this will be NULL
|
||||
* @param size size of the page that needs to be fetched
|
||||
* @return list of dataset urns represented as {@link StringArray}
|
||||
* @throws RemoteInvocationException
|
||||
*/
|
||||
@Nonnull
|
||||
public List<String> listUrnsFromIndex(@Nullable DatasetUrn lastUrn, int size) throws RemoteInvocationException {
|
||||
return listUrnsFromIndex(null, lastUrn, size);
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets list of dataset urns from strongly consistent local secondary index given an {@link IndexFilter} specifying filter conditions
|
||||
*
|
||||
* @param indexFilter index filter that defines the filter conditions
|
||||
* @param lastUrn last dataset urn of the previous fetched page. For the first page, this will be NULL
|
||||
* @param size size of the page that needs to be fetched
|
||||
* @return list of dataset urns represented as {@link StringArray}
|
||||
* @throws RemoteInvocationException
|
||||
*/
|
||||
@Nonnull
|
||||
public List<String> listUrnsFromIndex(@Nullable IndexFilter indexFilter, @Nullable DatasetUrn lastUrn, int size)
|
||||
throws RemoteInvocationException {
|
||||
final List<Dataset> response = filter(indexFilter, Collections.emptyList(), lastUrn, size);
|
||||
return response.stream()
|
||||
.map(dataset -> new DatasetUrn(dataset.getPlatform(), dataset.getName(), dataset.getOrigin()))
|
||||
.map(Urn::toString)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets a list of {@link Dataset} whose raw metadata contains the list of dataset urns from strongly consistent
|
||||
* local secondary index that satisfy the filter conditions provided in {@link IndexFilter}
|
||||
*
|
||||
* @param indexFilter {@link IndexFilter} that specifies the filter conditions for urns to be fetched from secondary index
|
||||
* @param aspectNames list of aspects whose value should be retrieved
|
||||
* @param lastUrn last dataset urn of the previous fetched page. For the first page, this will be NULL
|
||||
* @param size size of the page that needs to be fetched
|
||||
* @return collection of {@link Dataset} whose raw metadata contains the list of filtered dataset urns
|
||||
* @throws RemoteInvocationException
|
||||
*/
|
||||
@Nonnull
|
||||
public List<Dataset> filter(@Nullable IndexFilter indexFilter, @Nullable List<String> aspectNames,
|
||||
@Nullable DatasetUrn lastUrn, int size) throws RemoteInvocationException {
|
||||
final DatasetsFindByFilterRequestBuilder requestBuilder =
|
||||
DATASETS_REQUEST_BUILDERS.findByFilter().filterParam(indexFilter).aspectsParam(aspectNames).paginate(0, size);
|
||||
if (lastUrn != null) {
|
||||
requestBuilder.urnParam(lastUrn.toString());
|
||||
}
|
||||
return _client.sendRequest(requestBuilder.build()).getResponseEntity().getElements();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets browse path(s) given dataset urn
|
||||
*
|
||||
|
@ -1,5 +1,6 @@
|
||||
package com.linkedin.gms.factory.dataset;
|
||||
|
||||
import com.linkedin.gms.factory.common.LocalDAOStorageConfigFactory;
|
||||
import com.linkedin.gms.factory.common.TopicConventionFactory;
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.metadata.aspect.DatasetAspect;
|
||||
@ -8,8 +9,10 @@ import com.linkedin.metadata.dao.EbeanLocalDAO;
|
||||
import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer;
|
||||
import com.linkedin.metadata.dao.producer.KafkaProducerCallback;
|
||||
import com.linkedin.metadata.snapshot.DatasetSnapshot;
|
||||
import com.linkedin.metadata.urn.dataset.DatasetUrnPathExtractor;
|
||||
import com.linkedin.mxe.TopicConvention;
|
||||
import io.ebean.config.ServerConfig;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
@ -20,6 +23,10 @@ import org.springframework.context.annotation.DependsOn;
|
||||
|
||||
@Configuration
|
||||
public class DatasetDaoFactory {
|
||||
|
||||
@Value("${DATASET_ENABLE_SCSI:false}")
|
||||
private boolean enableSCSI;
|
||||
|
||||
@Autowired
|
||||
ApplicationContext applicationContext;
|
||||
|
||||
@ -31,7 +38,12 @@ public class DatasetDaoFactory {
|
||||
applicationContext.getBean(Producer.class), applicationContext.getBean(TopicConvention.class),
|
||||
new KafkaProducerCallback());
|
||||
|
||||
return new EbeanLocalDAO<>(DatasetAspect.class, producer, applicationContext.getBean(ServerConfig.class),
|
||||
DatasetUrn.class);
|
||||
final EbeanLocalDAO<DatasetAspect, DatasetUrn> dao =
|
||||
new EbeanLocalDAO<>(producer, applicationContext.getBean(ServerConfig.class),
|
||||
LocalDAOStorageConfigFactory.getStorageConfig(DatasetAspect.class, DatasetDaoFactory.class,
|
||||
"datasetStorageConfig.json"), DatasetUrn.class);
|
||||
dao.setUrnPathExtractor(new DatasetUrnPathExtractor());
|
||||
dao.enableLocalSecondaryIndex(enableSCSI);
|
||||
return dao;
|
||||
}
|
||||
}
|
||||
|
11
gms/factories/src/main/resources/datasetStorageConfig.json
Normal file
11
gms/factories/src/main/resources/datasetStorageConfig.json
Normal file
@ -0,0 +1,11 @@
|
||||
{
|
||||
"aspectStorageConfigMap": {
|
||||
"com.linkedin.common.Status": {
|
||||
"pathStorageConfigMap": {
|
||||
"/removed": {
|
||||
"strongConsistentSecondaryIndex": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -19,6 +19,7 @@ import com.linkedin.metadata.dao.utils.ModelUtils;
|
||||
import com.linkedin.metadata.query.AutoCompleteResult;
|
||||
import com.linkedin.metadata.query.BrowseResult;
|
||||
import com.linkedin.metadata.query.Filter;
|
||||
import com.linkedin.metadata.query.IndexFilter;
|
||||
import com.linkedin.metadata.query.SearchResultMetadata;
|
||||
import com.linkedin.metadata.query.SortCriterion;
|
||||
import com.linkedin.metadata.restli.BackfillResult;
|
||||
@ -62,7 +63,7 @@ public final class Datasets extends BaseBrowsableEntityResource<
|
||||
// @formatter:on
|
||||
|
||||
public Datasets() {
|
||||
super(DatasetSnapshot.class, DatasetAspect.class);
|
||||
super(DatasetSnapshot.class, DatasetAspect.class, DatasetUrn.class);
|
||||
}
|
||||
|
||||
@Inject
|
||||
@ -229,6 +230,29 @@ public final class Datasets extends BaseBrowsableEntityResource<
|
||||
return super.search(input, aspectNames, filter, sortCriterion, pagingContext);
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the values for multiple entities obtained after filtering urns from local secondary index. Here the value is
|
||||
* made up of latest versions of specified aspects. If no aspects are provided, value model will not contain any metadata aspect.
|
||||
*
|
||||
* <p>If no filter conditions are provided, then it returns values of given entity type.
|
||||
*
|
||||
* @param indexFilter {@link IndexFilter} that defines the filter conditions
|
||||
* @param aspectNames list of aspects to be returned in the VALUE model
|
||||
* @param lastUrn last urn of the previous fetched page. For the first page, this should be set as NULL
|
||||
* @param pagingContext {@link PagingContext} defining the paging parameters of the request
|
||||
* @return list of values
|
||||
*/
|
||||
@Finder(FINDER_FILTER)
|
||||
@Override
|
||||
@Nonnull
|
||||
public Task<List<Dataset>> filter(
|
||||
@QueryParam(PARAM_FILTER) @Optional @Nullable IndexFilter indexFilter,
|
||||
@QueryParam(PARAM_ASPECTS) @Optional @Nullable String[] aspectNames,
|
||||
@QueryParam(PARAM_URN) @Optional @Nullable String lastUrn,
|
||||
@PagingContextParam @Nonnull PagingContext pagingContext) {
|
||||
return super.filter(indexFilter, aspectNames, lastUrn, pagingContext);
|
||||
}
|
||||
|
||||
@Action(name = ACTION_AUTOCOMPLETE)
|
||||
@Override
|
||||
@Nonnull
|
||||
|
@ -0,0 +1,24 @@
|
||||
package com.linkedin.metadata.urn.dataset;
|
||||
|
||||
import com.linkedin.common.urn.DatasetUrn;
|
||||
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
|
||||
public final class DatasetUrnPathExtractor implements UrnPathExtractor<DatasetUrn> {
|
||||
@Nonnull
|
||||
@Override
|
||||
public Map<String, Object> extractPaths(@Nonnull DatasetUrn urn) {
|
||||
return Collections.unmodifiableMap(new HashMap<String, String>() {
|
||||
{
|
||||
put("/platform", urn.getPlatformEntity().toString());
|
||||
put("/datasetName", urn.getDatasetNameEntity());
|
||||
put("/origin", urn.getOriginEntity().toString());
|
||||
put("/platform/platformName", urn.getPlatformEntity().getPlatformNameEntity());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user