/*
mirror of
https://github.com/datahub-project/datahub.git
synced 2025-08-30 20:15:56 +00:00
1 line
17 KiB
JavaScript
1 line
17 KiB
JavaScript
*/
// Webpack chunk 7527, emitted by the Docusaurus docs build for the versioned (1.1.0)
// page "Adding Stateful Ingestion to a Source".
//
//   15680 - MDX runtime helpers. Exports `xA` (a context provider that merges the
//           `components` prop into the MDX components context) and `yg` (a
//           createElement wrapper that tags string/`mdxType` elements so they can be
//           swapped for user-supplied components).
//   5924  - The compiled page: front matter (`frontMatter`), page metadata
//           (`metadata`), table of contents (`toc`) and the MDX render function
//           (`default`).
//
// NOTE(review): this file is build output. The textual fixes made here (typo in
// "Redundant", restored Python indentation, consistent report base class,
// millisecond conversion in the 3.3 example, grammar) must also be applied to
// @site/versioned_docs/version-1.1.0/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md,
// otherwise the next docs build will regenerate the old text.
"use strict";
(self.webpackChunkdocs_website=self.webpackChunkdocs_website||[]).push([[7527],{
  // Vendored MDX runtime; logic unchanged from the emitted bundle.
  15680:(e,t,n)=>{
    n.d(t,{xA:()=>d,yg:()=>f});
    // NOTE(review): module 96540 is presumably React (createContext/forwardRef/createElement are used) - confirm.
    var a=n(96540);
    // Define-or-assign a single own property (Babel's _defineProperty helper).
    function o(e,t,n){return t in e?Object.defineProperty(e,t,{value:n,enumerable:!0,configurable:!0,writable:!0}):e[t]=n,e}
    // Own string keys of `e`, plus (enumerable-only when `t`) own symbol keys.
    function r(e,t){var n=Object.keys(e);if(Object.getOwnPropertySymbols){var a=Object.getOwnPropertySymbols(e);t&&(a=a.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),n.push.apply(n,a)}return n}
    // Object-spread polyfill: copies own properties of each extra argument onto `e`.
    function i(e){for(var t=1;t<arguments.length;t++){var n=null!=arguments[t]?arguments[t]:{};t%2?r(Object(n),!0).forEach((function(t){o(e,t,n[t])})):Object.getOwnPropertyDescriptors?Object.defineProperties(e,Object.getOwnPropertyDescriptors(n)):r(Object(n)).forEach((function(t){Object.defineProperty(e,t,Object.getOwnPropertyDescriptor(n,t))}))}return e}
    // Object-rest polyfill: shallow copy of `e` without the keys listed in `t`.
    function s(e,t){if(null==e)return{};var n,a,o=function(e,t){if(null==e)return{};var n,a,o={},r=Object.keys(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||(o[n]=e[n]);return o}(e,t);if(Object.getOwnPropertySymbols){var r=Object.getOwnPropertySymbols(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||Object.prototype.propertyIsEnumerable.call(e,n)&&(o[n]=e[n])}return o}
    var l=a.createContext({}),
      // Resolve the components map: a function receives the parent map, an object is merged over it.
      u=function(e){var t=a.useContext(l),n=t;return e&&(n="function"==typeof e?e(t):i(i({},t),e)),n},
      d=function(e){var t=u(e.components);return a.createElement(l.Provider,{value:t},e.children)},
      c="mdxType",
      // Fallbacks used when no component override exists for a tag.
      p={inlineCode:"code",wrapper:function(e){var t=e.children;return a.createElement(a.Fragment,{},t)}},
      // Lookup order: "parent.tag" override, "tag" override, built-in fallback, original type.
      g=a.forwardRef((function(e,t){var n=e.components,o=e.mdxType,r=e.originalType,l=e.parentName,d=s(e,["components","mdxType","originalType","parentName"]),c=u(n),g=o,f=c["".concat(l,".").concat(g)]||c[g]||p[g]||r;return n?a.createElement(f,i(i({ref:t},d),{},{components:n})):a.createElement(f,i({ref:t},d))}));
    // createElement replacement used by compiled MDX: string/mdxType elements are routed through `g`.
    function f(e,t){var n=arguments,o=t&&t.mdxType;if("string"==typeof e||o){var r=n.length,i=new Array(r);i[0]=g;var s={};for(var l in t)hasOwnProperty.call(t,l)&&(s[l]=t[l]);s.originalType=e,s[c]="string"==typeof e?e:o,i[1]=s;for(var u=2;u<r;u++)i[u]=n[u];return a.createElement.apply(null,i)}return a.createElement.apply(null,n)}
    g.displayName="MDXCreateElement"
  },
  // Compiled page module for add_stateful_ingestion_to_source.md (version 1.1.0).
  5924:(e,t,n)=>{
    n.r(t),n.d(t,{assets:()=>d,contentTitle:()=>l,default:()=>f,frontMatter:()=>s,metadata:()=>u,toc:()=>c});
    n(96540);
    var a=n(15680);
    function o(e,t,n){return t in e?Object.defineProperty(e,t,{value:n,enumerable:!0,configurable:!0,writable:!0}):e[t]=n,e}
    function r(e,t){return t=null!=t?t:{},Object.getOwnPropertyDescriptors?Object.defineProperties(e,Object.getOwnPropertyDescriptors(t)):function(e,t){var n=Object.keys(e);if(Object.getOwnPropertySymbols){var a=Object.getOwnPropertySymbols(e);t&&(a=a.filter((function(t){return Object.getOwnPropertyDescriptor(e,t).enumerable}))),n.push.apply(n,a)}return n}(Object(t)).forEach((function(n){Object.defineProperty(e,n,Object.getOwnPropertyDescriptor(t,n))})),e}
    function i(e,t){if(null==e)return{};var n,a,o=function(e,t){if(null==e)return{};var n,a,o={},r=Object.keys(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||(o[n]=e[n]);return o}(e,t);if(Object.getOwnPropertySymbols){var r=Object.getOwnPropertySymbols(e);for(a=0;a<r.length;a++)n=r[a],t.indexOf(n)>=0||Object.prototype.propertyIsEnumerable.call(e,n)&&(o[n]=e[n])}return o}
    const s={title:"Adding Stateful Ingestion to a Source",slug:"/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",custom_edit_url:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md"},
      l="Adding Stateful Ingestion to a Source",
      u={unversionedId:"metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",id:"version-1.1.0/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",title:"Adding Stateful Ingestion to a Source",description:"Currently, datahub supports the Stale Metadata Removal and",source:"@site/versioned_docs/version-1.1.0/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md",sourceDirName:"metadata-ingestion/docs/dev_guides",slug:"/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",permalink:"/docs/1.1.0/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",draft:!1,editUrl:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md",tags:[],version:"1.1.0",frontMatter:{title:"Adding Stateful Ingestion to a Source",slug:"/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",custom_edit_url:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source.md"},sidebar:"overviewSidebar",previous:{title:"Classification",permalink:"/docs/1.1.0/metadata-ingestion/docs/dev_guides/classification"},next:{title:"SQL Profiling",permalink:"/docs/1.1.0/metadata-ingestion/docs/dev_guides/sql_profiles"}},
      d={},
      c=[{value:"Adding Stale Metadata Removal to a Source",id:"adding-stale-metadata-removal-to-a-source",level:2},{value:"1. Modify the source config",id:"1-modify-the-source-config",level:3},{value:"2. Modify the source report",id:"2-modify-the-source-report",level:3},{value:"3. Modify the source",id:"3-modify-the-source",level:3},{value:"Adding Redundant Run Elimination to a Source",id:"adding-redundant-run-elimination-to-a-source",level:2},{value:"1. Modifying the SourceConfig",id:"1-modifying-the-sourceconfig",level:3},{value:"2. Modifying the SourceReport",id:"2-modifying-the-sourcereport",level:3},{value:"3. Modifying the Source",id:"3-modifying-the-source",level:3},{value:"3.1 Instantiate RedundantRunSkipHandler in the <code>__init__</code> method of the source.",id:"31-instantiate-redundantrunskiphandler-in-the-__init__-method-of-the-source",level:4},{value:"3.2 Checking if the current run should be skipped.",id:"32-checking-if-the-current-run-should-be-skipped",level:4},{value:"3.3 Updating the state for the current run.",id:"33-updating-the-state-for-the-current-run",level:4}],
      p={toc:c},
      g="wrapper";
    // Renders the page body inside the MDX layout wrapper; every child below is one block of the markdown page.
    function f(e){var{components:t}=e,n=i(e,["components"]);return(0,a.yg)(g,r(function(e){for(var t=1;t<arguments.length;t++){var n=null!=arguments[t]?arguments[t]:{},a=Object.keys(n);"function"==typeof Object.getOwnPropertySymbols&&(a=a.concat(Object.getOwnPropertySymbols(n).filter((function(e){return Object.getOwnPropertyDescriptor(n,e).enumerable})))),a.forEach((function(t){o(e,t,n[t])}))}return e}({},p,n),{components:t,mdxType:"MDXLayout"}),
      (0,a.yg)("h1",{id:"adding-stateful-ingestion-to-a-source"},"Adding Stateful Ingestion to a Source"),
      (0,a.yg)("p",null,"Currently, datahub supports the ",(0,a.yg)("a",{parentName:"p",href:"/docs/1.1.0/metadata-ingestion/docs/dev_guides/stateful#stale-entity-removal"},"Stale Metadata Removal")," and\nthe ",(0,a.yg)("a",{parentName:"p",href:"/docs/1.1.0/metadata-ingestion/docs/dev_guides/stateful#redundant-run-elimination"},"Redundant Run Elimination")," use-cases on top of the more generic stateful ingestion\ncapability available for the sources. This document describes how to add support for these two use-cases to new sources."),
      (0,a.yg)("h2",{id:"adding-stale-metadata-removal-to-a-source"},"Adding Stale Metadata Removal to a Source"),
      (0,a.yg)("p",null,"Adding the stale metadata removal use-case to a new source involves modifying the source config, source report, and the source itself."),
      (0,a.yg)("p",null,"For a full example of all changes required: ",(0,a.yg)("a",{parentName:"p",href:"https://github.com/datahub-project/datahub/pull/9118"},"Adding stale metadata removal to the MongoDB source"),"."),
      (0,a.yg)("p",null,"The ",(0,a.yg)("a",{parentName:"p",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stale_entity_removal_handler.py"},"datahub.ingestion.source.state.stale_entity_removal_handler")," module provides the supporting infrastructure for all the steps described\nabove and substantially simplifies the implementation on the source side. Below is a detailed explanation of each of these\nsteps along with examples."),
      (0,a.yg)("h3",{id:"1-modify-the-source-config"},"1. Modify the source config"),
      (0,a.yg)("p",null,"The source's config must inherit from ",(0,a.yg)("inlineCode",{parentName:"p"},"StatefulIngestionConfigBase"),", and should declare a field named ",(0,a.yg)("inlineCode",{parentName:"p"},"stateful_ingestion")," of type ",(0,a.yg)("inlineCode",{parentName:"p"},"Optional[StatefulStaleMetadataRemovalConfig]"),"."),
      (0,a.yg)("p",null,"Example:"),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"from datahub.ingestion.source.state.stale_entity_removal_handler import (\n    StatefulStaleMetadataRemovalConfig,\n    StatefulIngestionConfigBase,\n)\n\nclass MySourceConfig(StatefulIngestionConfigBase):\n    # ...<other config params>...\n\n    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = None\n")),
      (0,a.yg)("h3",{id:"2-modify-the-source-report"},"2. Modify the source report"),
      (0,a.yg)("p",null,"The report class of the source should inherit from ",(0,a.yg)("inlineCode",{parentName:"p"},"StaleEntityRemovalSourceReport")," instead of ",(0,a.yg)("inlineCode",{parentName:"p"},"SourceReport"),"."),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"from datahub.ingestion.source.state.stale_entity_removal_handler import (\n    StaleEntityRemovalSourceReport,\n)\n\n@dataclass\nclass MySourceReport(StaleEntityRemovalSourceReport):\n    # <other fields here>\n    pass\n")),
      (0,a.yg)("h3",{id:"3-modify-the-source"},"3. Modify the source"),
      (0,a.yg)("ol",null,(0,a.yg)("li",{parentName:"ol"},"The source must inherit from ",(0,a.yg)("inlineCode",{parentName:"li"},"StatefulIngestionSourceBase")," instead of ",(0,a.yg)("inlineCode",{parentName:"li"},"Source"),"."),(0,a.yg)("li",{parentName:"ol"},"The source should contain a custom ",(0,a.yg)("inlineCode",{parentName:"li"},"get_workunit_processors")," method.")),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"from datahub.ingestion.source.state.stateful_ingestion_base import StatefulIngestionSourceBase\nfrom datahub.ingestion.source.state.stale_entity_removal_handler import StaleEntityRemovalHandler\n\nclass MySource(StatefulIngestionSourceBase):\n    def __init__(self, config: MySourceConfig, ctx: PipelineContext):\n        super().__init__(config, ctx)\n\n        self.config = config\n        self.report = MySourceReport()\n\n        # other initialization code here\n\n    def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:\n        return [\n            *super().get_workunit_processors(),\n            StaleEntityRemovalHandler.create(\n                self, self.config, self.ctx\n            ).workunit_processor,\n        ]\n\n    # other methods here\n")),
      (0,a.yg)("h2",{id:"adding-redundant-run-elimination-to-a-source"},"Adding Redundant Run Elimination to a Source"),
      (0,a.yg)("p",null,"This use-case applies to the sources that drive ingestion by querying logs over a specified duration via the config (such\nas snowflake usage, bigquery usage etc.). It typically involves expensive and long-running queries. To add redundant\nrun elimination to a new source to prevent the expensive reruns for the same time range (potentially due to a user\nerror or a scheduler malfunction), the following steps\nare required."),
      (0,a.yg)("ol",null,(0,a.yg)("li",{parentName:"ol"},"Update the ",(0,a.yg)("inlineCode",{parentName:"li"},"SourceConfig")),(0,a.yg)("li",{parentName:"ol"},"Update the ",(0,a.yg)("inlineCode",{parentName:"li"},"SourceReport")),(0,a.yg)("li",{parentName:"ol"},"Modify the ",(0,a.yg)("inlineCode",{parentName:"li"},"Source")," to",(0,a.yg)("ol",{parentName:"li"},(0,a.yg)("li",{parentName:"ol"},"Instantiate the RedundantRunSkipHandler object."),(0,a.yg)("li",{parentName:"ol"},"Check if the current run should be skipped."),(0,a.yg)("li",{parentName:"ol"},"Update the state for the current run (start & end times).")))),
      (0,a.yg)("p",null,"The ",(0,a.yg)("a",{parentName:"p",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py"},"datahub.ingestion.source.state.redundant_run_skip_handler"),"\nmodule provides the supporting infrastructure required for all the steps described above."),
      (0,a.yg)("p",null,"NOTE: The handler currently uses a simple state,\nthe ",(0,a.yg)("a",{parentName:"p",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/usage_common_state.py"},"BaseUsageCheckpointState"),",\nacross all sources it supports (unlike the StaleEntityRemovalHandler)."),
      (0,a.yg)("h3",{id:"1-modifying-the-sourceconfig"},"1. Modifying the SourceConfig"),
      (0,a.yg)("p",null,"The ",(0,a.yg)("inlineCode",{parentName:"p"},"SourceConfig")," must inherit from the ",(0,a.yg)("a",{parentName:"p",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/redundant_run_skip_handler.py#L23"},"StatefulRedundantRunSkipConfig")," class."),
      (0,a.yg)("p",null,"Examples:"),
      (0,a.yg)("ol",null,(0,a.yg)("li",{parentName:"ol"},"Snowflake Usage")),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"from datahub.ingestion.source.state.redundant_run_skip_handler import (\n    StatefulRedundantRunSkipConfig,\n)\nclass SnowflakeStatefulIngestionConfig(StatefulRedundantRunSkipConfig):\n    pass\n")),
      (0,a.yg)("h3",{id:"2-modifying-the-sourcereport"},"2. Modifying the SourceReport"),
      (0,a.yg)("p",null,"The ",(0,a.yg)("inlineCode",{parentName:"p"},"SourceReport")," must inherit from the ",(0,a.yg)("a",{parentName:"p",href:"https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py#L102"},"StatefulIngestionReport")," class.\nExamples:"),
      (0,a.yg)("ol",null,(0,a.yg)("li",{parentName:"ol"},"Snowflake Usage")),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"@dataclass\nclass SnowflakeUsageReport(BaseSnowflakeReport, StatefulIngestionReport):\n    # <members specific to snowflake usage report>\n    pass\n")),
      (0,a.yg)("h3",{id:"3-modifying-the-source"},"3. Modifying the Source"),
      (0,a.yg)("p",null,"The source must inherit from ",(0,a.yg)("inlineCode",{parentName:"p"},"StatefulIngestionSourceBase"),"."),
      (0,a.yg)("h4",{id:"31-instantiate-redundantrunskiphandler-in-the-__init__-method-of-the-source"},"3.1 Instantiate RedundantRunSkipHandler in the ",(0,a.yg)("inlineCode",{parentName:"h4"},"__init__")," method of the source."),
      (0,a.yg)("p",null,"The source should create an instance of the ",(0,a.yg)("inlineCode",{parentName:"p"},"RedundantRunSkipHandler")," in its ",(0,a.yg)("inlineCode",{parentName:"p"},"__init__")," method.\nExamples:\nSnowflake Usage"),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"from datahub.ingestion.source.state.redundant_run_skip_handler import (\n    RedundantRunSkipHandler,\n)\nclass SnowflakeUsageSource(StatefulIngestionSourceBase):\n\n    def __init__(self, config: SnowflakeUsageConfig, ctx: PipelineContext):\n        super(SnowflakeUsageSource, self).__init__(config, ctx)\n        self.config: SnowflakeUsageConfig = config\n        self.report: SnowflakeUsageReport = SnowflakeUsageReport()\n        # Create and register the stateful ingestion use-case handlers.\n        self.redundant_run_skip_handler = RedundantRunSkipHandler(\n            source=self,\n            config=self.config,\n            pipeline_name=self.ctx.pipeline_name,\n            run_id=self.ctx.run_id,\n        )\n")),
      (0,a.yg)("h4",{id:"32-checking-if-the-current-run-should-be-skipped"},"3.2 Checking if the current run should be skipped."),
      (0,a.yg)("p",null,"The sources can query if the current run should be skipped using the ",(0,a.yg)("inlineCode",{parentName:"p"},"should_skip_this_run")," method of ",(0,a.yg)("inlineCode",{parentName:"p"},"RedundantRunSkipHandler"),". This should be done from the ",(0,a.yg)("inlineCode",{parentName:"p"},"get_workunits")," method, before doing any other work."),
      (0,a.yg)("p",null,"Example code:"),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"def get_workunits(self) -> Iterable[MetadataWorkUnit]:\n    # Skip a redundant run\n    if self.redundant_run_skip_handler.should_skip_this_run(\n        cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)\n    ):\n        return\n    # Generate the workunits.\n")),
      (0,a.yg)("h4",{id:"33-updating-the-state-for-the-current-run"},"3.3 Updating the state for the current run."),
      (0,a.yg)("p",null,"The source should use the ",(0,a.yg)("inlineCode",{parentName:"p"},"update_state")," method of ",(0,a.yg)("inlineCode",{parentName:"p"},"RedundantRunSkipHandler")," to update the current run's state if the run has not been skipped. This step can be performed in the ",(0,a.yg)("inlineCode",{parentName:"p"},"get_workunits")," method if the run has not been skipped."),
      (0,a.yg)("p",null,"Example code:"),
      (0,a.yg)("pre",null,(0,a.yg)("code",{parentName:"pre",className:"language-python"},"def get_workunits(self) -> Iterable[MetadataWorkUnit]:\n    # Skip a redundant run\n    if self.redundant_run_skip_handler.should_skip_this_run(\n        cur_start_time_millis=datetime_to_ts_millis(self.config.start_time)\n    ):\n        return\n\n    # Generate the workunits.\n    # <code for generating the workunits>\n    # Update checkpoint state for this run.\n    self.redundant_run_skip_handler.update_state(\n        start_time_millis=datetime_to_ts_millis(self.config.start_time),\n        end_time_millis=datetime_to_ts_millis(self.config.end_time),\n    )\n")))}
    f.isMDXComponent=!0
  }
}]);