From 74d6d35881e7d4b3dd164ad69b671bd13595bb9c Mon Sep 17 00:00:00 2001 From: vanmeete Date: Fri, 29 Apr 2022 12:27:02 +0200 Subject: [PATCH] feat(ingestion): add Pulsar source (#4721) --- datahub-web-react/src/images/pulsarlogo.png | Bin 0 -> 27623 bytes metadata-ingestion/setup.py | 2 + metadata-ingestion/source_docs/pulsar.md | 176 +++++ .../src/datahub/ingestion/source/pulsar.py | 642 ++++++++++++++++++ .../datahub/ingestion/source_config/pulsar.py | 111 +++ .../datahub/ingestion/source_report/pulsar.py | 33 + .../tests/unit/test_pulsar_source.py | 239 +++++++ .../main/resources/boot/data_platforms.json | 10 + 8 files changed, 1213 insertions(+) create mode 100644 datahub-web-react/src/images/pulsarlogo.png create mode 100644 metadata-ingestion/source_docs/pulsar.md create mode 100644 metadata-ingestion/src/datahub/ingestion/source/pulsar.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source_report/pulsar.py create mode 100644 metadata-ingestion/tests/unit/test_pulsar_source.py diff --git a/datahub-web-react/src/images/pulsarlogo.png b/datahub-web-react/src/images/pulsarlogo.png new file mode 100644 index 0000000000000000000000000000000000000000..6f4095575b04cc085c35029585cd859f223b4629 GIT binary patch literal 27623 zcmdqJgsp^4W%9jx zE%A^l+Twj3m63-~^B!@!81%M4>`RBPuDFjSq0zM!WEC%O9bx_awUqaZjpJPKcHOIO zR1?eArp)FX)`Q$`-C76iN3a2vkCspGH1gn^Eht}S-Lq79T9Bn8XpNHW*j!lrZTi;6 zC;P;GkH;qwOE$qfb}d4&?Y?xa-CD+BR8JZ7_J`FEi}z^|vlJf?C9T3eB6!lzO@TtiNlzeWGiPimNAOj)~=c965$O(tBQ65)wjLA1IMp z=~}QpELD0PMY+)@eJUR-2Ny^)u@Z9Q`;UkUu}E+cIzO8pUJMt%O^MTD{`|y}^4cHl zp%%v=mKo1oT(lgHzR`@iBcwJvZ)(P9>DsRue1VFY^fm&MO-QI_%QKbOrNY~kM934m z039;gCcKvpPR`{48L|+%b^Bv{2?^ZZ*KgZ5w>Ptf=gwuu*0%^`ue%}MQId!Pv*15f zRaSspVE*~9y)Y4cLg=Dm=ngD6h4~Nb0rxv^@FBqqxVj?23Jwv;O|CWGkJJ#zEeKrU zzOMK5pIMvWOpmnA?ccIC+IEETYOFj>)zQ&UyFPCkO?H=*y>2fnQ@A~*!kwdrpC1;V z2IqeLw5z)!*uiV#cgrM|mR1$3lA84}^d1q1<8bqZpv6V&W9+oC$`M{O#pZ~N-P4ZU 
zjMZQ$9P$6ZfAg#$O?JS-KaBHF82M-?EuGc{TT)pbvq z_?%&$?3D3nORdAS2wfhPzioUW9}n%vx`)V{6n-0CaptW+E%9)czd@T07faA|dlD@YWT`HGiY3mq>A>2LjHDi`OJ#bOzzM0lB^0Tp~HTZ(<=sETA zk0tmvL!t_T5dwv4FE)up&)#HA?yqI@d5o{gp5~Y;6G?NSNNy1 z5*t32$IT~I7lSJ25v|$C%JuKK{?6m3yUR!A&r zJNw2&6EigdlOVU#QAPxm00Ku$|2TJWn>VyFOK@RyX;L)O!1srR9=`QG5|53lfkWYB z5CpV7A-r_=b~uqUwi*5(tAqyhKv?u=sbds>VeiID{N>TcbEspM&X-XhX>C!c1Zq3u zM*KIM)BW_3U<-_8`STpa$YY<~3bovBJo{^I9O5+;u{%lj!P@;w+xFVTeLKS8&u*h1 z{@26icJ1G)5AN!(iWa*(M7o!~Ke4zebhkcHSdRV{+zk>BE!HBndhaf0cD;)ZnMjf& z6V&$P zBPzj`4(;1QLf>iR>LadxOutUw&xO5uU3!eZB%wI$Q1s&rSIS5Hf8v&Gc3o)EwTYq> zJ7`KYoCZ*D`(@`e71HG|*B%u!5W(X%2oZG41!(PhVo%t9$;LWizrOK^lQ#Lw%im6l(tVjAx-~^q&7L{gu%? z$3keVN8a2PQ1Q-(jVtgVTRo|Lemk~TPyGA&IivHTzC#n8ce@wPm4g4?Y*F4>qiZAf zehVAN=0SeOX>eKDAFwQxytHv%L!&KpRSi!bsqNp=X*8nA+f^#Q*XL1iWr*p=t0mHG zME=*kQ(@!X`p;)Vmh;uio=NcFUm<(Cw@Y5q??gIz(fMq`jK<0QHm9?JIPLFp)=N{> zw^bzgvvm=KC>zK-{z|@PCi<1dTrB)j)8pv^Lv*Gyv``-cp`KgzmV_DRcVCH0aDM)W zETi80r91DFu=P1l>tT5pThZYnm2~Fxjj30vwD+C0JVvO^5&W?sU5hI#MG7%7p%HtlOfzdxKCPJFX$3?j4P17gdreqj)plX5AG8 z#?dXkJ04IP96w|EmXV5R5y1=t`cBxtAIIO-UordI^$`iqxRDS=5}WlWZY%AdOxy(J z{ff&X*|=nl5n}wF3h0rWXWKV)t}H?N-bO6jC?pOib27d_4|_G`&hBI2{bX05c53O{ zQBO0$9juGch3V6*?EKy4a80>=15s?3G_m9#8r4|g8~GO4aP9IJ zaZB!ON6C>v#T5sYNSfBi%e(5Vm)@^_u@J;(p4ep*5>UoEF>6M7^kUWay!M|+c4Lx= zfHV`b!K#RHq1RYjYxG0Q1@UE&_k8X2Y2cN==RLfFe$VM(Q`?mkqiPyeTh z&RfCkK z;<;$VWx(3=Yf;*@K$Rjx%USEgF8D^4iek7vFagmgQB#nIhy#NW16h(ILGlZ2RmqyB z#2`5r#Z5~hxIO6{)-Cun`)uycmE0UM83@Qgwtio~gz{&_63$n_j;E^S>`uMoRqe z2_K1kaU_KwdfQD&3s`cuXT8Pb2yI>oboZZPN+44e40;waqi$S8o}AUWPxE}4p+A+x4|>F=2f?+!oF|qs99Mp?K*s#} z`^@S#2Wo9HzV00lZYB4ApB^wk?~*b&Os6w9G5Gcg`2}Kg*8`v<-17K2a3PRQiV0tA z^>L%MRaIJ8tf>*Devg+eufUpzKPj1BX!FH^5bkF&|1d;Qz#x-cI2;P2&IVR%IYG-I zDqo}6JH%DeIXf$GHD|xDB!XChIb`TB=7G+UB~c`agG##6yx`!nb8pHAQy|aZU3Lg! 
zd^7T5!pECa-|r|z7f-Y3M}HsS2mgM{OYh6>au5s48N5 zG=$eJJ2uUqSYn(WQ6eDuUW_({_5otO4eM=*nmA z8sMFVazG~@2_tx^W0x%fy7m`OpqeSRR1>)XAOn(3n$mrq%xU`2vDig5W8Z%eAOwe|BHvmNW z@j=L{ipay?uGyT(c!GcFi0Uc4wE(0^$lN#Y)2}t)NRhnvOYC=9Rrz~Q8(#9=xDHyJ z?-92mNmyw7;IPM(u#=>TF!k2Ado|UNo5Qs=ud5aZ|~xAr?-?ll1e;%dMnWSDyGfO_gttjXLj`kv`@8vqwXgc zJ;LfA;q4NXB06{=H7;EwXBCrw^wx^OhttiS*B znk)Qo=4KJ^EF`hb;~dmSoFfH8wh;b;ul=2$-kGX~dcwLds)v)BX%w>*(Wk*QIK*eJ z^)Z6UWIyp3Z(3c!&Q6@EiS2MLwf`^}S$UXErDIE)h1GgJC%w@VG5s5}J!r8$HxI+lYlR2_EE}VQyb(in-~*n%zR-nzw_~I3YJcQv(+1iC)T>PVY;Lucfbbs z>z${DYuC1jL^J7_EG%>`tHrlf*QIfJAI-bxhZq>szgks78*QGaE5Hi5X0@JtAVJjG zwBg)Cr0Y)EX5qEXQt(?cR9V|3xI6uhDPKtq%;a@{@;>ws+V+WFoySMsPDnsJ43i49 zPZ*y)UKP#lPC;u_OiGO5W^?181@<=I0lcIsObJU-M;xXFBL^R(1EbaN!?sSq8 zvA24B0|K|Vkn-G-CBIN#?d3y1Bfl7OmN)m8LeU|O9K_m=4VOYm@~0XuD}tdycfPoo zC0kXoq@ZlcXfBO|kDBUV={fO#+ANz&(>3+dlWsY;tke#Y{BP-dvouSgHJ?0?9h0^c z{hvpvH7q4~dTnI@m@2tCALeNXnLN7&--&sMSa;@@YFXJyot$Fyj1;t9 z661uvOcV!;L(Jz4`bhni=PQcL=N{%se=bS&B`ML?}CD_qoZdV2b96L0uQZs%#*f*Jml8 zEb`MaX1dtV3gngS$4$cY!!?@;{y<_3Pc5nN6Ei1m`$9p=eY7ylHqyR4-ZV=)ZP};b zb3ko|<3B`8>UqY{Rtefmz=<&7ykyNJjre9S-`992o~@Hez zLSXdD-SRwMHQf}KDN*mHr~xUXZB#_<`;61GUY(q0MCy$g4!&&4h<7?kAG7yzkTA1! zH!UK5+)OY4k(d69euxH=U9Zs7EJJjw+@a(#C;~tf#m1#j zkBa4nMZ(tKJBURhw{o3n`<-hB-zN_I)UZ^sx5Cs46Jl?}vD;>Pf_jb(isQUe@9%4` zukxWnlWnheu?mMixiZYG^X!SM3BG2vg8cEyoT8a#SffR0EWz-nQIlBWqZ(m)1(7eJ z>6);xyyUKvG=C*@)tZ}PYp8L6nCJW$6>aPiJ--D z`qmoSH~Wo2wn$4`bqBa{3F4=klFX_2+j6Mj4!TmQX07gy8T* zrFoNQS#h~!b~2s-_KHys>yO8`cj7RL*C{eA15LMZY(cC$2DzInl%a=+HPPBjR0o=` z8g#^?xPfGD9Nr%z=8o!bk3a7{`Du-?3^tuCYVBO3U-q!RlHvjs)EPxW)=|CJyLL#= zwyji)43qE|e z>Q*7Ga41Gr3SuQUS4so>sEDdqGP8bRtvlgH_kB3~5`S1k{e6vsndTm2t76z1#|#O^jf&L)_m-oOo~IzaCF$5E z*kO0AK7G`}MAM~>V}0}f$4rv6``V(r4(iagK+u(PFL=oi99ph*C|;|nzUMA>!;ZAR zWH`ZJ6W6&$!Bm}!N80st>FE7!D2yEwh)(>dXfu8;bh4K54x$XmFYG>mI7GFq z7g2hX>sx=Nyb^-3j*|v!fKSVdil@!#G8&JsPm=W=xb3)EM8nqCw@rUb4*wJQ?LZm! 
zL5;#}rs-YD^Bxj=8U}*JUJ^3`?>^OlOfQtJ=etXYR zc(%P33`;I54T!E9y;?2FG))l!JGwsMJA+_qOJ7J837`IIrl`SzzW>6561_e=ajbTw zQ|gKOSPsG<$Ed%bmyY@qe~KqRYHa~A_$@CZ&W|TCcg{4S zmUM=&q%*$Z9sJL;(N)Oljl~y={X}%U6tC>{$C%hrpHbXOenDW6Q!zP|q1Z{6*Q&nP z=NA%Zm=nu>Z)Y;xVW*Ps5+X(lhqkiV22Z0}N**nEh_kS~Q)4}={EoIpH6|CmlMEN3 z`FHGBVS4u>Io9n5D^C;8zfFgHW9)Ybse!GpY_<07a9lxCy}}QgjUzom;qmr|ke?`s zO@GHY@fSNqB&R4m{L>!xVmb=Omw5_S%3GV-g%?p&dYh>2$%e~w@$kJ>b*Cp12xsFQ zCu)yBKVOJmVvcj7Jde&%qi)i6>_QXV*H%m{iRLSX% zdi{fI+u>sUPbb$M*DSicSy3PD(}+AEkZ9?(+}CJm&M?>zT~w`D@jd1ntP7 zwNg9ziJEm!sw==l}OxO5Y0uk;5c zvBP>k3Y15jDCb;-~U%5&kz=6?yTu|G%l8PF>SPu0Aig5}y{$ zNs8AtnW##MI5_+G!DSzG3X2J>Tlx^k^$lI#x-sY~&1$VgBH7Ed+>=zY#Dr<5QXgoy zdlLlbHKbI1?g^TGzM+CYcx$2;6nVMWeb`hEQlOOTKc?`I4+Bza#AZ+i}C9oyP2cb02skS{-f zS<9!zg2g*U{vnQ!bz)@coT!s7EgnbgK2l5^lB=s&{=rvz7C z!86?)+K9c&ua!bP7pFneo5IHmi|y|^Qp43Z$T?s$=+Bqzxr!}BzK>UN2sOaYKYA%c zt>fv@x9%#kxI}Z#$3nJ|1_|Jl;$ zht$8gy0;io=uZ?y2Jq(8s%B|nQ>_-ze-TJM6x=;fcxQ2}yB48l^UseuSo3-ts|5oU z-C{ZHovelb2w+^|z{L6yN?LyWqht8(h{0R1IGl#6mrM9Sc>~~hV^@B2aBtNmIHf3gjti_w8jD z1Gb0MF3u;rZc6oehhNgt(KDBkL*5gM{OJLrj8B*hI-cRV`HW7;DQcQXq7y?#2{9me z!NS)haY!c7L*dc<@(dLbaumy&w^9$d4x(zD0@hgM&((f~g2Vf+T!oO#pTwFUSz=#sDt5f&0-v^#vmsdn9~0}xu{%a z7}ILIQZQ~j9quYaj=*2%KQTWeNeSFMotu2>kJIGz$%Q>Co0@BzvaSt8`wQgg{Z14VxS} z+*+aO|IGq)+(1_*7Zpn;5p7**$g$jA<^or`n*w&2)!%VcOqX{cRv&%e0yxn+Or~_F z`qI3`{7j%$!Wphp7@9RBj2OCAUl~)V5Va>hTHoPWN$Gzr7lS=V`x9vmTxN@?bSVe< zAhq=S_RjhIW%zA3iyE?k;j-;64LAK6baGiN(twv1v`3rR;=@V$bn);>w2ZYDOhf`8 zmrHHrN`&eJhbI?phja1mI$S`{;z)6}lY%u3a|*J)znAX_5nH>gnukwrI$bg6!LC^N5Jh?f!brR81r-hS6}-5 z(SQY;NGL3`;;S_6k)+T7JuIN_=w7khob5Gu(8G~@$woICaR_;h($MqN>HeR_8Zk5K ztae_QI9`wlAoe!$V!Fk$oh) z>jP@9&B=ac*{Tc6^BaSun-jzW#s0|B;mb0iW>8~YoAbuvFK#Bwafz)og*6JS2tCDz zx3UvwTvId5u|9Z$UkKWvIKg?zqo9=GkoXLI%w_rr(F7=$XWujPQ%dhj4*R3KnT$ar zyRMjqwl?vnH$-J(3PSaPUzYtWTZ8 zTPi2BX&bG5Z6op=3d#Ehgf;H3(L7$!4WT`6_dXy5C!tZ;!d}k1}W+j`z1}%MK5WjwrKmk za@1z!9qkQ3$(PwgRnk{z_C#;qVaTy&o^Fp32Z&%vZx|uq2b{^%Q)ns=QP2}>DlNd{ 
z?{n9@u+Dgks8VphFOr1SuEir}McX2u!``}0C?)cZU{otGV9*P-DV7%}l+LjtmYhjq zGOLd!-XF(t-PyjOs!|dGBK%6jnS{Us>ts<>)=uVLsCh1LG11hwO!n_I3!rctDxr|J zkq`3!PJt-N_k+d_qvrXvM2&-ctcRaQ-_BbRb${LJ1Omp`v5jY{9zp?)2d;NY9G?ow zew9f#hkGZ(?4Qb^#HGAwAJ{9kA{rrES;KN;k>Eb=GnK>y3BsP0Cj(oTj}>0O3U+Jp zmk%X>BmJ?KfuGqxuIhrj#piD?HK5rH=vDO8F;v2=vXCx(&ZPMf0sWn&N<`5hiu(~5OsiY4Rg>Bc`O8OxtY=T|2==G_qu-#to_i_LkRc(6u_RyJ9$Y3i<5r1X}?J{ zdH(OHjy9ectQdVA{GLJlyymJgKNGz)%iy~>1$KSyBrAIdEB9(wd#^_Q&lw<%mzPN= zYFU8WISdh+evEwGZ?&LUotNCO9oEEd5&rM<2?eXy!lEkn1hseE3DHtl0@i0ZBSz+1h^EF|4o_8c^ zv!j+~{+&l}nHzmaRD}Xkn0D{7kmukqdAuIO0N^VmOnX6vw)!V@x+9Zf=YP;=)o z(kAC>*|)@#hrM@2RYorW<8Tl;%DXzZ(Z}9QMFEhd&#Kun$khJzcaQOGW+xepNm=1R z&)ehzu}uD*ZzcJ3C3vWb_Sh7?O`=<;f4d{Cn`(8pVg7ajyI(IzQOtgn<|y?-_e z(FaLnb5?gp10`@J9W0a2VspoP8pPBMBo zSpDC*sgoXtjDf8Vhlcw4I4#Dno*!aux{N5-;4NF9Md|SF{IkqgN8Yz|BB9V(IZW61 z+T(v8S99?8X~xdeJwP0w@?X%TKobA0uFK%*S@junErc;N*6NkZ7PBh4tYBtzC=hk4 z){ZNPu>F6}SB=TZWqG0xp# zM*nIAXrW+gdf^*chRr4A%~N_5g{=w>HQ|b+o+B=+2iBexjtQECqHz*Sum*8{W(CdB zaL?_I_+g0-<>K#ujp{(dbP**OpRM)d8yQ%do%|y94ey8Rh@3M{gZm-guMNV$x0++Q z$lkSc_MQ@Z<&(<(0Go|xf$u+^x`iKWSp;&Vvh<%`Td@(ze#Y8`6HEXxJrB^La^$7- zKEjYpB6vxbGXF?g<7S^AXgj@^dvX)au-T#Wj9=)!n*()Ied>>N7h|i}##}5P%E3je zdoi8Qe+iyFpn!JZ<>Ptl1L?xSVO9wbtwrXlF!#@T(W@M8g-b{{=g4U^hC%hrL6PZ> z^yB~9K+!~aBrKMZ*ND#gvsd6|F~-Xba1_a930(MMsj0x)qn)Wxtyb1ZJ^vOdC){T9#NkeCaka zF$6F9D~m+6*mX>~WMAkYHV^SfQt=+x`E7>Baf2qmnej7V{XjGC-~aiQ)kjK5F03@BiZVjg;%4*iyOQ3pqXtpMo;!MK zsjQqT>jt{*f!kXe){R1lavb>wE)eJIr)1JA)sNyi(ht4`$>Zwl-feoilXnzz9|Yav z;=MO3i#%w16xF>rMWZ=!P|)YmL5npi2A@v(RuC{-zbU9rT9OZ6I$PxGBOhV~8HV5W zevL~;Coi-TFBZ$1E`J%?eiF1Nn*aadbABDYi_gJ_?C zVL344$={WR?83KuNb?r|X&N}ulR0Bz)7>Lsw5{d%Y$c(_q4>G>KBd|%Si(Xo4{0v} z*|+WMicjDk2{#CZC`Xa`4D%2R7+n6dCb)RKL5PwGiZy*fDSgS8b`Ud73VRe zvR+9?qC@QZ7Vvji>fkTN#p&RlH(Ks;9Fmg+ix2A;m>}9WTdi(!f}*S?I0e#Zl=uED zuP*HnuuS?#3e-=&U56zR+OSlG_sE>8!u*>>L&oU=fbQ#f1tlfS^UuH1m&D70fz(wW z28I1`6>I5j!uQS)E^%_eugv@tr}^$d2|wt{7=38zGB8gcs3n|pjKOa$Ady{IFh|gv zC|C!+!@aGdY=u9Efkv%vc)K{+w(aIw~f+lN} 
zsg&dt31VED^W=!crJSRLTVm`QhDxsm2B`1Tet@z9t>1^NYg}yVtnhN1pSLO?fp$^<+p1miSB~^8l(y=) zX1CSNP**TGRjkN#(+X_}(OWjX)Qz7w7>Gp(fF|4IX93WJftQ!RxXwa$l*Cc;xfO!Z zOOVD?iGLt&WwpW?E{U@=$3t*xYA?07yj?`{y8G%A|DO8Xl`9#Cz15&#I@_e55Dl`( z#uGO(7PjM<|8FH*1`}LqB!$OBkfrkNXms!r%N_dRJC(E1NQCSijK!yHif#bp|&t5$%`2e)5Befr}g>Zh;CvHTcvv!<9&?JKc)K_vg1 z8@<-|Hu zPX+wG>wNBJ$S27~mBFG@fX)F*wW+RLgWohup|>oKjS0XGYv3Lb_k6M$%S607EgHaV zn&~(G;vE!)s&2ykHH=Bi^BmL#KA}SI8RS;*7(O8cW1K|r(aq1dm4aLUZ?AHfiTL|z zhgh?{MXk|Xnz36TX1*}Tv^En=tfP3YJj@<1(=y{4n0#9ge4>>eqe-6(ogIn(P4*Z& z(Brv>&!sEFD!~YoE|N%9EO;poGjwttk#}Ml;9REK^n*fg>C!jG%cv{+4R82Urdr0d zm_PD=FLqs@;Q^wKYiCnye)W@X%9ZtYn{x+0gJBefS;tZ+SL%I4UEv*&zL8YPMQoSO zoM$~IgBZaB&02XA6_R=PE>8Gpmie<3Ntz;qoRTYRr1x}J{~1KNoqq0ALMM(gDghOx z3oNQGt_HL4mKLCjFj0{elbx%rl-_q+As<+do91Jy8c=|m8=k%*p6VW4rvPI=8Ir*y zNmPsS8W-q}kUn|dbI%`?XL)N43rir=(E)v5iOv4oBuV+%QBn};&w_3fIXP~4NzT%9 zcHOHurp@k);*gXhIwcnBq2KR>mf|`ygVs*k1B5O=NbFl-f`&P-U*`IPxk*wC0U&+x zWAIyjC^>|HA@@cFmG^wa`B4pkv0f)Gx6t!&9&v%1aUzUlLZEN&&>hY9^?KpO=o`a~ zr`O=x{Lgm63Ln2OGf|o(UUH$gP9dD1CwV2Zl0a3(3GmWL$S9dmXbqKg~Im= zeF%be)#VMjIav*5(Ta zRo9=?#En3vjBkJ!#^m`GhxI7(fCU(u+rukfW44CfkP)++%+CIz2Qd0qa) z+|>)s(TD@I>3a2Ou^QUFDp}|2`8|Yjl!>6ew2@;hO)$l9WMU^IYk?2f&gFJo$VI9r z7iEzo3%_Mb1}Ov#Q!o!qDTr7^;rr}H4t>nY#(4|)$U^FClt0MVp;D_0Q|AwXfF2mS zC(=AlLAsCa_4DtX0VoEvbJJ3LPO#*%9xGxMCeU53XIBqK$4Zce!OfGlwAop-ui}97 zjmIZ{s8CZP(b9`sqhA06RA6H}cl4uSf#Ic_NHzztH^u|42B3$h&Hk=7Ts4GHRd>J6 zJ^cnyv)%NcZ*kEov0#t~9y{;-@GU4_?4N-+E7fHW!qk|b(paRZDj8%n>o@aI7YXQg zvuBoIcIo}4a3@w>>!(3~W_>a;Jpwu}eg}*^!>hR=k_e9v4!&1LWLu7Gd>pNPNc7U! 
zwQEv03@S=E(D)kusQT8k?%-1*`+D2@mqo8$Yb$hO=X`uc+!fAmA!7W16RQTiBX)Lv zCQPV`h1U;X$!N}sQ)UiUuoX;9+x3|}Y#zZ<+@V5_q%bLo92aJdG}>yKX=oPwq+G8c-h)MnM`jzvumu>%Cb`w z;KX}-{GW4NDb2C187vK&TWgZSF=+l1y`yCUk$xUkV#m!>mwq3RlB|Xmc!oUMSCxb} zl&;NvgKj(iHdxhu|MvF1ZW|EpyG{=73ce#{#iV=fu~C15j-cnXfr2)!Bd15j7y&9q zKXHWws_Mc;8eGK+=^uba&x_VncP|vWY$Au>(JMKvr+0M%K#5KQDjOs1*2XRu;cz9E{j(!|$vcLCx+ z((3#}-=n^l1|X?KIcaGz*sDoTbIby`HF(nQQw|?~sGzk=m~h0*_w0!nbRdW)>l5h> zP0^@_0C9A+G^@6vpRn#xjB!;{N=#DpXwY$RRoF*k-eH-j<&z{(0nucRTx|(lSKtJT zb;J0FL~pJbJ~jC3efaVy1SHV?ibDRtOqY6&l65ue{8db3WS`ixIwrr7VHx#gt#@-D zQ$>ube0}70XAbX?`iMi;5>qxXrBdF?fxjtdHcmHvkA}Gxnw&4{KA0I+dOeKi%FuPl zrzL~M!Z|h-hYYM+s%h9mNB6$tRRvCgfu+IUY#DJh zAle9}8VXsb@DkteK=>)lX?6RoUn~u>HOpE=z#_84_P>9cdCV7EJ{b>fWmQ_u4K$N*ca_4g!L?(p^@c;B=Hi zPnrR*@EvB&7>B>lHYbIU;vpB7F;94LG>k+AH^B=KzhRM3pdotM zQ)TvfHlJ`<#P*upvA4d1{}2TNDpS8HTY>G=s4(#cd;T>y5TJql0xdpCJ6sslF)y0< zF(MwirfEQ7#hERnK!vyU&R45}7C4Uf2Ge=x#zN4y)=q981}+6qvOv%5WJHgNK!fkn z!Qy=)FdAnsc(D|g6Y@Ja#n_UTxIv+-I+NJ4To?-XP1kru1mNSylG-d*mbCO>v1R2F28(D^ll=x0Oa@(}JevYPPID4;SMx+|pGDX0OW?lQ@G!gMys;Vy1p~HE z0*@k9{frjp0ht`lUI;6??mv26FQ8h)dA+$g660c|@C?pCW+~HfVWMJ|rV;uynuEop>$LV%&E+JUwN(Rpf{o z)njOX-_2Fu`1Bns6h2-3R5}F<@Yehep~Z#!b2gY*)2cEynxVYoWAPSd|C3>Lf0n@( zRMl`Pv(=1ml=mY!O8!1w8T=mYUWZY{hMN8}d3QfEimsaX?)mme8LfqlD?im`GH!6u z>h{INhu@q#%$H)ojRc0ahf0s{qOlytBf|C-(bL1>)sx^2JUz|%wCeZQ{7X$e%%@Jx z1=P1>lYqe;bp$^ryWNz-owJBZ*NQ@Tc@D?T1Im@R&P!lRaK|{u-Wuk(0@~i-Sw()D zrS~zRr?}oxFni?(4j=%8lZKh8O`%h3%Mz+A0?lef92-)%ZIYx`t z*|UG!#YBS`IjRxpnw>%Lyk24A&y;k67baJX#=fN++Gv(Q2;wY~XzMur0C%L%@!<&1 zj$yoQ0+ecUQ4Wd^&{~|}x=^*XPlQ7QNBC9OviHm#0_TNZi&q*jn)$>-?eyD`AppDaK*tV6&XtoDjob|H zxIht2VmY4okh5>U?&AdJ=_6aGku39oZ`5K`9cY>m>Li*%bvz0+X!2k0PAFJlwHSP>D~bCXSY zO1}XIRM?o^p1Udh|mT~|49XUQXJKtE)A#?=s{x=Iy z)vCCzl!o5SYG~@k$P^c%B!Kl75Jj64K8PfwsNzzMGrMj?P7tyN71RD}8k=@8g73gK ze`}naq*MC~!NZa2&Z}EP|1Es7dN}fFFkjN?Rw@gc)j~Xae3>R3&$PT6+zLP^3NdWv zJMy9VfZjx9>(nk=vVt8EDc2mh2C2SV zTm#}iCOa3zz@EC_gHW(e#Z;=%FOUy(9^jw_-(Zm0znolD5?biB<0)z@u}uf@eOZuy 
zOmoAZ)|)t1!9#FiQKo$R3E)OzHEMvhQBwsHcqg^^MwUypPP$)Hefou-o=p13+SkyF z@Z-~!gEr}xGr>1c_l4OqeuD;MUP7^#(~nHFIVeWLaMHQlayZ0Hz_37r`L-OXcVXoD zjs4ajt;17^nd=B>jLF!{E-Gv$fYdQ$dIc0ovups(?>85>P5-_+%=wygnM!XCB|J>&mOb`26$6{vpI4}PWm+NOKp(I zJSGzOmTfi|By>%HU6S3{!JV2XYztDfwP+^Bzet6I3-2jh2rJPwchak9tT{!i0xFBh za=Gg`((~{rZ8fySsI@+1wc;HEc{)gR)2`-7!B6c2FApJGSO>MAUg<}na7=d>$3S?3 zYv%jdnPI58?gV0U(%?D1B;=Q28c2~&X`dJfHL%UXowvTkpI@8%4wI`80WSa7GCg@& zmHjvhaLo5P-GLZRa2r6~#{bv6bb`L3i!@rvB1dnJmR#A*MBmuN2JoM~I;G%)Q!^wO zo77WnHMR`F_=b(|ub`VkT|B-NGI(tbwJ^+kzKFdS>JB3*RA@_3r{Ol_7r9uhzB~#Q z29I;uHs3m|pk%!1LLx7BQ1$x!RqFtL>{=#+7w`(Gj1JHvJ_3xX`HCFB&@=|`7P>*M zW2kv~R{|d6(3iNL)A@&*V;eF3^X2x@n8*wdnuNIA?|Au=0k~39PWP@9O|P>$D?#uL z>xar3Ad{P0p=*M3MI>H7E3?`oIjic-?%a|Ry#st5DpPvnKN)A-*ct^r&8HU=GIDcH z0Qcj~?a3?WF95-*poS={+zK_^3O&d{3WRbfJ5d}*U{P_sALEcCSs9R6n-`g%HuV zP>T#_)!Exlv(w10zgK#JBsj=ni4WQD{NBum_PP;Q!&2Ykkt&LC@cf}8RlBWvK$804 z`3d`_C%T277y@m z!Vf-K9I}09fAX~Z1KW3*P#G0ZuG9}-l<%KVkNQf!<&a~(+2&;BP}Ue>Zw;|EO5TcI zN`&oPoETVSbmHoU-evdE0Xz$H+717aW+<2QnvI)$*V03geC{+xyu<-S{|z;=R7C%? zr!#TS*yQym4=sMJx9s5(c3Q++VmOOKkrBPuuXLdoF#j6E{1U^t)VBV(<3y-kAL9~1 zqe#p%dyce+RZ|Bz{%LAt5rIoze$Vui%o5)9agd|Dmn*j~S@M&NZyNtbS)otz zOPa5r;*5t!rcSnlOQOIa5EX|$RqJ+zp8L~842RBHiT%OLqsEa&q&D;)T`K(y*{n#uKwbpFu}pzX@%)(a}vH&T8G&D^aS-(nmk}g*QEb)WsHq|HdHvetAG$g zbl~*X5vNgejIpJ{Mfjg6kud#v7i>oo&@!rP!9zvQ?6n_oB4>SKezM0q5lriQ=Yv=J z%x>|C^IHtZ;e(YyQvGI5)kAExHCB8<(~cg+$$ESXr#GZ}M|& zcoT|?BYzA}9yP=ER>5l`cHcm^W-wJ&!?YRNU?7jE6IC}(R@ZuBPIN%}&2J_o@T4nn zi(h`Lk)`G3-nY^Nvx*eYUWX6ts!bk$M^70!SoR-aEp;VY1FdaBUcdaK_;L znK7tx<~*^k67285J;6RfykYrkA_ec!P3StYMSTFO6mtpmawrpv&Jvi%zC{{0|H|v} z%f`LoICDdJ@OIpECwt#>W{d2akbv8bOlY4UaWdu5i6kQQzRKMeqvn1aTu?;NZz4(? 
z?zoM%G;jY0;1cxt&qwfnmO_>>CxMd7y>6@$Tic(1{+fiH>iWkFAR{&EI zYa>x`Q_|-hFmd33$JKH(6mIm(l7=!q!Y5F0yVZq-#3E_u>IDBkne$gOH!JX+Xp?@_ z0P6Bi0bol2QPqMqH~pPn+*lr=R6)li>0Z?x^09&U7)pRmAo8O1+?E(VjrEt^FbnOD zT1`9URC4t?{9$03p3nz|oTM)HE&_Pka(C-~0rC{{|7v>@cPPK_tz#!-$yy9bLdX)s zr|fGfB@xP&eInTzOH{Uu7+Vq(CCR>&Wk!XuM<@m(dxasggueG3`h35?Kj7!_csw)b zz4zR6&wkFmFI8N5y7GN1dLa`eBjF6rgOqD@_dYdtJpvTfPaYDY7NR~tZIN4tP=Vc+ zESC8E&q{B;oaLcQxW$4IH5%TfVwNa?QuoCpn*r-4-80e8)3wpx7IVixD`d<%E4(#f zmu7aB5~^RoX?gvA>1HS(dB?cAq3Y-p$@&&% z=6OMakPiNcxep^Ak2`{+c48l`7M-{hYS>4f1i{L9YJRKyEJ%3ufJ5ijIlhKwB0~0e zo;6R<#Zpx`aL01Si!TTMrd(q}rNNIh0^vCRAX=&OT4sKxiqB#C*oU2xYS|?6s~wYA z&!vo1sKcf&W#b5WtYq+tE3=`1EtJVLU4fLqVbDt@R|Y+UNcKphrairbaX$vNG3m#x83 zQ+d@N4B>>s{UUXlOAfCnqvbr?jg~+ts`ezUr(hr3oAaJXQOiXO@gn*06B$ARQ6@pe z>vm{pDD~bGO5`Mj?<6}YoBM%aLQxuT%x&8aY-hV;l79Ebd>{Y>d4?cm&g{|A=1to* z3|6>sKBdn3-A(pa79tI);mEnkb#N0Eh|c=23G!(6R>?0a-i0I7`-gp-#8jn?*yW<$ z(b`@}i)Z(q#z2%Ac8Xk-vFh#TwNr?JBYaQH&VDk&cRYw}YDZ(Z>gm$|IUC~=Y6v=e zu-9JG?N&TZ9)t|v%tLIU%QxLGeOD&hj;cvVi5sgb<+i==Z-lOb>67Osrja9ZHkU6t zfRmqPP+YcB&zNYU${e5;rNIzWvSJ93k{hdL!`t`E@HLi(O#PLUa6OZa8nYavw4DVy4{mx{p!ez zu}TnRGN{~oFV3+jI3*uR&flc$x85gw6OHIRQ2fP5UgRjrb$y|GeI&rM?!Gt+zb6lL zrB*Jd)WjRHjew!;jpS4CfvAFWGhRyM_M-tO2=1uR>TrAt9FVcMO?_QzygM>!tuyIK zruKzb!BZ2V1i_=KnU2K}t-v_R#dIZk5g$eYqxu>YU8>-n{%o3}T7=3s9HSCKQ$<*A zNkAN^R~aj|X`}YQ2~oIpD-7+{Z#(PcJ9%^Lp@cV)Yul50@^jc6@aDmkj6?J`gXy=6 zL8f))p@n-v)ofUDeFSL(g>kwNKmss;r}D!x&jgcfHUSnk+#1mF>jMOnHSo^D_?xQ!C;kz%DEGu`-ks)-g$wG9bV6ZiS zqN`KVOj^fCE4#2=_fcX%FB~0HQ4r@d{v1g|cpDM5{E5OZ*g~a;nk30>R) zeh0U;o3l{MC8^*xJ$9;~`cpU#+LE5B84!zI=Q6Cy3v}(4R81z0!n;Hu9uuNkef}1e zC0?t>; z>`8YA`JQo*fc_*}Uu9y(s!9`*{sba3uIlsgv0 z0>ObZpn__6YWX%0NEf=Mq^VzBJH#=6XAk)#Y>>h;ajEzsBn>%-S!h0ZZ)x95<&av) zaGa=7?OTJzVkIA!C0b%DnqMgy`=;t>T*0yWNam&c%&jz&34pVwsLs1@p}b-=Z@fN^ z7Kt1)Io%O?38V*~9SwqW`nRgMMX7?uqC{*(iaF1hjs6%K}D z>$N`!$`6mRy;1T1Xnv0kf;lYSuNb2;_!C(>v#3Z@o8b`gm?O0oUUTN zBKsN=kcA*nO2P^kO>b*F_aHqHLHQ?}tl26;^a5AvG}MX-W?o(%7E8Bo9{+0H35JAP 
zm)9KjY>&?6pJyT)`r(wdps}aRj>fsB1UOMIxja<)-JnC^O|*qR>9vg2MX~7uXocT6$rNT z726cngRt1T8yJw*NjnCMFaP)XALGdmBC>(*a8Ik1vvELE0Lza4*7hl`-09QdLJgbi z?D8LK)+xLLJ~!x~54>sB+kV*Fl;4vRO2A*YrwU-?r&Ffh8QC4PXWf~lax9O?CH}%> z`Hb|$**^>+_E?M3*Dds@@Qj4nGDUGF?bb`BNLVX;-Y(otVm}%e$+p{5IC0a2FslB2 ze0uURsGET@p{v;IxX8xBOWXI z`H`U(eO9$7R`9wnug6ozi}YK*DAUP1h2crI$|YBXtW8f0D1+T8(uTrXAMPdC3@rn= zFEHkDENU%jn^769ad3oV8Q&7ka4hiVZ#zt>rjdx`=X89_N5WQFf=9i(*J{>9pwHk5 z^8IN}7n4J%HV(@Vh@ma2EI(QsDAQUSY$?peVUs+=J9Fhy8jsF(pc`XgTkXuDQTcQ3 zL8vNQQw&IJgG1h(H+%Rl<)%JOTK>7SgI?{_pj)MJ7v1D6?m}15HNZKP-uH$pGk%vk zSiD~J7UHPB?g}6~gnDK4Wl}1*dX7qZg#D+x!QlDQn5R+OS0fp!MZYOtu~se}e7Q_v zwReNN!fG=U=$Z>YF#HkTWTmsVRbEHtX-^xzQJ7<$W1f>qDFL{_j86v-`D%gWm_3li z!-l8a-Ga@$Q5>*s#nM7u_`v-t>X(XEroCCLJ128rGQNT37JfEz>2WRlXhDJZdo|x2 z681=jZY*(W31eZsg}Q{>VS4$%{!;T!MzNcW6%StYa@7ZCAQ%wcZuZM{zN?sGC7lRs zNK<09(*-E8)=E8PiP^0;dGi2vy2fLs1j_q9rSzAt9=s~sncohM4AhSapzk~lv-iUx zB#7L2gvam6bo5)x>XsmG%ES0Cy2>)7()^-+9+I(=sHRNF^t5C$BzQluPaZHAq( zwPHuZN6r-S>|>YrbxjmxHKXCBsgMzKWe8@h<*rpsQ5Ubxni|WC2v#qw$mGtuAqLr0 z_@$4J60_yq9~d z6U5Lt1!qXRt&O5u#*1zCx=XB!KaCnG%Zj;wh;J&w2^j>F`l`Qa81&83*%~#jvofMk z*T5Jih=0(1i})^^Uw-eV&zSG{CWM_ugfdC^pD10pXGQ9iY$T8T?Ga50|JyxNHMU(8 zhyDUvV5k0OL-*^dzI%h`eAcP3$!JRRFd@KsXuOnb?6eCi<&O)-x z*p`m8yHnF*)K9sg?}e?*p2XR6B5TiJ->e@X2a^)_3`%aYUGb*9hKB%!-UIXara*;O zYL^rt#{(~9{gP#*uAD(KNOUxW+EExo{5{}5X9q%X`4nqp=n1ibyR5Y=8&;AX5l_~< zr};u%T3~x~y23@akJqbD$k-R?Vl%6E+IJxf_K4UA%xtFa-dLY)pR|w0g^lsy=l_Y* z#SUx!&==R5&k=O*+}k2*GE_wze3fmaz>$7~^MzzFbr=`B&ezh~p~ucKe)h$KB0r1vOgL`;NuB*z4D+HYAY7xO2ym#@_Bkhy5vg zPH(u}rMDu7aqRZV)*PFtUZ`2|9&HEeMsp7*D_5@`tK>>&c?Nz5PFGBTDoKh-1%vF5 zhIvsGjxL2|g9bBY+d~gcb)r&GdT|EompBNo4iXRI*RFl9O^`4Lgo3oh@b#-{(?8p% zv~lVv7=Zd9O^RU+`*dZZWq{@DI=G7!I8f-KygG4?r_C9-k>v+X7M>jR+e%U2#cLbg zWwq8PH})v-O=KH0;}NMy1`wuMG-2Cq5#!dtz5{u{K7K|K`Rql!UD1OxF`8*hTU;gU zPeUmdqt9%pFDLdAsTifQ>1R6p)Qh1}5WhA}rLvaID`*M~KF|USxBD?Oaf%y^9U=xAi>67iRRQe+ZE(xvK@G}XzY(!G79vvK4&-v2|e)9>2h)@YGk!-I1Q_bJ~i$(UHD$uV9LB8>m`GY)D2KInZ? 
zW_h!J@W@80Xpa;H7q;VF!hwuD-R}#xjC(#wq6NijoslT-5;epd2wT83fn?0LckR*N z4H~QT#fLUI!8!pI`$|tWAUMo6nnbpr6*~Q<-EQQeAG!@(Wk6&IxtJ;_rK?-2O7a8{ zT5jBgDoz_=o{|L{e)s)>*@8iuIi5K?2QOqFuZxu$V6Ztye${@ov3e$Ixzvzi*RfaC zHqISlHDj2enUoPj;IEQ~O-i!24v$-xsa9tKM*+~P=nyCejNUj{=;-4N%1{PLzIJd+ z%SiX2p`Y4m?{oJD%2aSSPh5ENo-iUpRv8fM6Tke>X5c_bsGs|GGq^exEa`$LQZ=M}HyK<8BPm}>sl%S%qN(lrE{qpM& zcHHFq(R^36#fhP~R60v2?&|{O1wg`qR(!bYt_1xbqZMZNn^>bO*Wi}Aw3c_er=7+6 zr1vJjcGD`Y2NX!nki~WvmCkbPd}rBkleGIWhD(sS{}*T!^bNSM(up#uGTp4ar6|zaeC|kjSQ zxC75M>qofqe!Q1VJcD#%!mL-fAb@p+cJNz2&zIi%Ozwz^hz7Pi&Xv)!tf@FB0$(uS zdpWlKsj6HT0&ml1FX?Z&Mt?T)5vIrZCrt`=-ltBXL|TS^uUxGf!PO%X7;9}mxw@#x z4qsFckc5W6bJ%ul2A=qnH5F(fi^4FDkZRW5eE+ajkaK|8L*OJ-&YyNR z>jKKdkk?%J8$+X2vs1&*O-)W$aPcy*`mR$~9_jvcOcYn+NOw9gFyUV{PV+<#w&^k3 ze8v3$@RId6ZS7p)ZSnJbJ|4}j2J8y=zIE!6j!T3T_=gd}{jF%M)GRn1@HlHQN1qSL zPUdod-o336*|EjaSP!a>Hz5eEZgnc`>4AC~yR!s+)DR%Z)W`K-H7#p1phgk21&-Sm zDW2$933OU9C!N?8u@tV$BK0@cfXoqCrob|k`@%E3&}T>cuRqs(M6Ke8wDM=Mn*_-{ zu%!@4nX_!yu_*jJ&c;4l*McDk)kFJda8%$p#Drjx2{(g7rTx8y?878kBJKk^)Ec%43i3cP_PwZ_olqFWtIr$IK33;0$*z#x_qL2QJx zs`ef(y1s+saU#$|IUi{CK&yg7<^7oDnP>^zPqw_wooB2s4P`~>F`(q+#|!uoKvZ^B z{?d4ZI!7f}9pkp4-rOjT=ao`R+bp2n((S-NUUZgW_k!!RwGI{v zrGWe+W=P|JE~pAaz&UQ`4Obq^Y+Qjl^!DfuN3`tu3U3wExIg!Z6`jd4*hT11|N2I3s#@o-2kr=z(MZ_F!N>|)AeDN?-wgFN6;;(;XWV= zKj{4hE|_lCD3ZxwsXqiXrQ4@=leB~p+%wR0#Jdkc19fSgcB(>2;ys22X^S_qASa+xO>WQb%d ztz7B7Z||wdcgrJugJxSs_i3w#1-T|WSb#WwnOgsMIU?m^W-E-nvy~jw@_E`MVyk+PxKf#4T@7Ch* z#r*!iqx1r=@pAmvfsA_U4R!m-OBRJ9@uaW(SRiA?r}hT27%ypynZH(lG$mg#P#f2Q zYL+i@;Go7RNzOu|zK`*@!PQ?HswtK_M0uSYI^eeFn>%O{O*RDW-&|Ls@pJYWJ6h`j zWmop0`k^M;I6crGHUW?2-WIjbb5e?2=KvNouD1Mxf0`_m3H^cna5%(- z>Spa09%^;^AFirG_KI3cGF)P2XAZdD{1m*EAlOR(FH;PpAL5(MiHZVmQ&2}oF@jek zz?>)amkmXr%mhf#4$A`{WmGjeBcW8Z)XwX-IQ{SBAF)dV7>x}eqJSITeae%`TzBlr zb8NRSb%9?L|Dv8U!|2SVl+x)qNBA8qV6r#W!Xs*Xpf7F~%lqlX$ne|Y()D+vn9Z3a zJ^ys%IV!RJNF%&wY@vBeZ_?)I(Y#3iBblHRiKr2tS*MfyZEUeW2xEf!im39N=KHu0 z;^mG^o3*;6@P@KRDxZzF~0AooLUthsA}^7gUx!NG})nZ6d^E0CVhCABT-Oo#;}w 
zXpFO)Lx4eCw^M9R^QOJ>h+Bia8;ajIQ1?W=W9s7MpznOUMA-+B z!KDm(8jbX&6~@OQeFRZYOHgbx$)J15=0Ldo z-<2fN5v?SjaaB3a(HF0%nI0{zV5fO^x{w(D+$RN;w1gWfc0$MV^3L z@=7c`X8EcNStyFv*|B2MC@;O){J-nlnTwx#37jp^qwNWqls$Z;Hoci;mlh|#gI3Oz zzmMzsFwkT8qC%=6odb~;AoL<^Rl?@W`4w-?UWi4(ZUGiK!8CMXMHKONSr}k$4gmrp ztvyAPQ)pzz%kREkeSAm|@~EN7ia4B_0jd`!IuM1ruc3f~0_+edY4@DYve+|07P2YO zn!bVg@u^ObU#x*Z)FlGZA4SFQ1d`9M(hPBh1eZQpDnW>gKmi`*V-Z%w`=v~-lBH`s99CRW zdVfc0WIIGWkE>#n7`GbWeF?=|lx2jDh?4w6nTD3CB1B(-d|=nVU#!z_`@@D#rGK{#)YlcHU^%NzpWW0UAYg%! zC#gUg{sD;t{y2kZcd+=u)L6?&V(=frM;FO`5OO8w+yiV4YY^=ll&!s~g1I_$4WhSC><3~IDrd_m!LCB$Rj8~y!Tg~3^% zSPM~4<5@1oag)(=^E@kJWAPT3G|oY*MV5PzR)>6ma2QGjqZH6=BVm0(kqNk#qX9Qd zn`2m&)7U hRXH{{5Z*=0.6.12", "pymysql>=1.0.2"}, + "pulsar": {"requests"}, "redash": {"redash-toolbelt", "sql-metadata", "sqllineage==1.3.4"}, "redshift": sql_common | {"sqlalchemy-redshift", "psycopg2-binary", "GeoAlchemy2", "sqllineage==1.3.4"}, @@ -452,6 +453,7 @@ entry_points = { "nifi = datahub.ingestion.source.nifi:NifiSource", "powerbi = datahub.ingestion.source.powerbi:PowerBiDashboardSource", "presto-on-hive = datahub.ingestion.source.sql.presto_on_hive:PrestoOnHiveSource", + "pulsar = datahub.ingestion.source.pulsar:PulsarSource", ], "datahub.ingestion.sink.plugins": [ "file = datahub.ingestion.sink.file:FileSink", diff --git a/metadata-ingestion/source_docs/pulsar.md b/metadata-ingestion/source_docs/pulsar.md new file mode 100644 index 0000000000..b3292ba9dc --- /dev/null +++ b/metadata-ingestion/source_docs/pulsar.md @@ -0,0 +1,176 @@ +# Pulsar + + + +![Incubating](https://img.shields.io/badge/support%20status-incubating-blue) + +## Integration Details + + + + +The Datahub Pulsar source plugin extracts `topic` and `schema` metadata from an Apache Pulsar instance and ingest the information into Datahub. The plugin uses the [Pulsar admin Rest API interface](https://pulsar.apache.org/admin-rest-api/#) to interact with the Pulsar instance. 
The following APIs are used in order to: +- [Get the list of existing tenants](https://pulsar.apache.org/admin-rest-api/#tag/tenants) +- [Get the list of namespaces associated with each tenant](https://pulsar.apache.org/admin-rest-api/#tag/namespaces) +- [Get the list of topics associated with each namespace](https://pulsar.apache.org/admin-rest-api/#tag/persistent-topic) + - persistent topics + - persistent partitioned topics + - non-persistent topics + - non-persistent partitioned topics +- [Get the latest schema associated with each topic](https://pulsar.apache.org/admin-rest-api/#tag/schemas) + +The data is extracted on a per-`tenant` and per-`namespace` basis; topics with a corresponding schema (if available) are ingested as [Dataset](docs/generated/metamodel/entities/dataset.md) into Datahub. Some additional values like `schema description`, `schema_version`, `schema_type` and `partitioned` are included as `DatasetProperties`. + + +### Concept Mapping + + + + +This ingestion source maps the following Source System Concepts to DataHub Concepts: + + + + +| Source Concept | DataHub Concept | Notes | +|----------------|--------------------------------------------------------------------|---------------------------------------------------------------------------| +| `pulsar` | [Data Platform](docs/generated/metamodel/entities/dataPlatform.md) | | +| Pulsar Topic | [Dataset](docs/generated/metamodel/entities/dataset.md) | _subType_: `topic` | +| Pulsar Schema | [SchemaField](docs/generated/metamodel/entities/schemaField.md) | Maps to the fields defined within the `Avro` or `JSON` schema definition.
| + + +### Supported Capabilities + + + + +| Capability | Status | Notes | +|-------------------------------------------------------|:------:|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| Data Container | ❌ | | +| [Stateful Ingestion](./stateful_ingestion.md) | ✅ | Requires recipe configuration, stateful Ingestion is available only when a Platform Instance is assigned to this source. | +| Partition Support | ✅ | Requires recipe configuration, each individual partition topic can be ingested. Behind the scenes, a partitioned topic is actually implemented as N internal topics, where N is the number of partitions. This feature is disabled by default. | +| [Platform Instance](../../docs/platform-instances.md) | ✅ | Requires recipe configuration and is mandatory for Stateful Ingestion. A Pulsar instance consists of one or more Pulsar clusters. | +| [Data Domain](../../docs/domains.md) | ✅ | Requires recipe configuration | +| Dataset Profiling | ❌ | | +| Dataset Usage | ❌ | | +| Extract Descriptions | ❌ | | +| Extract Lineage | ❌ | | +| Extract Ownership | ❌ | | +| Extract Tags | ❌ | | +| ... | | + +## Metadata Ingestion Quickstart + +For context on getting started with ingestion, check out our [metadata ingestion guide](../README.md). + +### Prerequisites + +In order to ingest metadata from Apache Pulsar, you will need: + +* Access to a Pulsar instance and, if authentication is enabled, a valid access token. +* Pulsar version >= 2.7.0 +* ... + +> **_NOTE:_** A _superUser_ role is required for listing all existing tenants within a Pulsar instance. +> + +### Install the Plugin(s) + +Run the following commands to install the relevant plugin(s): + +`pip install 'acryl-datahub[pulsar]'` + +### Configure the Ingestion Recipe(s) + +Use the following recipe(s) to get started with ingestion.
See [below](#config-details) for full configuration options. + +_For general pointers on writing and running a recipe, see our [main recipe guide](../README.md#recipes)._ + +#### Quickstart recipe +Getting started recipe +```yml +source: + type: pulsar + config: + # Required fields + web_service_url: "http://localhost:8080" + +sink: + # sink configs +``` + + +#### Example recipe with authentication +An example recipe for ingesting from a Pulsar instance with OAuth authentication and SSL enabled. + + +```yml +source: + type: "pulsar" + config: + env: "TEST" + platform_instance: "local" + ## Pulsar client connection config ## + web_service_url: "https://localhost:8443" + verify_ssl: "/opt/certs/ca.cert.pem" + # Issuer url for auth document, for example "http://localhost:8083/realms/pulsar" + issuer_url: + client_id: ${CLIENT_ID} + client_secret: ${CLIENT_SECRET} + # Tenant list to scrape + tenants: + - tenant_1 + - tenant_2 + # Topic filter pattern + topic_patterns: + allow: + - ".*sales.*" + +sink: + # sink configs +``` + +> **_NOTE:_** Always use TLS encryption in a production environment and use variable substitution for sensitive information (e.g. ${CLIENT_ID} and ${CLIENT_SECRET}). +> + +## Config details +
+ View All Recipe Configuration Options + +Note that a `.` is used to denote nested fields in the YAML recipe. + +| Field | Required | Default | Description | +|---------------------------------|:--------:|-------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `env` | ❌ | `PROD` | The data fabric, defaults to PROD | +| `platform_instance` | ❌ | | The Platform instance to use while constructing URNs. Mandatory for Stateful Ingestion | +| `web_service_url` | ✅ | `http://localhost:8080` | The web URL for the cluster. | +| `timeout` | ❌ | `5` | Timeout setting: how long to wait for the Pulsar REST API to send data before giving up | +| `verify_ssl` | ❌ | `True` | Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use. | +| `issuer_url` | ❌ | | The complete URL for a Custom Authorization Server. Mandatory for OAuth based authentication. | +| `client_id` | ❌ | | The application's client ID | +| `client_secret` | ❌ | | The application's client secret | +| `token` | ❌ | | The access token for the application. Mandatory for token based authentication. | +| `tenant_patterns.allow` | ❌ | `.*` | List of regex patterns for tenants to include in ingestion. By default all tenants are allowed. | +| `tenant_patterns.deny` | ❌ | `pulsar` | List of regex patterns for tenants to exclude from ingestion. By default the Pulsar system tenant is denied. | +| `tenant_patterns.ignoreCase` | ❌ | `True` | Whether to ignore case sensitivity during tenant pattern matching. | +| `namespace_patterns.allow` | ❌ | `.*` | List of regex patterns for namespaces to include in ingestion. By default all namespaces are allowed. | +| `namespace_patterns.deny` | ❌ | `public/functions` | List of regex patterns for namespaces to exclude from ingestion.
By default the functions namespace is denied. | +| `namespace_patterns.ignoreCase` | ❌ | `True` | Whether to ignore case sensitivity during namespace pattern matching. | +| `topic_patterns.allow` | ❌ | `.*` | List of regex patterns for topics to include in ingestion. By default all topics are allowed. | +| `topic_patterns.deny` | ❌ | `/__.*$` | List of regex patterns for topics to exclude from ingestion. By default the Pulsar system topics are denied. | +| `topic_patterns.ignoreCase` | ❌ | `True` | Whether to ignore case sensitivity during topic pattern matching. | +| `tenants` | ❌ | | Listing all tenants requires the superUser role; alternatively, you can set a list of tenants you want to scrape using the tenant admin role | +| `exclude_individual_partitions` | ❌ | `True` | Whether to exclude the individual partitions of a partitioned topic. E.g. when turned off, a topic with 100 partitions will result in 100 `Datasets`. | +| `domain.domain_urn.allow` | ❌ | | List of regex patterns for topics to set domain_urn domain key. There can be multiple domain keys specified. | +| `domain.domain_urn.deny` | ❌ | | List of regex patterns for topics to not assign domain_urn. There can be multiple domain keys specified. | +| `domain.domain_urn.ignoreCase` | ❌ | `True` | Whether to ignore case sensitivity during pattern matching. There can be multiple domain keys specified. | +| `stateful_ingestion` | ❌ | | see [Stateful Ingestion](./stateful_ingestion.md) | +
+ + +## Troubleshooting + +### [Common Issue] + +[Provide description of common issues with this integration and steps to resolve] \ No newline at end of file diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py new file mode 100644 index 0000000000..2e87b7a516 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py @@ -0,0 +1,642 @@ +import json +import logging +import re +from dataclasses import dataclass +from hashlib import md5 +from typing import Iterable, List, Optional, Tuple, cast + +import requests + +from datahub.configuration.common import ConfigurationError +from datahub.emitter.mce_builder import ( + make_data_platform_urn, + make_dataplatform_instance_urn, + make_dataset_urn_with_platform_instance, + make_domain_urn, +) +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import add_domain_to_entity_wu +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.extractor import schema_util +from datahub.ingestion.source.state.checkpoint import Checkpoint +from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState +from datahub.ingestion.source.state.stateful_ingestion_base import ( + JobId, + StatefulIngestionSourceBase, +) +from datahub.ingestion.source_config.pulsar import PulsarSourceConfig +from datahub.ingestion.source_report.pulsar import PulsarSourceReport +from datahub.metadata.com.linkedin.pegasus2avro.common import StatusClass +from datahub.metadata.com.linkedin.pegasus2avro.schema import ( + KafkaSchema, + SchemaField, + SchemaMetadata, +) +from datahub.metadata.schema_classes import ( + BrowsePathsClass, + ChangeTypeClass, + DataPlatformInstanceClass, + DatasetPropertiesClass, + JobStatusClass, + SubTypesClass, +) + +logger = logging.getLogger(__name__) + + +class PulsarTopic(object): + __slots__ = 
class PulsarSchema(object):
    """
    Flattened view of a schema payload returned by the Pulsar schema endpoint.

    The payload is a dict with the keys ``version``, ``type``, ``data`` and
    ``properties``. ``data`` is expected to hold the schema definition as a JSON
    string; the record ``namespace``, ``name`` and ``doc`` are read from it.

    The constructor is tolerant of incomplete payloads:
    - a missing record ``namespace`` yields a schema name without the dotted prefix
    - empty, non-JSON or non-object ``data`` yields an empty schema name and no
      description instead of raising
    - missing ``properties`` become an empty dict so callers can extend them
    """

    __slots__ = [
        "schema_version",
        "schema_name",
        "schema_description",
        "schema_type",
        "schema_str",
        "properties",
    ]

    def __init__(self, schema):
        self.schema_version = schema.get("version")
        self.schema_type = schema.get("type")
        self.schema_str = schema.get("data") or ""

        # Only a JSON object carries namespace/name/doc; anything else is treated as empty.
        try:
            parsed = json.loads(self.schema_str) if self.schema_str else {}
        except ValueError:
            parsed = {}
        avro_schema = parsed if isinstance(parsed, dict) else {}

        # The namespace is optional, join only the parts that are present.
        name_parts = [
            part
            for part in (avro_schema.get("namespace"), avro_schema.get("name"))
            if part
        ]
        self.schema_name = ".".join(name_parts)
        self.schema_description = avro_schema.get("doc")

        # Copy so later additions do not leak back into the caller's payload.
        self.properties = dict(schema.get("properties") or {})
token_endpoint + oid_config_url = ( + "%s/.well-known/openid-configuration" % self.config.issuer_url + ) + oid_config_response = requests.get( + oid_config_url, verify=False, allow_redirects=False + ) + + if oid_config_response: + self.config.oid_config.update(oid_config_response.json()) + else: + logger.error( + "Unexpected response while getting discovery document using %s : %s" + % (oid_config_url, oid_config_response) + ) + + if "token_endpoint" not in self.config.oid_config: + raise Exception( + "The token_endpoint is not set, please verify the configured issuer_url or" + " set oid_config.token_endpoint manually in the configuration file." + ) + + # Authentication configured + if ( + self._is_token_authentication_configured() + or self._is_oauth_authentication_configured() + ): + # Update session header with Bearer token + self.session.headers.update( + {"Authorization": f"Bearer {self.get_access_token()}"} + ) + + def get_access_token(self) -> str: + """ + Returns an access token used for authentication, token comes from config or third party provider + when issuer_url is provided + """ + # JWT, get access token (jwt) from config + if self._is_token_authentication_configured(): + return str(self.config.token) + + # OAuth, connect to issuer and return access token + if self._is_oauth_authentication_configured(): + assert self.config.client_id + assert self.config.client_secret + data = {"grant_type": "client_credentials"} + try: + # Get a token from the issuer + token_endpoint = self.config.oid_config["token_endpoint"] + logger.info(f"Request access token from {token_endpoint}") + token_response = requests.post( + url=token_endpoint, + data=data, + verify=False, + allow_redirects=False, + auth=( + self.config.client_id, + self.config.client_secret, + ), + ) + token_response.raise_for_status() + + return token_response.json()["access_token"] + + except requests.exceptions.RequestException as e: + logger.error(f"An error occurred while handling your request: 
def is_checkpointing_enabled(self, job_id: JobId) -> bool:
    """
    Tell whether checkpoint state must be kept for the given job.

    Checkpointing is enabled only for the default ingestion job, and only when
    stateful ingestion is configured with ``remove_stale_metadata`` switched on.
    """
    # Compare the job id first; the remaining conditions are plain truthiness checks.
    if job_id != self.get_default_ingestion_job_id():
        return False
    if not self.is_stateful_ingestion_configured():
        return False
    stateful_config = self.config.stateful_ingestion
    return bool(stateful_config and stateful_config.remove_stale_metadata)
@classmethod
def create(cls, config_dict, ctx):
    """
    Build the source from a raw configuration dictionary.

    The dictionary is parsed into a PulsarSourceConfig. When
    ``exclude_individual_partitions`` is set, a deny pattern matching the
    ``-partition-<n>`` topics is appended to the topic patterns, so that only
    the partitioned topic itself is ingested and not each of its partitions.
    """
    parsed_config = PulsarSourceConfig.parse_obj(config_dict)

    if not parsed_config.exclude_individual_partitions:
        return cls(parsed_config, ctx)

    denied_topics = parsed_config.topic_patterns.deny
    denied_topics.append(r".*-partition-[0-9]+")
    return cls(parsed_config, ctx)
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
    """
    Interacts with the Pulsar Admin Api and loops over tenants, namespaces and topics. For every topic
    the schema information is retrieved if available.

    Pulsar web service admin rest api urls for retrieving topic information
        - [web_service_url]/admin/v2/persistent/{tenant}/{namespace}
        - [web_service_url]/admin/v2/persistent/{tenant}/{namespace}/partitioned
        - [web_service_url]/admin/v2/non-persistent/{tenant}/{namespace}
        - [web_service_url]/admin/v2/non-persistent/{tenant}/{namespace}/partitioned

    Tenants, namespaces and topics rejected by the configured patterns are reported as dropped.
    When stateful ingestion is configured, every emitted topic is added to the current checkpoint
    and the stale entity clean-up runs once, after all tenants have been scanned.
    """
    topic_urls = [
        self.base_url + "/persistent/{}",
        self.base_url + "/persistent/{}/partitioned",
        self.base_url + "/non-persistent/{}",
        self.base_url + "/non-persistent/{}/partitioned",
    ]

    # Report the Pulsar broker version we are communicating with
    self.report.report_pulsar_version(
        self.session.get(
            "%s/brokers/version" % self.base_url,
            timeout=self.config.timeout,
        ).text
    )

    # If no tenants are provided, request all tenants from cluster using /admin/v2/tenants endpoint.
    # Requesting cluster tenant information requires superuser privileges
    if not self.tenants:
        self.tenants = self._get_pulsar_metadata(self.base_url + "/tenants") or []

    # Initialize counters
    self.report.tenants_scanned = 0
    self.report.namespaces_scanned = 0
    self.report.topics_scanned = 0

    stateful = self.is_stateful_ingestion_configured()

    for tenant in self.tenants:
        self.report.tenants_scanned += 1
        if not self.config.tenant_patterns.allowed(tenant):
            self.report.report_tenants_dropped(tenant)
            continue

        # Get namespaces belonging to a tenant, /admin/v2/%s/namespaces
        # A tenant admin role has sufficient privileges to perform this action
        namespaces = (
            self._get_pulsar_metadata(self.base_url + "/namespaces/%s" % tenant)
            or []
        )
        for namespace in namespaces:
            self.report.namespaces_scanned += 1
            if not self.config.namespace_patterns.allowed(namespace):
                self.report.report_namespaces_dropped(namespace)
                continue

            # Get all topics (persistent, non-persistent and partitioned) belonging to a
            # tenant/namespace. Four endpoint invocations are needed to get all topic
            # metadata for a namespace.
            topics = {}
            for url in topic_urls:
                # Topics are partitioned when admin url ends with /partitioned
                partitioned = url.endswith("/partitioned")
                pulsar_topics = self._get_pulsar_metadata(url.format(namespace)) or []
                # Map every topic to its partitioned flag, the flag is added as a
                # custom property later
                topics.update(dict.fromkeys(pulsar_topics, partitioned))

            # For all allowed topics get the metadata
            for topic, is_partitioned in topics.items():
                self.report.topics_scanned += 1
                if not self.config.topic_patterns.allowed(topic):
                    self.report.report_topics_dropped(topic)
                    continue

                yield from self._extract_record(topic, is_partitioned)
                # Add topic to checkpoint if stateful ingestion is enabled
                if stateful:
                    self._add_topic_to_checkpoint(topic)

    # Clean up stale entities only after the whole cluster has been scanned. Comparing
    # checkpoints any earlier would flag topics of not yet visited namespaces as stale.
    if stateful:
        yield from self.gen_removed_entity_workunits()
def _extract_record(
    self, topic: str, partitioned: bool
) -> Iterable[MetadataWorkUnit]:
    """
    Emit the metadata work units for a single Pulsar topic.

    In order, the following dataset aspects are yielded: status, schemaMetadata and
    datasetProperties (both only when a schema was found), browsePaths,
    dataPlatformInstance (only when a platform instance is configured), subTypes and,
    when a configured domain pattern allows the topic, the domain work units.
    Every work unit is registered with the report before it is yielded.

    :param topic: full Pulsar topic name, i.e. persistent://tenant/namespace/topic
    :param partitioned: whether the topic was listed by a /partitioned endpoint
    """
    logger.info(f"topic = {topic}")

    # 1. Create and emit the default dataset for the topic. Extract type, tenant, namespace
    # and topic name from full Pulsar topic name i.e. persistent://tenant/namespace/topic
    pulsar_topic = PulsarTopic(topic)

    platform_urn = make_data_platform_urn(self.platform)
    dataset_urn = make_dataset_urn_with_platform_instance(
        platform=self.platform,
        name=pulsar_topic.fullname,
        platform_instance=self.config.platform_instance,
        env=self.config.env,
    )

    def aspect_workunit(aspect_name, aspect) -> MetadataWorkUnit:
        # All aspects share the same envelope; the work unit id is the dataset urn
        # suffixed with the aspect name.
        wu = MetadataWorkUnit(
            id=f"{dataset_urn}-{aspect_name}",
            mcp=MetadataChangeProposalWrapper(
                entityType="dataset",
                changeType=ChangeTypeClass.UPSERT,
                entityUrn=dataset_urn,
                aspectName=aspect_name,
                aspect=aspect,
            ),
        )
        self.report.report_workunit(wu)
        return wu

    yield aspect_workunit("status", StatusClass(removed=False))

    # 2. Emit schemaMetadata aspect
    schema, schema_metadata = self._get_schema_metadata(pulsar_topic, platform_urn)
    if schema_metadata is not None:
        yield aspect_workunit("schemaMetadata", schema_metadata)

    # TODO Add topic properties (Pulsar 2.10.0 feature)
    # 3. Construct and emit dataset properties aspect
    if schema is not None:
        # Schema properties can be absent from the payload; start from an empty dict in
        # that case and add the static properties on top without touching the schema.
        custom_properties = dict(schema.properties or {})
        custom_properties.update(
            {
                "schema_version": str(schema.schema_version),
                "schema_type": schema.schema_type,
                "partitioned": str(partitioned).lower(),
            }
        )
        yield aspect_workunit(
            "datasetProperties",
            DatasetPropertiesClass(
                description=schema.schema_description,
                customProperties=custom_properties,
            ),
        )

    # 4. Emit browsePaths aspect
    pulsar_path = (
        f"{pulsar_topic.tenant}/{pulsar_topic.namespace}/{pulsar_topic.topic}"
    )
    browse_path_suffix = (
        f"{self.config.platform_instance}/{pulsar_path}"
        if self.config.platform_instance
        else pulsar_path
    )
    yield aspect_workunit(
        "browsePaths",
        BrowsePathsClass(
            [f"/{self.config.env.lower()}/{self.platform}/{browse_path_suffix}"]
        ),
    )

    # 5. Emit dataPlatformInstance aspect.
    if self.config.platform_instance:
        yield aspect_workunit(
            "dataPlatformInstance",
            DataPlatformInstanceClass(
                platform=platform_urn,
                instance=make_dataplatform_instance_urn(
                    self.platform, self.config.platform_instance
                ),
            ),
        )

    # 6. Emit subtype aspect marking this as a "topic"
    yield aspect_workunit("subTypes", SubTypesClass(typeNames=["topic"]))

    # 7. Emit domains aspect. When several domain patterns allow the topic, the last
    # matching domain is the one that gets assigned.
    domain_urn: Optional[str] = None
    for domain, pattern in self.config.domain.items():
        if pattern.allowed(pulsar_topic.fullname):
            domain_urn = make_domain_urn(domain)

    if domain_urn:
        wus = add_domain_to_entity_wu(
            entity_type="dataset",
            entity_urn=dataset_urn,
            domain_urn=domain_urn,
        )
        for wu in wus:
            self.report.report_workunit(wu)
            yield wu
+ This will be used to override the stateful_ingestion config param of StatefulIngestionConfigBase + in the PulsarSourceConfig. + """ + + remove_stale_metadata: bool = True + + +def _is_valid_hostname(hostname: str) -> bool: + """ + Loosely ascii hostname validation. A hostname is considered valid when the total length does not exceed 253 + characters, contains valid characters and are max 63 octets per label. + """ + if len(hostname) > 253: + return False + # Hostnames ending on a dot are valid, if present strip exactly one + if hostname[-1] == ".": + hostname = hostname[:-1] + allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(? Optional[str]: + if token is not None and values.get("issuer_url") is not None: + raise ConfigurationError( + "Expected only one authentication method, either issuer_url or token." + ) + return token + + @validator("client_secret", always=True) + def ensure_client_id_and_secret_for_issuer_url( + cls, client_secret: Optional[str], values: Dict[str, Optional[str]] + ) -> Optional[str]: + if values.get("issuer_url") is not None and ( + client_secret is None or values.get("client_id") is None + ): + raise ConfigurationError( + "Missing configuration: client_id and client_secret are mandatory when issuer_url is set." 
@validator("web_service_url")
def web_service_url_scheme_host_port(cls, val: str) -> str:
    """
    Validate the scheme and hostname of the web service url.

    :param val: the configured web service url
    :return: the url with trailing slashes removed
    :raises ConfigurationError: when the scheme is not http or https, or when the
        hostname is missing or rejected by the hostname check
    """
    # Tokenize the web url
    url = urlparse(val)

    if url.scheme not in ["http", "https"]:
        raise ConfigurationError(
            f"Scheme should be http or https, found {url.scheme}"
        )

    # urlparse yields None when the url has no host. Reject it explicitly, converting it
    # to a string would produce "None", which looks like a well-formed hostname.
    if not url.hostname or not _is_valid_hostname(url.hostname):
        raise ConfigurationError(
            f"Not a valid hostname, hostname contains invalid characters, found {url.hostname}"
        )

    return config_clean.remove_trailing_slashes(val)
class TestPulsarSourceConfig:
    """Checks for the web_service_url validator of PulsarSourceConfig."""

    def test_pulsar_source_config_valid_web_service_url(self):
        # A valid url is returned without its trailing slash
        cleaned_url = PulsarSourceConfig().web_service_url_scheme_host_port(
            "http://localhost:8080/"
        )
        assert cleaned_url == "http://localhost:8080"

    def test_pulsar_source_config_invalid_web_service_url_scheme(self):
        # Only http and https are accepted as scheme
        expected_error = r"Scheme should be http or https, found ftp"
        with pytest.raises(ConfigurationError, match=expected_error):
            PulsarSourceConfig().web_service_url_scheme_host_port(
                "ftp://localhost:8080/"
            )

    def test_pulsar_source_config_invalid_web_service_url_host(self):
        # A hostname holding an invalid character is rejected
        expected_error = r"Not a valid hostname, hostname contains invalid characters, found localhost&"
        with pytest.raises(ConfigurationError, match=expected_error):
            PulsarSourceConfig().web_service_url_scheme_host_port(
                "http://localhost&:8080/"
            )
class TestPulsarSchema:
    """Checks that PulsarSchema exposes the fields of a schema payload."""

    def test_pulsar_source_parse_pulsar_schema(self) -> None:
        expected_schema_str = '{"type":"record","name":"FooSchema","namespace":"foo.bar","doc":"Description of FooSchema","fields":[{"name":"field1","type":{"type":"string","avro.java.string":"String"},"doc":"Description of field1"},{"name":"field2","type":"long","doc":"Some description","default":0}]}'
        expected_properties = {
            "__jsr310ConversionEnabled": "false",
            "__alwaysAllowNull": "true",
        }

        parsed = PulsarSchema(mock_schema_response)

        assert parsed.schema_type == "AVRO"
        assert parsed.schema_str == expected_schema_str
        # The schema name is the record namespace and name joined by a dot
        assert parsed.schema_name == "foo.bar.FooSchema"
        assert parsed.schema_version == 1
        assert parsed.schema_description == "Description of FooSchema"
        assert parsed.properties == expected_properties
"http://localhost:8080", + "issuer_url": "http://localhost:8083/realms/pulsar", + "client_id": "client_id", + "client_secret": "client_secret", + }, + ctx, + ) + mock_post.return_value.json.return_value = {"access_token": "oauth_token"} + assert pulsar_source.get_access_token() == "oauth_token" + + @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True) + def test_pulsar_source_get_workunits_all_tenant(self, mock_session): + ctx = PipelineContext(run_id="test") + pulsar_source = PulsarSource.create( + { + "web_service_url": "http://localhost:8080", + }, + ctx, + ) + + # Mock fetching Pulsar metadata + with patch( + "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata" + ) as mock: + mock.side_effect = [ + ["t_1"], # tenant list + ["t_1/ns_1"], # namespaces list + ["persistent://t_1/ns_1/topic_1"], # persistent topic list + [], # persistent partitioned topic list + [], # none-persistent topic list + [], # none-persistent partitioned topic list + mock_schema_response, + ] # schema for persistent://t_1/ns_1/topic + + work_units = list(pulsar_source.get_workunits()) + first_mcp = work_units[0].metadata + assert isinstance(first_mcp, MetadataChangeProposalWrapper) + + # Expected calls 7 + # http://localhost:8080/admin/v2/tenants + # http://localhost:8080/admin/v2/namespaces/t_1 + # http://localhost:8080/admin/v2/persistent/t_1/ns_1 + # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1 + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema + assert mock.call_count == 7 + # expecting 5 mcp for one topic with default config + assert len(work_units) == 5 + + @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True) + def test_pulsar_source_get_workunits_custom_tenant(self, mock_session): + ctx = PipelineContext(run_id="test") + pulsar_source = PulsarSource.create( + 
{ + "web_service_url": "http://localhost:8080", + "tenants": ["t_1", "t_2"], + }, + ctx, + ) + + # Mock fetching Pulsar metadata + with patch( + "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata" + ) as mock: + mock.side_effect = [ + ["t_1/ns_1"], # namespaces list + ["persistent://t_1/ns_1/topic_1"], # topic list + [], # empty persistent partitioned topic list + [], # empty none-persistent topic list + [], # empty none-persistent partitioned topic list + mock_schema_response, # schema for persistent://t_1/ns_1/topic + [], # no namespaces for tenant t_2 + ] + + work_units = list(pulsar_source.get_workunits()) + first_mcp = work_units[0].metadata + assert isinstance(first_mcp, MetadataChangeProposalWrapper) + + # Expected calls 7 + # http://localhost:8080/admin/v2/namespaces/t_1 + # http://localhost:8080/admin/v2/persistent/t_1/ns_1 + # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1 + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema + # http://localhost:8080/admin/v2/namespaces/t_2 + assert mock.call_count == 7 + # expecting 5 mcp for one topic with default config + assert len(work_units) == 5 + + @patch("datahub.ingestion.source.pulsar.requests.Session.get", autospec=True) + def test_pulsar_source_get_workunits_patterns(self, mock_session): + ctx = PipelineContext(run_id="test") + pulsar_source = PulsarSource.create( + { + "web_service_url": "http://localhost:8080", + "tenants": ["t_1", "t_2", "bad_t_3"], + "tenant_patterns": {"deny": ["bad_t_3"]}, + "namespace_patterns": {"allow": [r"t_1/ns_1"]}, + "topic_patterns": {"allow": [r"persistent://t_1/ns_1/topic_1"]}, + }, + ctx, + ) + + # Mock fetching Pulsar metadata + with patch( + "datahub.ingestion.source.pulsar.PulsarSource._get_pulsar_metadata" + ) as mock: + mock.side_effect = [ + ["t_1/ns_1", "t_2/ns_1"], # namespaces list + [ + 
"persistent://t_1/ns_1/topic_1", # persistent topic list + "non-persistent://t_1/ns_1/bad_topic", + ], # topic will be filtered out + [], # persistent partitioned topic list + [], # none-persistent topic list + [], # none-persistent partitioned topic list + mock_schema_response, # schema for persistent://t_1/ns_1/topic + [], # no namespaces for tenant t_2 + ] + + work_units = list(pulsar_source.get_workunits()) + first_mcp = work_units[0].metadata + assert isinstance(first_mcp, MetadataChangeProposalWrapper) + + # Expected calls 7 + # http://localhost:8080/admin/v2/namespaces/t_1 + # http://localhost:8080/admin/v2/persistent/t_1/ns_1 + # http://localhost:8080/admin/v2/persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1 + # http://localhost:8080/admin/v2/non-persistent/t_1/ns_1/partitioned + # http://localhost:8080/admin/v2/schemas/t_1/ns_1/topic_1/schema + # http://localhost:8080/admin/v2/namespaces/t_2 + assert mock.call_count == 7 + # expecting 5 mcp for one topic with default config + assert len(work_units) == 5 diff --git a/metadata-service/war/src/main/resources/boot/data_platforms.json b/metadata-service/war/src/main/resources/boot/data_platforms.json index 2e3d783227..67fa55dd56 100644 --- a/metadata-service/war/src/main/resources/boot/data_platforms.json +++ b/metadata-service/war/src/main/resources/boot/data_platforms.json @@ -456,6 +456,16 @@ "logoUrl": "/assets/platforms/trinologo.png" } }, + { + "urn": "urn:li:dataPlatform:pulsar", + "aspect": { + "datasetNameDelimiter": ".", + "name": "pulsar", + "displayName": "Pulsar", + "type": "MESSAGE_BROKER", + "logoUrl": "/assets/platforms/pulsarlogo.png" + } + }, { "urn": "urn:li:dataPlatform:unknown", "aspect": {