chore: remove unnecessary modules from codebase (#5420)

This commit is contained in:
Shirshanka Das 2022-07-17 22:02:45 -07:00 committed by GitHub
parent f8d059901f
commit 238fca5191
46 changed files with 1 addition and 3648 deletions

View File

@ -1,72 +0,0 @@
# datahub Ingestion Tool
## Introduction
A tool to ingest [jdbc-database-schema] and [etl-lineage] metadata.
The ingestion procedure is split into two parts: a [datahub-producer] and different [metadata-generator]s.
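Each generator writes MetadataChangeEvent records as JSON, one per line, to stdout; the producer reads them from stdin, encodes them against the Avro schema, and publishes them to Kafka, so the two sides combine with a plain shell pipe. A minimal sketch of a hypothetical generator (the dataset name is illustrative, not part of this repo):
```
#! /usr/bin/env python3
# Hypothetical minimal generator: prints one MetadataChangeEvent JSON record
# per line, in the same shape as sample/mce.json.dat, so it can be piped into
# bin/datahub-producer.hs config.
import json
import sys

mce = {
    "proposedSnapshot": {
        "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
            "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,example_db.example_table,PROD)",
            "aspects": [
                {"com.linkedin.pegasus2avro.dataset.UpstreamLineage": {"upstreams": []}}
            ],
        }
    }
}
sys.stdout.write(json.dumps(mce) + "\n")
```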
## Roadmap
- [X] datahub-producer loads JSON Avro data.
- [X] add lineage-hive generator
- [X] add dataset-jdbc generator (includes mysql, mssql, postgresql, oracle drivers)
- [X] add dataset-hive generator
- [ ] add lineage-oracle generator
- [ ] enhance the lineage-jdbc generator to a lazy iterator mode
- [ ] enhance the avro parser to show error information
## Quickstart
1. Install nix and add the nixpkgs channel
```
sudo install -d -m755 -o $(id -u) -g $(id -g) /nix
curl https://nixos.org/nix/install | sh
nix-channel --add https://nixos.org/channels/nixos-20.03 nixpkgs
nix-channel --update nixpkgs
```
2. [optional] You can download the required dependencies in advance; otherwise they will be downloaded automatically at run time.
```
nix-shell bin/[datahub-producer].hs.nix
nix-shell bin/[datahub-producer].py.nix
...
```
3. Load JSON data into datahub
```
cat sample/mce.json.dat | bin/datahub-producer.hs config
```
4. Parse Hive SQL files and load the lineage into datahub
```
ls sample/hive_*.sql | bin/lineage_hive_generator.hs | bin/datahub-producer.hs config
```
5. Load JDBC schemas (mysql, mssql, postgresql, oracle) into datahub
```
bin/dataset-jdbc-generator.hs | bin/datahub-producer.hs config
```
6. Load Hive schemas into datahub
```
bin/dataset-hive-generator.py | bin/datahub-producer.hs config
```
## Reference
- hive/presto/vertica SQL Parser
uber/queryparser [https://github.com/uber/queryparser.git]
- oracle procedure syntax
https://docs.oracle.com/cd/E11882_01/server.112/e41085/sqlqr01001.htm#SQLQR110
- postgresql procedure parser
JakeWheat/hssqlppp [https://github.com/JakeWheat/hssqlppp.git]

View File

@ -1,174 +0,0 @@
#! /usr/bin/env nix-shell
#! nix-shell datahub-producer.hs.nix -i runghc
{-# LANGUAGE OverloadedStrings, FlexibleInstances, FlexibleContexts, ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
import System.Environment (getArgs)
import System.Directory (canonicalizePath)
import Data.Typeable (Typeable)
import Data.Functor ((<&>))
import Control.Arrow (left, right)
import Control.Monad ((>=>), when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Catch (Exception, MonadThrow(..))
import qualified Data.ByteString.Lazy as B
import qualified Data.ByteString.Lazy.Char8 as BC
import qualified Data.Text as T
import qualified Data.Aeson as J
import Data.String.Conversions (cs)
import qualified Data.Binary as BIN
import Data.HashMap.Strict ((!))
import qualified Data.HashMap.Strict as MS
import qualified Data.Vector as V
import Control.Lens ((^?), (^..), folded, _Just)
import Data.Aeson.Lens (key, _Array, _String)
import qualified Data.Avro.Types as A (Value(..))
import qualified Data.Avro as A (Schema, Result(..))
import qualified Data.Avro.Schema as AS (
Schema(..), resultToEither, buildTypeEnvironment
, renderFullname, parseFullname, typeName, parseAvroJSON
)
-- import Data.Avro.JSON (decodeAvroJSON)
import Data.Avro.Encode (encodeAvro)
import Data.Avro.Decode (decodeAvro)
-- import Data.Avro.Deriving (makeSchema)
import Kafka.Avro (
SchemaRegistry(..), Subject(..), SchemaId(..)
, schemaRegistry, sendSchema
, extractSchemaId, loadSchema
)
import Data.Conduit (ConduitT, ZipSink(..), getZipSink, runConduitRes, runConduit, bracketP, (.|), yield)
import qualified Data.Conduit.Combinators as C
import Kafka.Conduit.Sink (ProducerRecord(..), TopicName(..), ProducePartition(..), BrokerAddress(..), kafkaSink, brokersList)
import Network.URI (parseURI)
import Network.URI.Lens (uriAuthorityLens, uriRegNameLens, uriPortLens)
import System.Process (readProcess)
data StringException = StringException String deriving (Typeable, Show)
instance Exception StringException
decodeAvroJSON :: A.Schema -> J.Value -> A.Result (A.Value A.Schema)
decodeAvroJSON schema json =
AS.parseAvroJSON union env schema json
where
env =
AS.buildTypeEnvironment missing schema
missing name =
fail ("Type " <> show name <> " not in schema")
union (AS.Union schemas) J.Null
| AS.Null `elem` schemas =
pure $ A.Union schemas AS.Null A.Null
| otherwise =
fail "Null not in union."
union (AS.Union schemas) (J.Object obj)
| null obj =
fail "Invalid encoding of union: empty object ({})."
| length obj > 1 =
fail ("Invalid encoding of union: object with too many fields:" ++ show obj)
| otherwise =
let
canonicalize name
| isBuiltIn name = name
| otherwise = AS.renderFullname $ AS.parseFullname name
branch =
head $ MS.keys obj
names =
MS.fromList [(AS.typeName t, t) | t <- V.toList schemas]
in case MS.lookup (canonicalize branch) names of
Just t -> do
nested <- AS.parseAvroJSON union env t (obj ! branch)
return (A.Union schemas t nested)
Nothing -> fail ("Type '" <> T.unpack branch <> "' not in union: " <> show schemas)
union AS.Union{} _ =
A.Error "Invalid JSON representation for union: has to be a JSON object with exactly one field."
union _ _ =
error "Impossible: function given non-union schema."
isBuiltIn name = name `elem` [ "null", "boolean", "int", "long", "float"
, "double", "bytes", "string", "array", "map" ]
fromRight :: (MonadThrow m, Show a) => String -> Either a b -> m b
fromRight label = either (throwM . StringException . (label ++) . show) return
fromJust :: (MonadThrow m, Show a) => String -> Maybe a -> m a
fromJust label = maybe (throwM . StringException $ (label ++ "value is missing") ) return
encodeJsonWithSchema :: (MonadIO m, MonadThrow m)
=> SchemaRegistry
-> Subject
-> A.Schema
-> J.Value
-> m B.ByteString
encodeJsonWithSchema sr subj schema json = do
v <- fromRight "[decodeAvroJSON]" $ AS.resultToEither $ decodeAvroJSON schema json
mbSid <- fromRight "[SchemaRegistry.sendSchema]"=<< sendSchema sr subj schema
return $ appendSchemaId v mbSid
where appendSchemaId v (SchemaId sid)= B.cons (toEnum 0) (BIN.encode sid) <> (encodeAvro v)
decodeJsonWithSchema :: (MonadIO m, MonadThrow m)
=> SchemaRegistry
-> B.ByteString
-> m J.Value
decodeJsonWithSchema sr bs = do
(sid, payload) <- maybe (throwM . StringException $ "BadPayloadNoSchemaId") return $ extractSchemaId bs
schema <- fromRight "[SchemaRegistry.loadSchema]" =<< loadSchema sr sid
J.toJSON <$> (fromRight "[Avro.decodeAvro]" $ decodeAvro schema payload)
parseNixJson :: FilePath -> IO J.Value
parseNixJson f = do
stdout :: String <- read <$> readProcess "nix-instantiate" ["--eval", "--expr", "builtins.toJSON (import " ++ f ++ ")"] ""
fromRight "[Aeson.eitherDecode] parse nix json" (J.eitherDecode (cs stdout))
main :: IO ()
main = do
args <- getArgs
when (length args /= 1) $
error " datahub-producer.hs [config-dir]"
confDir <- canonicalizePath (head args)
putStrLn ("confDir:" <> confDir)
confJson <- parseNixJson (confDir <> "/" <> "datahub-config.nix")
-- putStrLn ("confJson: " ++ show confJson)
schema <- fromRight "[Aeson.eitherDecode] parse asvc file:" =<<
J.eitherDecode <$> B.readFile (confDir <> "/" <> "MetadataChangeEvent.avsc")
-- putStrLn ("schema: " ++ show schema)
let
topic = "MetadataChangeEvent"
-- schema = $(makeSchema "../MetadataChangeEvent.avsc")
sandboxL = key "services".key "linkedin-datahub-pipeline".key "sandbox"
urisL = key "uris". _Array.folded._String
brokers = confJson ^.. sandboxL.key "kafka".urisL
srs = confJson ^.. sandboxL.key "schema-registry".urisL
brokers' = map (\uriText -> BrokerAddress . cs . concat $ parseURI (cs uriText) ^.. _Just.uriAuthorityLens._Just.(uriRegNameLens <> uriPortLens)) brokers
contents <- B.getContents <&> BC.lines
sr <- schemaRegistry (cs (head srs))
putStrLn " ==> beginning to send data..."
runConduitRes $ C.yieldMany contents
.| C.mapM (fromRight "[JSON.eitherDecode] read json record:". J.eitherDecode)
-- .| C.iterM (liftIO . putStrLn. cs . J.encode)
.| C.mapM (encodeJsonWithSchema sr (Subject (topic <> "-value")) schema)
-- .| C.iterM (decodeJsonWithSchema sr >=> liftIO . print . J.encode)
.| C.map (mkRecord (TopicName topic))
.| getZipSink ( ZipSink (kafkaSink (brokersList brokers')) *>
ZipSink ((C.length >>= yield) .| C.iterM (\n -> liftIO $ putStrLn ("total table num:" <> show n)) .| C.sinkNull))
return ()
where
mkRecord :: TopicName -> B.ByteString -> ProducerRecord
mkRecord topic bs = ProducerRecord topic UnassignedPartition Nothing (Just (cs bs))

View File

@ -1,16 +0,0 @@
with import <nixpkgs> {} ;
let
in
mkShell {
buildInputs = [
(haskellPackages.ghcWithPackages ( p:
[ p.bytestring p.string-conversions
p.exceptions
p.network-uri p.directory
p.lens p.aeson p.lens-aeson p.avro p.hw-kafka-avro
p.hw-kafka-client
p.conduit p.hw-kafka-conduit
]
))
];
}

View File

@ -1,66 +0,0 @@
#! /usr/bin/env nix-shell
#! nix-shell dataset-hive-generator.py.nix -i python
import sys
import time
from pyhive import hive
from TCLIService.ttypes import TOperationState
import simplejson as json
HIVESTORE='localhost'
AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent_v4'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'
def hive_query(query):
"""
Execute the query against the HiveStore.
"""
cursor = hive.connect(HIVESTORE).cursor()
cursor.execute(query, async_=True)
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
logs = cursor.fetch_logs()
for message in logs:
sys.stdout.write(message)
status = cursor.poll().operationState
results = cursor.fetchall()
return results
def build_hive_dataset_mce(dataset_name, schema, metadata):
"""
Create the MetadataChangeEvent via dataset_name and schema.
"""
actor, type, created_time, upstreams_dataset, sys_time = "urn:li:corpuser:" + metadata[2][7:], str(metadata[-1][11:-1]), int(metadata[3][12:]), metadata[-28][10:], int(time.time())
owners = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":sys_time,"actor":actor}}
upstreams = {"upstreams":[{"auditStamp":{"time":sys_time,"actor":actor},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hive," + upstreams_dataset + ",PROD)","type":"TRANSFORMED"}]}
elements = {"elements":[{"url":HIVESTORE,"description":"sample doc to describe upstreams","createStamp":{"time":sys_time,"actor":actor}}]}
schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:hive","version":0,"created":{"time":created_time,"actor":actor},
"lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.OtherSchema": {"rawSchema": schema}},
"fields":[{"fieldPath":"","description":{"string":""},"nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}
mce = {"auditHeader": None,
"proposedSnapshot":{"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot":
{"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,"+ dataset_name +",PROD)"
,"aspects": [
{"com.linkedin.pegasus2avro.common.Ownership": owners}
, {"com.linkedin.pegasus2avro.dataset.UpstreamLineage": upstreams}
, {"com.linkedin.pegasus2avro.common.InstitutionalMemory": elements}
, {"com.linkedin.pegasus2avro.schema.SchemaMetadata": schema_name}
]}},
"proposedDelta": None}
print(json.dumps(mce))
databases = hive_query('show databases')
for database in databases:
tables = hive_query('show tables in ' + database[0])
for table in tables:
dataset_name = database[0] + '.' + table[0]
description = hive_query('describe extended ' + dataset_name)
build_hive_dataset_mce(dataset_name, str(description[:-1][:-1]), description[-1][1].split(','))
sys.exit(0)

View File

@ -1,61 +0,0 @@
with import <nixpkgs> {} ;
let
avro-python3-1_8 = python3Packages.buildPythonPackage rec {
pname = "avro-python3" ;
version = "1.8.2" ;
src = python3Packages.fetchPypi {
inherit pname version ;
sha256 = "f82cf0d66189600b1e6b442f650ad5aca6c189576723dcbf6f9ce096eab81bd6" ;
} ;
doCheck = false;
} ;
sasl = python3Packages.buildPythonPackage rec {
pname = "sasl" ;
version = "0.2.1" ;
src = python3Packages.fetchPypi {
inherit pname version ;
sha256 = "04f22e17bbebe0cd42471757a48c2c07126773c38741b1dad8d9fe724c16289d" ;
} ;
doCheck = false;
propagatedBuildInputs = [ cyrus_sasl ] ++ (with python3Packages ; [six]) ;
} ;
thrift_sasl = python3Packages.buildPythonPackage rec {
pname = "thrift_sasl" ;
version = "0.4.2" ;
src = python3Packages.fetchPypi {
inherit pname version ;
sha256 = "6a1c54731cb3ce61bdc041d9dc36e21f0f56db0163bb7b57be84de3fda70922f" ;
} ;
doCheck = false;
propagatedBuildInputs = with python3Packages; [ thrift sasl ] ;
} ;
PyHive = python3Packages.buildPythonPackage rec {
pname = "PyHive" ;
version = "0.6.1" ;
src = python3Packages.fetchPypi {
inherit pname version ;
sha256 = "a5f2b2f8bcd85a8cd80ab64ff8fbfe1c09515d266650a56f789a8d89ad66d7f4" ;
} ;
doCheck = false;
propagatedBuildInputs = with python3Packages ; [ dateutil future thrift sasl thrift_sasl ];
} ;
in
mkShell {
buildInputs = (with python3Packages ;[
python
requests
PyHive
simplejson
# avro-python3-1_8
# confluent-kafka
]) ;
}

View File

@ -1,213 +0,0 @@
#! /usr/bin/env nix-shell
#! nix-shell dataset-jdbc-generator.hs.nix -i "runghc --ghc-arg=-fobject-code"
{-# LANGUAGE OverloadedStrings, FlexibleInstances, FlexibleContexts, ScopedTypeVariables #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE TemplateHaskell, QuasiQuotes #-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fplugin=Language.Java.Inline.Plugin #-}
import System.Environment (lookupEnv)
import System.IO (hPrint, stderr, hSetEncoding, stdout, utf8)
import qualified Language.Haskell.TH.Syntax as TH
import Control.Concurrent (runInBoundThread)
import Language.Java (J, withJVM, reify, reflect, JType(..))
import Language.Java.Inline (imports, java)
import qualified Data.Text as T
import qualified Data.Text.IO as T
import Data.String.Conversions (cs)
import Text.InterpolatedString.Perl6 (q)
import Prelude hiding ((>>=), (>>))
import Data.Conduit (ConduitT, ZipSink(..), getZipSink, runConduitRes, runConduit, bracketP, (.|), yield)
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as C (groupBy)
import qualified Data.Aeson as J
import Control.Arrow ((>>>))
import Data.Aeson.QQ (aesonQQ)
import Text.Printf (printf)
imports "java.util.*"
imports "java.sql.*"
datasetOracleSql :: T.Text
datasetOracleSql = [q|
select
c.OWNER || '.' || c.TABLE_NAME as schema_name
, t.COMMENTS as schema_description
, c.COLUMN_NAME as field_path
, c.DATA_TYPE as native_data_type
, m.COMMENTS as description
from ALL_TAB_COLUMNS c
left join ALL_TAB_COMMENTS t
on c.OWNER = t.OWNER
and c.TABLE_NAME = t.TABLE_NAME
left join ALL_COL_COMMENTS m
on c.OWNER = m.OWNER
and c.TABLE_NAME = m.TABLE_NAME
and c.COLUMN_NAME = m.COLUMN_NAME
where NOT REGEXP_LIKE(c.OWNER, 'ANONYMOUS|PUBLIC|SYS|SYSTEM|DBSNMP|MDSYS|CTXSYS|XDB|TSMSYS|ORACLE.*|APEX.*|TEST?*|GG_.*|\\$')
order by schema_name, c.COLUMN_ID
|]
datasetMysqlSql :: T.Text
datasetMysqlSql = [q|
select
concat(c.TABLE_SCHEMA, '.', c.TABLE_NAME) as schema_name
, NULL as schema_description
, c.COLUMN_NAME as field_path
, c.DATA_TYPE as native_data_type
, c.COLUMN_COMMENT as description
from information_schema.columns c
where table_schema not in ('information_schema')
order by schema_name, c.ORDINAL_POSITION
|]
datasetPostgresqlSql :: T.Text
datasetPostgresqlSql = [q|
SELECT
c.table_schema || '.' || c.table_name as schema_name
, pgtd.description as schema_description
, c.column_name as field_path
, c.data_type as native_data_type
, pgcd.description as description
FROM INFORMATION_SCHEMA.COLUMNS c
INNER JOIN
pg_catalog.pg_statio_all_tables as st on c.table_schema=st.schemaname and c.table_name=st.relname
LEFT JOIN
pg_catalog.pg_description pgcd on pgcd.objoid=st.relid and pgcd.objsubid=c.ordinal_position
LEFT JOIN
pg_catalog.pg_description pgtd on pgtd.objoid=st.relid and pgtd.objsubid=0
WHERE c.table_schema NOT IN ('information_schema', 'pg_catalog')
ORDER by schema_name, ordinal_position ;
|]
mkMCE :: Int -> T.Text -> [[T.Text]] -> J.Value
mkMCE ts platform fields@((schemaName:schemaDescription:_):_) = [aesonQQ|
{ "proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": #{urn}
, "aspects": [
{ "com.linkedin.pegasus2avro.common.Ownership": {
"owners": [{"owner": "urn:li:corpuser:datahub", "type":"DATAOWNER"}]
, "lastModified": {"time": #{ts}, "actor": "urn:li:corpuser:datahub"}
}
}
, { "com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"description": {"string": #{schemaDescription}}
}
}
, { "com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": #{schemaName}
, "platform": "urn:li:dataPlatform:"
, "version": 0
, "created": {"time": #{ts}, "actor": "urn:li:corpuser:datahub"}
, "lastModified": {"time": #{ts}, "actor": "urn:li:corpuser:datahub"}
, "hash": ""
, "platformSchema": {
"com.linkedin.pegasus2avro.schema.MySqlDDL": {
"documentSchema": "{}"
, "tableSchema": "{}"
}
}
, "fields": #{mkFields fields}
}
}
]
}
}
}
|]
where
urn :: String = printf "urn:li:dataset:(urn:li:dataPlatform:%s,%s,%s)"
platform schemaName ("PROD"::String)
mkField (_:_:fieldPath:nativeDataType:description:[]) = [aesonQQ|
{
"fieldPath": #{fieldPath}
, "description": {"string": #{description}}
, "type": {"type": {"com.linkedin.pegasus2avro.schema.StringType": {}}}
, "nativeDataType": #{nativeDataType}
}
|]
mkFields = map mkField
main :: IO ()
main = do
hSetEncoding stdout utf8
let
jvmArgs = case $(TH.lift =<< TH.runIO (lookupEnv "CLASSPATH")) of
Nothing -> []
Just cp -> [ cs ("-Djava.class.path=" ++ cp) ]
platform :: T.Text = "localhost_postgresql"
-- dbUrl :: T.Text = "jdbc:mysql://localhost:3306/datahub?useSSL=false"
-- dbUrl :: T.Text = "jdbc:oracle:thin@localhost:1521:EDWDB"
dbUrl :: T.Text = "jdbc:postgresql://localhost:5432/datahub"
dbUser :: T.Text = "datahub"
dbPassword :: T.Text = "datahub"
-- dbDriver:: T.Text = "oracle.jdbc.OracleDriver" ;
-- dbDriver:: T.Text = "com.mysql.jdbc.Driver" ;
dbDriver:: T.Text = "org.postgresql.Driver" ;
-- dbDriver:: T.Text = "com.microsoft.sqlserver.jdbc.SQLServerDriver" ;
-- dbSQL :: T.Text = datasetMysqlSql
-- dbSQL :: T.Text = datasetOracleSql
dbSQL :: T.Text = datasetPostgresqlSql
runInBoundThread $ withJVM jvmArgs $ do
[jDbUrl, jDbUser, jDbPassword, jDbDriver, jDbSQL ] <-
mapM reflect [dbUrl, dbUser, dbPassword, dbDriver, dbSQL]
result <- [java| {
try {
Class.forName($jDbDriver) ;
} catch (ClassNotFoundException e) {
e.printStackTrace() ;
System.exit(1) ;
}
List<String[]> result = new ArrayList<String[]>() ;
try (Connection con = DriverManager.getConnection($jDbUrl, $jDbUser, $jDbPassword) ;
Statement st = con.createStatement(); ) {
try (ResultSet rs = st.executeQuery($jDbSQL)) {
while(rs.next()) {
String[] row = new String[] {
Optional.ofNullable(rs.getString("schema_name")).orElse("")
, Optional.ofNullable(rs.getString("schema_description")).orElse("")
, Optional.ofNullable(rs.getString("field_path")).orElse("")
, Optional.ofNullable(rs.getString("native_data_type")).orElse("")
, Optional.ofNullable(rs.getString("description")).orElse("")
} ;
result.add(row) ;
}
}
return result.toArray(new String[0][0]) ;
} catch (SQLException e) {
e.printStackTrace() ;
return null ;
}
} |]
rows :: [[T.Text]] <- reify result
runConduit $ C.yieldMany rows
-- .| C.iterM (hPrint stderr)
.| C.groupBy sameSchemaName
-- .| C.iterM (hPrint stderr)
.| C.map (mkMCE 0 platform)
.| C.mapM_ (J.encode >>> cs >>> putStrLn)
.| C.sinkNull
return ()
where
sameSchemaName (schemaNameL:_) (schemaNameR:_) = schemaNameL == schemaNameR

View File

@ -1,54 +0,0 @@
with import <nixpkgs> {} ;
let
inline_java_git = fetchFromGitHub {
owner = "tweag" ;
repo = "inline-java" ;
rev = "a897d32df99e4ed19314d2a7e245785152e9099d" ;
sha256 = "00pk19j9g0mm9sknj3aklz01zv1dy234s3vnzg6daq1dmwd4hb68" ;
} ;
haskellPackages = pkgs.haskellPackages.override {
overrides = self: super: with pkgs.haskell.lib; {
jni = overrideCabal (self.callCabal2nix "jni" (inline_java_git + /jni) {}) (drv: {
preConfigure = ''
local libdir=( "${pkgs.jdk}/lib/openjdk/jre/lib/"*"/server" )
configureFlags+=" --extra-lib-dir=''${libdir[0]}"
'' ;
}) ;
jvm = overrideCabal (self.callCabal2nix "jvm" (inline_java_git + /jvm) {}) (drv: {
doCheck = false ;
}) ;
inline-java = overrideCabal (self.callCabal2nix "inline-java" inline_java_git {}) (drv: {
doCheck = false ;
}) ;
jvm-batching = overrideCabal (self.callCabal2nix "jvm-batching" (inline_java_git + /jvm-batching) {}) (drv: {
doCheck = false ;
}) ;
jvm-streaming = overrideCabal (self.callCabal2nix "jvm-streaming" (inline_java_git + /jvm-streaming) {}) (drv: {
doCheck = false ;
}) ;
} ;
};
in
mkShell {
buildInputs = [
pkgs.jdk
pkgs.postgresql_jdbc
pkgs.mysql_jdbc
pkgs.mssql_jdbc
pkgs.oracle-instantclient
(haskellPackages.ghcWithPackages ( p:
[ p.bytestring p.string-conversions
p.interpolatedstring-perl6
p.aeson p.aeson-qq
p.exceptions
p.inline-java
p.conduit
]
))
];
}

View File

@ -1,113 +0,0 @@
#! /usr/bin/env nix-shell
#! nix-shell ./lineage_hive_generator.hs.nix -i runghc
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE QuasiQuotes #-}
import Data.Functor ((<&>))
import Control.Monad (when)
import Control.Arrow ((>>>))
import Data.Proxy (Proxy(..))
import Data.Either (isLeft, fromLeft, fromRight)
import Text.Printf (formatString)
import System.IO (hPrint, stderr)
import Data.String.Conversions (cs)
import qualified Data.Text.Lazy as T
import qualified Data.Text.Lazy.IO as T
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.HashMap.Strict as HM
import qualified Data.Aeson as J
import Data.Conduit (ConduitT, runConduitRes, runConduit, bracketP, (.|))
import qualified Data.Conduit.Combinators as C
import qualified Database.Sql.Hive.Parser as HIVE
import qualified Database.Sql.Hive.Type as HIVE
import Database.Sql.Type (
Catalog(..), DatabaseName(..), FullyQualifiedTableName(..), FQTN(..)
, makeDefaultingCatalog, mkNormalSchema
)
import Database.Sql.Util.Scope (runResolverWarn)
import Database.Sql.Util.Lineage.Table (getTableLineage)
import Data.Aeson.QQ (aesonQQ)
import Data.Time.Clock.POSIX (getPOSIXTime)
instance J.ToJSON FullyQualifiedTableName
instance J.ToJSONKey FullyQualifiedTableName
nowts :: IO Int
nowts = floor . (* 1000) <$> getPOSIXTime
catalog :: Catalog
catalog = makeDefaultingCatalog HM.empty
[mkNormalSchema "public" ()]
(DatabaseName () "defaultDatabase")
tableName :: FullyQualifiedTableName -> T.Text
tableName (FullyQualifiedTableName database schema name) = T.intercalate "." [database, schema, name]
mkMCE :: Int -> (FQTN, S.Set FQTN) -> J.Value
mkMCE ts (output, inputs) = [aesonQQ|
{ "proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": #{uriName output}
, "aspects": [
{ "com.linkedin.pegasus2avro.dataset.UpstreamLineage": {
"upstreams": #{upstreams ts inputs}
}
}
]
}
}
}
|]
where
upstream :: Int -> T.Text -> J.Value
upstream ts dataset = [aesonQQ|
{ "auditStamp": {"time":#{ts}, "actor":"urn:li:corpuser:jdoe"}
, "dataset": #{dataset}
, "type":"TRANSFORMED"
}
|]
upstreams ts = map (upstream ts . uriName) . S.toList
uriName :: FQTN -> T.Text
uriName fqtn = "urn:li:dataset:(urn:li:dataPlatform:hive,"
<> tableName fqtn
<> ",PROD)"
main = do
contents <- T.getContents <&> T.lines
ts <- nowts
runConduit $ C.yieldMany contents
.| C.iterM (hPrint stderr)
.| C.mapM (cs >>> T.readFile)
.| C.concatMap parseSQL
.| C.mapM resolveStatement
.| C.concatMap (getTableLineage >>> M.toList)
.| C.map (mkMCE ts)
.| C.mapM_ (J.encode >>> cs >>> putStrLn)
where
parseSQL sql = do
let stOrErr = HIVE.parseManyAll sql
when (isLeft stOrErr) $
error $ show (fromLeft undefined stOrErr)
fromRight undefined stOrErr
resolveStatement st = do
let resolvedStOrErr = runResolverWarn (HIVE.resolveHiveStatement st) HIVE.dialectProxy catalog
when (isLeft . fst $ resolvedStOrErr) $
error $ show (fromLeft undefined (fst resolvedStOrErr))
let (Right queryResolved, resolutions) = resolvedStOrErr
return queryResolved

View File

@ -1,31 +0,0 @@
with import <nixpkgs> {} ;
let
queryparser_git = fetchFromGitHub {
owner = "uber" ;
repo = "queryparser" ;
rev = "6015e8f273f4498326fec0315ac5580d7036f8a4" ;
sha256 = "05pnifm5awyqxi6330v791b1cvw26xbcn2r20pqakvl8d3xyaxa4" ;
} ;
haskellPackages = pkgs.haskellPackages.override {
overrides = self: super: with pkgs.haskell.lib; {
queryparser = appendConfigureFlag
(dontHaddock (doJailbreak (self.callCabal2nix "queryparser" queryparser_git {})))
"--ghc-options=-XNoMonadFailDesugaring" ;
queryparser-hive = dontHaddock (doJailbreak (self.callCabal2nix "queryparser-hive" (queryparser_git + /dialects/hive) {})) ;
} ;
};
in
mkShell {
buildInputs = [
(haskellPackages.ghcWithPackages ( p:
[ p.bytestring p.text p.string-conversions
p.exceptions p.time
p.aeson p.aeson-qq
p.conduit
p.queryparser p.queryparser-hive
]
))
];
}

View File

@ -1 +0,0 @@
../../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc

View File

@ -1 +0,0 @@
../../nix/datahub-config.nix

View File

@ -1,144 +0,0 @@
WITH TL AS
(
SELECT B1.CPC,
B1.CSC,
B1.CSN,
B1.CSC_P,
B1.CCC,
B1.NSD,
CASE
WHEN B1.CSL = 4 THEN
B5.CSN
ELSE
B1.CSN
END AS GL_ATTR,
B1.CSL,
CASE WHEN B1.CSL = 4 THEN B4.CSN
WHEN B1.CSL = 3 THEN B3.CSN
WHEN B1.CSL = 2 THEN B2.CSN
WHEN B1.CSL = 1 THEN B1.CSN
END AS FACCTNAME_LV1,
CASE WHEN B1.CSL = 4 THEN B3.CSN
WHEN B1.CSL = 3 THEN B2.CSN
WHEN B1.CSL = 2 THEN B1.CSN
END AS FACCTNAME_LV2,
CASE WHEN B1.CSL = 4 THEN B2.CSN
WHEN B1.CSL = 3 THEN B1.CSN
END AS FACCTNAME_LV3,
CASE WHEN B1.CSL = 4 THEN B1.CSN
END AS FACCTNAME_LV4
FROM (SELECT CPC, CSC, CSN, CSC_P, CCC
, NSD, CSL
FROM ODS.FVS
WHERE HDATASRC = 'A'
) B1
LEFT JOIN
(SELECT CPC, CSC, CSN, CSC_P, CCC
, NSD, CSL
FROM ODS.FVS
WHERE HDATASRC = 'A'
) B2
ON B1.CPC = B2.CPC
AND B1.CSC_P = B2.CSC
LEFT JOIN
(SELECT CPC, CSC, CSN, CSC_P, CCC
, NSD, CSL
FROM ODS.FVS
WHERE HDATASRC = 'A'
) B3
ON B2.CPC = B3.CPC
AND B2.CSC_P = B3.CSC
LEFT JOIN
(SELECT CPC, CSC, CSN, CSC_P, CCC
, NSD, CSL
FROM ODS.FVS
WHERE HDATASRC = 'A'
) B4
ON B3.CPC = B4.CPC
AND B3.CSC_P = B4.CSC
LEFT JOIN
(SELECT CPC, CSC, CSN, CSC_P, CCC
, NSD, CSL
FROM ODS.FVS
WHERE HDATASRC = 'A'
) B5
ON B1.CPC = B5.CPC
AND B1.CSC_P = B5.CSC
)
INSERT OVERWRITE TABLE TMP.TFVDM1 PARTITION (HDATASRC = 'A')
SELECT qt_sequence("UUID", A.CAC) AS UUID,
C.PH AS PH,
A.CAC AS PC,
A.CPC AS ASS,
A.D_BIZ AS BD,
E.CH AS CH,
F.EH AS EH,
A.CSC AS GL_CODE,
CASE
WHEN A.CSN = ' ' THEN
A.C_KEY_NAME
ELSE
NVL(A.CSN,A.C_KEY_NAME)
END AS GL_NAME,
A.N_VALPRICE AS ATPRICE,
A.N_HLDAMT AS ATQTY,
A.N_HLDCST_LOCL AS ATCOST,
A.N_HLDCST AS ATCOST_ORICUR,
A.N_HLDMKV_LOCL AS ATMKTVAL,
A.N_HLDMKV AS ATMKTVAL_ORICUR,
A.N_HLDVVA_L AS ATVAL_ADDED,
A.N_HLDVVA AS ATVAL_ADDED_ORICUR,
A.N_VALRATE AS ATEXRATE,
NULL AS COST_TO_AT_RIO,
NULL AS MKTVAL_TO_AT_RIO,
B.NSD AS IS_DETAIL_GL,
A.C_PA_CODE AS ATITEM,
A.C_IVT_CLSS AS INVEST_CLASS,
A.C_ML_ATTR AS ISSUE_MODE,
A.C_FEE_CODE AS FEE_CODE,
A.C_SEC_VAR_MX AS SEC_KIND,
A.C_TD_ATTR AS TRADE_ATTR,
H.C_CA_ATTR AS CASH_ACCOUNT,
A.GL_LV1 AS GL_LV1,
B.FACCTNAME_LV1 AS GL_NAME_LV1,
B.FACCTNAME_LV2 AS GL_NAME_LV2,
B.FACCTNAME_LV3 AS GL_NAME_LV3,
B.FACCTNAME_LV4 AS GL_NAME_LV4,
NULL AS GL_NAME_LV5,
NULL AS GL_NAME_LV6,
A.CSC_T AS GL_ATTR_CODE,
CASE WHEN B.GL_ATTR = '<CA>' THEN A.CSN
ELSE B.GL_ATTR END AS GL_ATTR,
NVL(B.CSN, A.C_KEY_NAME) AS GL_FNAME,
A.C_SEC_CODE AS SEC_CODE_FA,
NULL AS SYMBOL_ORI,
NULL AS SYMBOL,
NULL AS SEC_TYPE ,
FROM_UNIXTIME(UNIX_TIMESTAMP(CURRENT_TIMESTAMP()),'yyyy-MM-dd HH:mm:ss') AS HLOADTIME,
'20190101' AS HBATCHDATE
FROM (SELECT SUBSTR(T.CSC, 1, 4) AS GL_LV1,
T.*
FROM ODS.FVRV T
WHERE T.D_BIZ IN (SELECT BD
FROM CTL.CFD
WHERE HDATASRC = 'A')
AND T.HDATASRC = 'A'
) A
LEFT JOIN TL B
ON NVL(A.CSC_T, A.CSC) = B.CSC
AND A.CPC = B.CPC
LEFT JOIN DW.PPCM C
ON A.CPC = C.ORI_SYS_PC
AND C.ORI_SYS_HCODE = 'A'
AND A.D_BIZ BETWEEN C.STDATE AND C.ENDDATE
LEFT JOIN DW.RCM E
ON A.CCC = E.ORI_SYS_CR_CODE
AND E.ORI_SYS_HCODE = 'A'
LEFT JOIN DW.REM F
ON A.C_MKT_CODE = F.ORI_SYS_EXCH_CODE
AND F.ORI_SYS_HCODE = 'A'
LEFT JOIN (SELECT C_CA_CODE, MAX(C_CA_ATTR) AS C_CA_ATTR
FROM ODS.FVC
GROUP BY C_CA_CODE) H
ON A.C_CA_CODE = H.C_CA_CODE

View File

@ -1,5 +0,0 @@
{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {"urn": "urn:li:corpuser:datahub", "aspects": [{"com.linkedin.pegasus2avro.identity.CorpUserInfo":{"active": true, "displayName": {"string": "Data Hub"}, "email": "datahub@linkedin.com", "title": {"string": "CEO"}, "fullName": {"string": "Data Hub"}}}]}}}
{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.CorpUserSnapshot": {"urn": "urn:li:corpuser:jdoe", "aspects": [{"com.linkedin.pegasus2avro.identity.CorpUserInfo":{"active": true, "displayName": {"string": "John Doe"}, "email": "jdoe@linkedin.com", "title": {"string": "Software Engineer"}, "fullName": {"string": "John Doe"}}}]}}}
{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)", "aspects": [{"com.linkedin.pegasus2avro.common.Ownership": {"owners":[{"owner":"urn:li:corpuser:jdoe","type":"DATAOWNER"}, {"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}},{"com.linkedin.pegasus2avro.dataset.UpstreamLineage":{"upstreams":[]}}, {"com.linkedin.pegasus2avro.common.InstitutionalMemory":{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}]}},{"com.linkedin.pegasus2avro.schema.SchemaMetadata":{"schemaName":"SampleKafkaSchema","platform":"urn:li:dataPlatform:kafka","version":0, "created":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.KafkaSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"SampleKafkaSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Kafka dataset.\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"}}, "fields":[{"fieldPath":"field_foo", "description":{"string": "Foo field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}},"nativeDataType":"string"}, {"fieldPath":"field_bar", "description":{"string": "Bar field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.BooleanType":{}}},"nativeDataType":"boolean"}]}}] } } }
{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {"urn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", "aspects": [{"com.linkedin.pegasus2avro.common.Ownership": {"owners":[{"owner":"urn:li:corpuser:jdoe","type":"DATAOWNER"}, {"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}},{"com.linkedin.pegasus2avro.dataset.UpstreamLineage":{"upstreams":[{"auditStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"},"dataset":"urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)","type":"TRANSFORMED"}]}}, {"com.linkedin.pegasus2avro.common.InstitutionalMemory":{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}]}},{"com.linkedin.pegasus2avro.schema.SchemaMetadata":{"schemaName":"SampleHdfsSchema","platform":"urn:li:dataPlatform:hdfs","version":0, "created":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.KafkaSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"SampleHdfsSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample HDFS dataset.\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"}}, "fields":[{"fieldPath":"field_foo", "description":{"string": "Foo field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}},"nativeDataType":"string"}, {"fieldPath":"field_bar", "description":{"string": "Bar field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.BooleanType":{}}},"nativeDataType":"boolean"}]}}] } } }
{"proposedSnapshot": {"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", "aspects": [{"com.linkedin.pegasus2avro.common.Ownership": {"owners":[{"owner":"urn:li:corpuser:jdoe","type":"DATAOWNER"}, {"owner":"urn:li:corpuser:datahub","type":"DATAOWNER"}],"lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}},{"com.linkedin.pegasus2avro.dataset.UpstreamLineage":{"upstreams":[{"auditStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)","type":"TRANSFORMED"}]}}, {"com.linkedin.pegasus2avro.common.InstitutionalMemory":{"elements":[{"url":"https://www.linkedin.com","description":"Sample doc","createStamp":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}}]}},{"com.linkedin.pegasus2avro.schema.SchemaMetadata":{"schemaName":"SampleHiveSchema","platform":"urn:li:dataPlatform:hive","version":0, "created":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "lastModified":{"time":1581407189000,"actor":"urn:li:corpuser:jdoe"}, "hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.KafkaSchema":{"documentSchema":"{\"type\":\"record\",\"name\":\"SampleHiveSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Hive dataset.\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"}}, "fields":[{"fieldPath":"field_foo", "description":{"string": "Foo field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}},"nativeDataType":"string"}, {"fieldPath":"field_bar", "description":{"string": "Bar field description"},"type":{"type":{"com.linkedin.pegasus2avro.schema.BooleanType":{}}},"nativeDataType":"boolean"}]}}] } } }

View File

@ -1,15 +0,0 @@
## looker_dashboard_ingestion.py
This tool helps ingest Looker dashboard and chart metadata into datahub.
Currently it creates a separate platform named "looker" and loads all dashboard and chart information into that platform as virtual datasets. This was a workaround for datahub's lack of support for dashboard entities; however, datahub has recently started supporting proper dashboard entities.
The script assumes you have already run lookml_ingestion.py to scrape view definitions into datahub; this is important because we assign lineage between Looker views and Looker dashboards/charts where possible.
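For orientation, the chart-to-dataset mapping the script produces looks roughly like the sketch below (a simplification of the script's own URN building; the element id and view name are illustrative):
```
# Sketch of the "workaround" URN layout (illustrative ids, not real objects).
VISUALIZATION_DATAHUB_PLATFORM = "looker"       # charts and dashboards land here
VIEW_DATAHUB_PLATFORM = "looker_views"          # views ingested by lookml_ingestion.py

chart_urn = f"urn:li:dataset:(urn:li:dataPlatform:{VISUALIZATION_DATAHUB_PLATFORM},dashboard_elements.123,PROD)"
view_urn = f"urn:li:dataset:(urn:li:dataPlatform:{VIEW_DATAHUB_PLATFORM},orders_view,PROD)"

# The chart "dataset" carries an UpstreamLineage aspect pointing at the view
# datasets, which is how end-to-end lineage is shown without native chart entities.
```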
## Steps:
- Use a version of python >= 3.7
- Make a virtual environment
- pip install -r requirements.txt
- Set env vars: LOOKERSDK_CLIENT_ID, LOOKERSDK_CLIENT_SECRET, LOOKERSDK_BASE_URL
- Configure extra kafka conf in looker_dashboard_ingestion.py
- Run: python looker_dashboard_ingestion.py

View File

@ -1,426 +0,0 @@
#! /usr/bin/python
import time
import os
import json
import typing
from pprint import pprint
import looker_sdk
from looker_sdk.sdk.api31.models import Query, DashboardElement, LookWithQuery, Dashboard
from looker_sdk.error import SDKError
from dataclasses import dataclass
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
# Configuration
AVSC_PATH = "../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc"
KAFKA_TOPIC = 'MetadataChangeEvent_v4'
# Set the following environmental variables to hit Looker's API
# LOOKERSDK_CLIENT_ID=YourClientID
# LOOKERSDK_CLIENT_SECRET=YourClientSecret
# LOOKERSDK_BASE_URL=https://company.looker.com:19999
LOOKERSDK_BASE_URL = os.environ["LOOKERSDK_BASE_URL"]
EXTRA_KAFKA_CONF = {
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
# 'security.protocol': 'SSL',
# 'ssl.ca.location': '',
# 'ssl.key.location': '',
# 'ssl.certificate.location': ''
}
# The datahub platform where looker views are stored, must be the same as VIEW_DATAHUB_PLATFORM in lookml_ingestion.py
VIEW_DATAHUB_PLATFORM = "looker_views"
# The datahub platform where looker dashboards will be stored
VISUALIZATION_DATAHUB_PLATFORM = "looker"
@dataclass
class LookerDashboardElement:
id: str
title: str
query_slug: str
looker_views: typing.List[str]
look_id: typing.Optional[str]
@property
def url(self) -> str:
base_url = get_looker_base_url()
# A dashboard element can use a look or just a raw query against an explore
if self.look_id is not None:
return base_url + "/looks/" + self.look_id
else:
return base_url + "/x/" + self.query_slug
def get_urn_element_id(self):
# A dashboard element can use a look or just a raw query against an explore
return f"dashboard_elements.{self.id}"
def get_view_urns(self) -> typing.List[str]:
return [f"urn:li:dataset:(urn:li:dataPlatform:{VIEW_DATAHUB_PLATFORM},{v},PROD)" for v in self.looker_views]
@dataclass
class LookerDashboard:
id: str
title: str
description: str
dashboard_elements: typing.List[LookerDashboardElement]
@property
def url(self):
return get_looker_base_url() + "/dashboards/" + self.id
def get_urn_dashboard_id(self):
return f"dashboards.{self.id}"
@dataclass
class DashboardKafkaEvents:
dashboard_mce: typing.Dict
chart_mces: typing.List[typing.Dict]
def all_mces(self) -> typing.List[typing.Dict]:
return self.chart_mces + [self.dashboard_mce]
def get_looker_base_url():
base_url = LOOKERSDK_BASE_URL.split("looker.com")[0] + "looker.com"
return base_url
def get_actor_and_sys_time():
actor, sys_time = "urn:li:corpuser:analysts", int(time.time()) * 1000
return actor, sys_time
class ProperDatahubEvents:
"""
This class generates events for "proper" datahub charts and dashboards
These events will not be visualized anywhere as of 12/11/2020
"""
@staticmethod
def make_chart_mce(dashboard_element: LookerDashboardElement) -> typing.Dict:
actor, sys_time = get_actor_and_sys_time()
owners = [{
"owner": actor,
"type": "DEVELOPER"
}]
return {
"auditHeader": None,
"proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.ChartSnapshot", {
"urn": f"urn:li:chart:(looker,{dashboard_element.get_urn_element_id()})",
"aspects": [
("com.linkedin.pegasus2avro.dataset.ChartInfo", {
"title": dashboard_element.title,
"description": "",
"inputs": dashboard_element.get_view_urns(),
"url": f"",
"lastModified": {"created": {"time": sys_time, "actor": actor}}
}),
("com.linkedin.pegasus2avro.common.Ownership", {
"owners": owners,
"lastModified": {
"time": sys_time,
"actor": actor
}
})
]
}),
"proposedDelta": None
}
@staticmethod
def make_dashboard_mce(looker_dashboard: LookerDashboard) -> DashboardKafkaEvents:
actor, sys_time = get_actor_and_sys_time()
owners = [{
"owner": actor,
"type": "DEVELOPER"
}]
chart_mces = [ProperDatahubEvents.make_chart_mce(element) for element in looker_dashboard.dashboard_elements]
dashboard_mce = {
"auditHeader": None,
"proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DashboardSnapshot", {
"urn": f"urn:li:dashboard:(looker,{looker_dashboard.get_urn_dashboard_id()},PROD)",
"aspects": [
("com.linkedin.pegasus2avro.dataset.DashboardInfo", {
"title": looker_dashboard.title,
"description": looker_dashboard.description,
"charts": [mce["proposedSnapshot"][1]["urn"] for mce in chart_mces],
"url": looker_dashboard.url,
"lastModified": {"created": {"time": sys_time, "actor": actor}}
}),
("com.linkedin.pegasus2avro.common.Ownership", {
"owners": owners,
"lastModified": {
"time": sys_time,
"actor": actor
}
})
]
}),
"proposedDelta": None
}
return DashboardKafkaEvents(dashboard_mce=dashboard_mce, chart_mces=chart_mces)
class WorkaroundDatahubEvents:
"""
This class generates events for "workaround" datahub charts and dashboards
This is so we can display end to end lineage without being blocked on datahub's support for dashboards and charts
The approach is we generate "charts" and "dashboards" as just "datasets" in datahub under a new platform
We then link them together using "UpstreamLineage" just like any other dataset
"""
@staticmethod
def make_chart_mce(dashboard_element: LookerDashboardElement) -> typing.Dict:
actor, sys_time = get_actor_and_sys_time()
owners = [{
"owner": actor,
"type": "DEVELOPER"
}]
upstreams = [{
"auditStamp":{
"time": sys_time,
"actor": actor
},
"dataset": view_urn,
"type":"TRANSFORMED"
} for view_urn in dashboard_element.get_view_urns()]
doc_elements = [{
"url": dashboard_element.url,
"description": "Looker chart url",
"createStamp": {
"time": sys_time,
"actor": actor
}
}]
return {
"auditHeader": None,
"proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {
"urn": f"urn:li:dataset:(urn:li:dataPlatform:{VISUALIZATION_DATAHUB_PLATFORM},{dashboard_element.get_urn_element_id()},PROD)",
"aspects": [
("com.linkedin.pegasus2avro.dataset.UpstreamLineage", {"upstreams": upstreams}),
("com.linkedin.pegasus2avro.common.InstitutionalMemory", {"elements": doc_elements}),
("com.linkedin.pegasus2avro.dataset.DatasetProperties", {"description": dashboard_element.title, "customProperties": {}}),
("com.linkedin.pegasus2avro.common.Ownership", {
"owners": owners,
"lastModified": {
"time": sys_time,
"actor": actor
}
})
]
}),
"proposedDelta": None
}
@staticmethod
def make_dashboard_mce(looker_dashboard: LookerDashboard) -> DashboardKafkaEvents:
actor, sys_time = get_actor_and_sys_time()
chart_mces = [WorkaroundDatahubEvents.make_chart_mce(element) for element in looker_dashboard.dashboard_elements]
owners = [{
"owner": actor,
"type": "DEVELOPER"
}]
upstreams = [{
"auditStamp":{
"time": sys_time,
"actor": actor
},
"dataset": chart_urn,
"type":"TRANSFORMED"
} for chart_urn in [mce["proposedSnapshot"][1]["urn"] for mce in chart_mces]]
doc_elements = [{
"url": looker_dashboard.url,
"description": "Looker dashboard url",
"createStamp": {
"time": sys_time,
"actor": actor
}
}]
dashboard_mce = {
"auditHeader": None,
"proposedSnapshot": ("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {
"urn": f"urn:li:dataset:(urn:li:dataPlatform:{VISUALIZATION_DATAHUB_PLATFORM},{looker_dashboard.get_urn_dashboard_id()},PROD)",
"aspects": [
("com.linkedin.pegasus2avro.dataset.UpstreamLineage", {"upstreams": upstreams}),
("com.linkedin.pegasus2avro.common.InstitutionalMemory", {"elements": doc_elements}),
("com.linkedin.pegasus2avro.dataset.DatasetProperties", {"description": looker_dashboard.title, "customProperties": {}}),
("com.linkedin.pegasus2avro.common.Ownership", {
"owners": owners,
"lastModified": {
"time": sys_time,
"actor": actor
}
})
]
}),
"proposedDelta": None
}
return DashboardKafkaEvents(dashboard_mce=dashboard_mce, chart_mces=chart_mces)
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
def make_kafka_producer(extra_kafka_conf):
conf = {
"on_delivery": delivery_report,
**extra_kafka_conf
}
key_schema = avro.loads('{"type": "string"}')
record_schema = avro.load(AVSC_PATH)
producer = AvroProducer(conf, default_key_schema=key_schema, default_value_schema=record_schema)
return producer
def _extract_view_from_field(field: str) -> str:
assert field.count(".") == 1, f"Error: A field must be prefixed by a view name, field is: {field}"
view_name = field.split(".")[0]
return view_name
def get_views_from_query(query: Query) -> typing.List[str]:
all_views = set()
# query.dynamic_fields can contain:
# - looker table calculations: https://docs.looker.com/exploring-data/using-table-calculations
# - looker custom measures: https://docs.looker.com/de/exploring-data/adding-fields/custom-measure
# - looker custom dimensions: https://docs.looker.com/exploring-data/adding-fields/custom-measure#creating_a_custom_dimension_using_a_looker_expression
dynamic_fields = json.loads(query.dynamic_fields if query.dynamic_fields is not None else '[]')
custom_field_to_underlying_field = {}
for field in dynamic_fields:
# Table calculations can only reference fields used in the fields section, so this will always be a subset of the query.fields
if "table_calculation" in field:
continue
# Looker custom measures can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom measure is based on
if "measure" in field:
measure = field["measure"]
based_on = field["based_on"]
custom_field_to_underlying_field[measure] = based_on
# Looker custom dimensions can reference fields in arbitrary views, so this needs to be parsed to find the underlying view field the custom dimension is based on
# However, unlike custom measures custom dimensions can be defined using an arbitrary expression
# We are not going to support parsing arbitrary Looker expressions here, so going to ignore these fields for now
# TODO: support parsing arbitrary looker expressions
if "dimension" in field:
dimension = field["dimension"]
expression = field["expression"]
custom_field_to_underlying_field[dimension] = None
# A query uses fields defined in views, find the views those fields use
fields: typing.Sequence[str] = query.fields if query.fields is not None else []
for field in fields:
# If the field is a custom field, look up the field it is based on
field_name = custom_field_to_underlying_field[field] if field in custom_field_to_underlying_field else field
if field_name is None:
continue
view_name = _extract_view_from_field(field_name)
all_views.add(view_name)
# A query uses fields for filtering and those fields are defined in views, find the views those fields use
filters: typing.MutableMapping[str, typing.Any] = query.filters if query.filters is not None else {}
for field in filters.keys():
# If the field is a custom field, look up the field it is based on
field_name = custom_field_to_underlying_field[field] if field in custom_field_to_underlying_field else field
if field_name is None:
continue
view_name = _extract_view_from_field(field_name)
all_views.add(view_name)
return list(all_views)
def get_views_from_look(look: LookWithQuery):
return get_views_from_query(look.query)
def get_looker_dashboard_element(element: DashboardElement)-> typing.Optional[LookerDashboardElement]:
# Dashboard elements can use raw queries against explores
if element.query is not None:
views = get_views_from_query(element.query)
return LookerDashboardElement(id=element.id, title=element.title, look_id=None, query_slug=element.query.slug, looker_views=views)
# Dashboard elements can *alternatively* link to an existing look
if element.look is not None:
views = get_views_from_look(element.look)
return LookerDashboardElement(id=element.id, title=element.title, look_id=element.look_id, query_slug=element.look.query.slug, looker_views=views)
# This occurs for "text" dashboard elements that just contain static text (ie: no queries)
# There is not much meaningful info to extract from these elements, so ignore them
return None
def get_looker_dashboard(dashboard: Dashboard) -> LookerDashboard:
dashboard_elements: typing.List[LookerDashboardElement] = []
for element in dashboard.dashboard_elements:
looker_dashboard_element = get_looker_dashboard_element(element)
if looker_dashboard_element is not None:
dashboard_elements.append(looker_dashboard_element)
looker_dashboard = LookerDashboard(id=dashboard.id, title=dashboard.title, description=dashboard.description, dashboard_elements=dashboard_elements)
return looker_dashboard
# Perform IO in main
def main():
kafka_producer = make_kafka_producer(EXTRA_KAFKA_CONF)
sdk = looker_sdk.init31()
dashboard_ids = [dashboard_base.id for dashboard_base in sdk.all_dashboards(fields="id")]
looker_dashboards = []
for dashboard_id in dashboard_ids:
try:
fields = ["id", "title", "dashboard_elements", "dashboard_filters"]
dashboard_object = sdk.dashboard(dashboard_id=dashboard_id, fields=",".join(fields))
except SDKError as e:
# A looker dashboard could be deleted in between the list and the get
print(f"Skipping dashboard with dashboard_id: {dashboard_id}")
print(e)
continue
looker_dashboard = get_looker_dashboard(dashboard_object)
looker_dashboards.append(looker_dashboard)
pprint(looker_dashboard)
for looker_dashboard in looker_dashboards:
workaround_dashboard_kafka_events = WorkaroundDatahubEvents.make_dashboard_mce(looker_dashboard)
# Hard to test these events since datahub does not have a UI, for now disable sending them
# proper_dashboard_kafka_events = ProperDatahubEvents.make_dashboard_mce(looker_dashboard)
for mce in workaround_dashboard_kafka_events.all_mces():
print(mce)
kafka_producer.produce(topic=KAFKA_TOPIC, key=mce['proposedSnapshot'][1]['urn'], value=mce)
kafka_producer.flush()
if __name__ == "__main__":
main()

View File

@ -1,4 +0,0 @@
avro-python3==1.8.2
confluent-kafka[avro]==1.4.0
PyYAML==5.4.1
looker-sdk==0.1.3b20

View File

@ -1,20 +0,0 @@
## lookml_ingestion.py
This script ingests Looker view metadata from lookml into datahub. Looker views are essentially database views that can be either materialized or ephemeral, so we treat them as we would any other dataset in datahub.
Under the hood, this script uses the `lkml` python parsing library to parse lkml files, so it comes with all the limitations of that underlying parser.
Roughly how the script works (a short parsing sketch follows this list):
- Point the script at a directory on the filesystem; it finds all files named `*.model.lkml` at any level of nesting.
- Finds the viewfile includes in the model file; an include indicates that the viewfile is part of that model (and a model has a single SQL connection associated with it). It does not handle a model importing a view file but *not* using the view in the model, since that would require parsing explore blocks and adds complexity.
- For each viewfile in the model, parses the view file. For each view in the viewfile, resolves the sql table name for the view:
  - We do not support parsing derived tables defined with a `sql:` block, since that would require parsing SQL to understand dependencies; we only support views using `sql_table_name`. In the future, limited SQL parsing for limited SQL dialects could be supported.
  - We support views using the `extends` keyword (https://docs.looker.com/reference/view-params/extends-for-view). This is surprisingly painful because views can extend views defined in other files; we currently do this inefficiently.
  - We do not support views using `refinements`. SpotHero does not use refinements right now, so we had no need to implement them: https://docs.looker.com/data-modeling/learning-lookml/refinements
- After binding views to models and finding the sql table name associated with the views, we generate the MCE events into a separate looker platform in datahub, since they are not "real" tables but "virtual" looker tables.
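A minimal sketch of that core resolution step, assuming an illustrative view file named `orders.view.lkml` (not part of this repo); the full script additionally handles models, includes, derived tables, and `extends` chains:
```
# Minimal sketch of sql_table_name resolution with the lkml parser.
import lkml

with open("orders.view.lkml") as f:
    parsed = lkml.load(f)                      # same parser the script uses

for view in parsed.get("views", []):
    view_name = view["name"]
    sql_table_name = view.get("sql_table_name")
    if sql_table_name is not None:
        # the script strips quotes (e.g. optimizely."group") before building
        # the upstream dataset URN
        sql_table_name = sql_table_name.replace('"', "")
        print(view_name, "->", sql_table_name)
```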
## Steps
- Use a version of python >= 3.7
- Make a virtual environment
- pip install -r requirements.txt
- Set the env var LOOKER_DIRECTORY to the root path of your lkml files on the filesystem
- Modify the EXTRA_KAFKA_CONF section of the script to point to datahub

View File

@ -1,343 +0,0 @@
import lkml
import glob
import time
import typing
import os
import re
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
from dataclasses import dataclass, replace
from sql_metadata import get_query_tables
# Configuration
AVSC_PATH = "../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc"
KAFKA_TOPIC = 'MetadataChangeEvent_v4'
# LOOKER_DIRECTORY = "./test_lookml"
LOOKER_DIRECTORY = os.environ["LOOKER_DIRECTORY"]
LOOKER_DIRECTORY = os.path.abspath(LOOKER_DIRECTORY)
EXTRA_KAFKA_CONF = {
'bootstrap.servers': 'localhost:9092',
'schema.registry.url': 'http://localhost:8081'
# 'security.protocol': 'SSL',
# 'ssl.ca.location': '',
# 'ssl.key.location': '',
# 'ssl.certificate.location': ''
}
# The datahub platform where looker views are stored
LOOKER_VIEW_PLATFORM = "looker_views"
class LookerViewFileLoader:
"""
Loads the looker viewfile at a :path and caches the LookerViewFile in memory
This is to avoid reloading the same file off of disk many times during the recursive include resolution process
"""
def __init__(self):
self.viewfile_cache = {}
def _load_viewfile(self, path: str) -> typing.Optional["LookerViewFile"]:
if path in self.viewfile_cache:
return self.viewfile_cache[path]
try:
with open(path, "r") as file:
parsed = lkml.load(file)
looker_viewfile = LookerViewFile.from_looker_dict(path, parsed)
self.viewfile_cache[path] = looker_viewfile
return looker_viewfile
except Exception as e:
print(e)
print(f"Error processing view file {path}. Skipping it")
def load_viewfile(self, path: str, connection: str):
viewfile = self._load_viewfile(path)
if viewfile is None:
return None
return replace(viewfile, connection=connection)
@dataclass
class LookerModel:
connection: str
includes: typing.List[str]
resolved_includes: typing.List[str]
@staticmethod
def from_looker_dict(looker_model_dict):
connection = looker_model_dict["connection"]
includes = looker_model_dict["includes"]
resolved_includes = LookerModel.resolve_includes(includes)
return LookerModel(connection=connection, includes=includes, resolved_includes=resolved_includes)
@staticmethod
def resolve_includes(includes) -> typing.List[str]:
resolved = []
for inc in includes:
# Massage the looker include into a valid glob wildcard expression
glob_expr = f"{LOOKER_DIRECTORY}/{inc}"
outputs = glob.glob(glob_expr)
resolved.extend(outputs)
return resolved
@dataclass
class LookerViewFile:
absolute_file_path: str
connection: typing.Optional[str]
includes: typing.List[str]
resolved_includes: typing.List[str]
views: typing.List[typing.Dict]
@staticmethod
def from_looker_dict(absolute_file_path, looker_view_file_dict):
includes = looker_view_file_dict.get("includes", [])
resolved_includes = LookerModel.resolve_includes(includes)
views = looker_view_file_dict.get("views", [])
return LookerViewFile(absolute_file_path=absolute_file_path, connection=None, includes=includes, resolved_includes=resolved_includes, views=views)
@dataclass
class LookerView:
absolute_file_path: str
connection: str
view_name: str
sql_table_names: typing.List[str]
def get_relative_file_path(self):
if LOOKER_DIRECTORY in self.absolute_file_path:
return self.absolute_file_path.replace(LOOKER_DIRECTORY, '').lstrip('/')
else:
raise Exception(f"Found a looker view with name: {view_name} at path: {absolute_file_path} not underneath the base LOOKER_DIRECTORY: {LOOKER_DIRECTORY}. This should not happen")
@staticmethod
def from_looker_dict(looker_view, connection: str, looker_viewfile: LookerViewFile, looker_viewfile_loader: LookerViewFileLoader) -> typing.Optional["LookerView"]:
view_name = looker_view["name"]
sql_table_name = looker_view.get("sql_table_name", None)
# Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes
sql_table_name = sql_table_name.replace('"', '') if sql_table_name is not None else None
derived_table = looker_view.get("derived_table", None)
# Parse SQL from derived tables to extract dependencies
if derived_table is not None and 'sql' in derived_table:
# Get the list of tables in the query
sql_tables: typing.List[str] = get_query_tables(derived_table['sql'])
# Remove temporary tables from WITH statements
sql_table_names = [t for t in sql_tables if not re.search(f'WITH(.*,)?\s+{t}(\s*\([\w\s,]+\))?\s+AS\s+\(', derived_table['sql'], re.IGNORECASE|re.DOTALL)]
# Remove quotes from tables
sql_table_names = [t.replace('"', '') for t in sql_table_names]
return LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=sql_table_names)
# There is a single dependency in the view, on the sql_table_name
if sql_table_name is not None:
return LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=[sql_table_name])
# The sql_table_name might be defined in another view and this view is extending that view, try to find it
else:
extends = looker_view.get("extends", [])
if len(extends) == 0:
# The view is malformed, the view is not a derived table, does not contain a sql_table_name or an extends
print(f"Skipping malformed with view_name: {view_name}. View should have a sql_table_name if it is not a derived table")
return None
extends_to_looker_view = []
# The base view could live in the same file
for raw_view in looker_viewfile.views:
raw_view_name = raw_view["name"]
# Make sure to skip loading view we are currently trying to resolve
if raw_view_name != view_name:
maybe_looker_view = LookerView.from_looker_dict(raw_view, connection, looker_viewfile, looker_viewfile_loader)
if maybe_looker_view is not None and maybe_looker_view.view_name in extends:
extends_to_looker_view.append(maybe_looker_view)
# Or it could live in one of the included files; we do not know which file the base view lives in, so try them all!
for include in looker_viewfile.resolved_includes:
included_viewfile = looker_viewfile_loader.load_viewfile(include, connection)
if included_viewfile is not None:
for view in included_viewfile.views:
maybe_looker_view = LookerView.from_looker_dict(view, connection, included_viewfile, looker_viewfile_loader)
if maybe_looker_view is not None and maybe_looker_view.view_name in extends:
extends_to_looker_view.append(maybe_looker_view)
if len(extends_to_looker_view) != 1:
print(f"Skipping malformed view with view_name: {view_name}. View should have a single view in a view inheritance chain with a sql_table_name")
return None
output_looker_view = LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=extends_to_looker_view[0].sql_table_names)
return output_looker_view
def get_platform_and_table(view_name: str, connection: str, sql_table_name: str):
"""
This will depend on what database connections you use in Looker
For SpotHero, we had two database connections in Looker: "redshift_test" (a redshift database) and "presto" (a presto database)
Presto supports querying across multiple catalogs, so we infer which underlying database presto is using based on the presto catalog name
For SpotHero, we have 3 catalogs in presto: "redshift", "hive", and "hive_emr"
"""
if connection == "redshift_test":
platform = "redshift"
table_name = sql_table_name
return platform, table_name
elif connection == "presto":
parts = sql_table_name.split(".")
catalog = parts[0]
if catalog == "hive":
platform = "hive"
elif catalog == "hive_emr":
platform = "hive_emr"
elif catalog == "redshift":
platform = "redshift"
else:
# Looker lets you exclude a catalog and use a configured default, the default we have configured is to use hive_emr
if sql_table_name.count(".") != 1:
raise Exception("Unknown catalog for sql_table_name: {sql_table_name} for view_name: {view_name}")
platform = "hive_emr"
return platform, sql_table_name
table_name = ".".join(parts[1::])
return platform, table_name
else:
raise Exception(f"Could not find a platform for looker view with connection: {connection}")
def construct_datalineage_urn(view_name: str, connection: str, sql_table_name: str):
platform, table_name = get_platform_and_table(view_name, connection, sql_table_name)
return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{table_name},PROD)"
def construct_data_urn(looker_view: LookerView):
return f"urn:li:dataset:(urn:li:dataPlatform:{LOOKER_VIEW_PLATFORM},{looker_view.view_name},PROD)"
def build_dataset_mce(looker_view: LookerView):
"""
Creates MetadataChangeEvent for the dataset, creating upstream lineage links
"""
actor, sys_time = "urn:li:corpuser:etl", int(time.time()) * 1000
upstreams = [{
"auditStamp":{
"time": sys_time,
"actor":actor
},
"dataset": construct_datalineage_urn(looker_view.view_name, looker_view.connection, sql_table_name),
"type":"TRANSFORMED"
} for sql_table_name in looker_view.sql_table_names]
doc_elements = [{
"url":f"https://github.com/spothero/internal-looker-repo/blob/master/{looker_view.get_relative_file_path()}",
"description":"Github looker view definition",
"createStamp":{
"time": sys_time,
"actor": actor
}
}]
owners = [{
"owner": f"urn:li:corpuser:analysts",
"type": "DEVELOPER"
}]
return {
"auditHeader": None,
"proposedSnapshot":("com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot", {
"urn": construct_data_urn(looker_view),
"aspects": [
("com.linkedin.pegasus2avro.dataset.UpstreamLineage", {"upstreams": upstreams}),
("com.linkedin.pegasus2avro.common.InstitutionalMemory", {"elements": doc_elements}),
("com.linkedin.pegasus2avro.common.Ownership", {
"owners": owners,
"lastModified":{
"time": sys_time,
"actor": actor
}
})
]
}),
"proposedDelta": None
}
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
def make_kafka_producer(extra_kafka_conf):
conf = {
"on_delivery": delivery_report,
**extra_kafka_conf
}
key_schema = avro.loads('{"type": "string"}')
record_schema = avro.load(AVSC_PATH)
producer = AvroProducer(conf, default_key_schema=key_schema, default_value_schema=record_schema)
return producer
def main():
kafka_producer = make_kafka_producer(EXTRA_KAFKA_CONF)
viewfile_loader = LookerViewFileLoader()
looker_models = []
all_views = []
model_files = sorted(f for f in glob.glob(f"{LOOKER_DIRECTORY}/**/*.model.lkml", recursive=True))
for f in model_files:
try:
with open(f, 'r') as file:
parsed = lkml.load(file)
looker_model = LookerModel.from_looker_dict(parsed)
looker_models.append(looker_model)
except Exception as e:
print(e)
print(f"Error processing model file {f}. Skipping it")
for model in looker_models:
for include in model.resolved_includes:
looker_viewfile = viewfile_loader.load_viewfile(include, model.connection)
if looker_viewfile is not None:
for raw_view in looker_viewfile.views:
maybe_looker_view = LookerView.from_looker_dict(raw_view, model.connection, looker_viewfile, viewfile_loader)
if maybe_looker_view:
all_views.append(maybe_looker_view)
for view in all_views:
MCE = build_dataset_mce(view)
print(view)
print(MCE)
kafka_producer.produce(topic=KAFKA_TOPIC, key=MCE['proposedSnapshot'][1]['urn'], value=MCE)
kafka_producer.flush()
if __name__ == "__main__":
main()

View File

@ -1,4 +0,0 @@
lkml==1.1.0
avro-python3==1.8.2
confluent-kafka[avro]==1.4.0
sql-metadata==1.12.0

View File

@ -1,67 +0,0 @@
# Nix sandbox for datahub
## Introduction
Databases are not a good fit for virtualization because of their I/O performance requirements,
so we use the simple Nix package tool to install packages and set up services on the physical machine.
We declare the configuration and it works; see the [sandbox.nix] file for details.
It installs software under the /nix directory and runs services via launchd (Darwin) and systemd (Linux).
NOTE: for Linux, ensure the 'systemd --user' process is running.
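A quick way to verify this is sketched below (assuming a standard systemd setup):
```
# confirm the per-user systemd instance is up
systemctl --user is-system-running
# or look for the process directly
ps -u "$(id -un)" -o pid,cmd | grep '[s]ystemd --user'
```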
## Roadmap
- [X] support mac and linux
- [ ] add environment check script
- [ ] add datahub nix package
- [ ] add datahub[gms, frontend, pipeline] service module
- [ ] add nixops distributed deploy
## Quickstart
1. install nix and channel
```
sudo install -d -m755 -o $(id -u) -g $(id -g) /nix
curl https://nixos.org/nix/install | sh
nix-channel --add https://nixos.org/channels/nixos-20.03 nixpkgs
nix-channel --update nixpkgs
```
2. install home-manager
```
nix-channel --add https://github.com/clojurians-org/home-manager/archive/v1.0.0.tar.gz home-manager
nix-channel --update home-manager
NIX_PATH=~/.nix-defexpr/channels nix-shell '<home-manager>' -A install
```
3. setup environment, and well done!
```
NIX_PATH=~/.nix-defexpr/channels home-manager -f sandbox.nix switch
```
## Client connect
```
mysql => mysql -u root -S /nix/var/run/mysqld.sock
postgresql => psql -h /nix/var/run postgres
elasticsearch => curl http://localhost:9200
neo4j => cypher-shell -uneo4j -pneo4j
zookeeper => zkCli.sh
kafka => kafka-topics.sh --bootstrap-server localhost:9092 --list
confluent schema-registry => curl http://localhost:8081
```
## Environment Check
You only need to install Nix to run it!
```
nix-shell datahub-check.nix -A gms
```

View File

@ -1,99 +0,0 @@
{ pkgs ? import <nixpkgs> {} }:
with pkgs ;
let
datahub = import ./datahub-config.nix ;
build-prompt = ''
echo This derivation is not buildable, instead run it using nix-shell.
exit 1
'' ;
parse-uri = uri :
let
uriSchemaSplit = builtins.split "://" uri ;
schema = builtins.head uriSchemaSplit ;
uriNoSchema = lib.last uriSchemaSplit ;
uriPathSplit = builtins.split "/" uriNoSchema ;
hostPort = builtins.head uriPathSplit ;
path = lib.optionalString (builtins.length uriPathSplit > 1) (lib.last uriPathSplit) ;
hostPortSplit = builtins.split ":" hostPort ;
host = builtins.head hostPortSplit ;
port = lib.last hostPortSplit ;
in { inherit schema host port path ; } ;
gms =
let
gms-conf = datahub.services.linkedin-datahub-gms ;
jdbc-uri = parse-uri gms-conf.sandbox.jdbc.uri ;
elasticsearch-uri = parse-uri (builtins.head gms-conf.sandbox.elasticsearch.uris) ;
neo4j-uri = parse-uri gms-conf.sandbox.neo4j.uri ;
kafka-uri = parse-uri (builtins.head gms-conf.sandbox.kafka.uris) ;
schema-registry-uri = parse-uri (builtins.head gms-conf.sandbox.schema-registry.uris) ;
gms-uri = parse-uri gms-conf.listener ;
check-port = name : uri : ''
echo " [${name}] checking port..."
${netcat-gnu}/bin/nc -z ${uri.host} ${uri.port}
if [ $? != 0 ]; then echo " [${name}] !ERROR: cannot connect to ${uri.host}:${uri.port}" && exit 1; fi
'' ;
check-jdbc-user = ''
# echo " [jdbc] checking username and password..."
'' ;
check-jdbc-table = ''
# echo " [jdbc] checking [metadata_aspect] table..."
'' ;
check-elasticsearch-index = ''
# echo " [elasticsearch] checking [corpuserinfodocument, datasetdocument] indices ..."
'' ;
check-neo4j-user = ''
# echo " [neo4j] checking user and password..."
'' ;
check-kafka-topic = ''
# echo " [kafka] checking [MetadataChangeEvent, MetadataAuditEvent] indices..."
'' ;
in
stdenv.mkDerivation {
name = "gms-check" ;
buildInputs = [ netcat-gnu ] ;
preferLocalBuild = true ;
buildCommand = build-prompt ;
shellHookOnly = true;
shellHook = ''
echo "******** checking sandbox.jdbc "
${check-port "jdbc" jdbc-uri}
${check-jdbc-user }
${check-jdbc-table }
echo "******** checking sandbox.elasticsearch "
${check-port "elasticsearch" elasticsearch-uri}
${check-elasticsearch-index}
echo "******** checking sandbox.neo4j "
${check-port "neo4j" neo4j-uri}
${check-neo4j-user }
echo "******** checking sandbox.kafka "
${check-port "kafka" kafka-uri}
${check-kafka-topic }
echo "******** checking sandbox.schema-registry "
${check-port "schema-registry" schema-registry-uri}
echo "******** checking gms "
${check-port "gms" gms-uri}
exit 0
'' ;
} ;
frontend =
let
frontend-conf = datahub.services.linkedin-datahub-frontend ;
in {} ;
pipeline =
let
pipeline-conf = datahub.services.linkedin-datahub-pipeline ;
in {} ;
in { inherit gms frontend pipeline;}

View File

@ -1,31 +0,0 @@
{
services.linkedin-datahub-gms = {
enable = true;
sandbox = {
jdbc.uri = "jdbc:postgresql://localhost:5432/datahub" ;
jdbc.username = "datahub" ;
jdbc.password = "datahub" ;
elasticsearch.uris = [ "http://localhost:9200" ] ;
neo4j.uri = "bolt://localhost:7687" ;
neo4j.username = "neo4j" ;
neo4j.password = "datahub" ;
kafka.uris = [ "PLAINTEXT://localhost:9092" ] ;
schema-registry.uris = [ "http://localhost:8081" ] ;
} ;
listener = "http://localhost:8080" ;
} ;
services.linkedin-datahub-frontend = {
enable = true ;
listener = "http://localhost:9001" ;
linkedin-datahub-gms.uri = "http://localhost:8080" ;
} ;
services.linkedin-datahub-pipeline = {
enable = true ;
linkedin-datahub-gms.uri = "http://localhost:8080" ;
sandbox = {
kafka.uris = [ "PLAINTEXT://localhost:9092" ] ;
schema-registry.uris = [ "http://localhost:8081" ] ;
} ;
} ;
}

View File

@ -1,80 +0,0 @@
{ config, pkgs, ... }:
{
# Let Home Manager install and manage itself.
programs.home-manager.enable = true;
# This value determines the Home Manager release that your
# configuration is compatible with. This helps avoid breakage
# when a new Home Manager release introduces backwards
# incompatible changes.
#
# You can update Home Manager without changing this value. See
# the Home Manager release notes for a list of state version
# changes in each release.
home.stateVersion = "19.09";
environment.systemPackages = [
pkgs.gradle
pkgs.postgresql_11
pkgs.mysql57
pkgs.elasticsearch
pkgs.neo4j
pkgs.zookeeper
pkgs.apacheKafka
pkgs.confluent-platform
pkgs.kafkacat
];
services.postgresql = {
enable = true ;
package = pkgs.postgresql_11 ;
dataDir = "/opt/nix-module/data/postgresql" ;
} ;
services.mysql = {
enable = true ;
# package = pkgs.mysql80 ;
package = pkgs.mysql57 ;
dataDir = "/opt/nix-module/data/mysql" ;
} ;
services.elasticsearch = {
enable = true ;
# package = pkgs.elasticsearch7 ;
package = pkgs.elasticsearch ;
dataDir = "/opt/nix-module/data/elasticsearch" ;
} ;
services.neo4j = {
enable = true ;
package = pkgs.neo4j ;
directories.home = "/opt/nix-module/data/neo4j" ;
} ;
services.zookeeper = {
enable = true ;
package = pkgs.zookeeper ;
dataDir = "/opt/nix-module/data/zookeeper" ;
} ;
services.apache-kafka = {
enable = true ;
package = pkgs.apacheKafka ;
logDirs = [ "/opt/nix-module/data/kafka" ] ;
zookeeper = "localhost:2181" ;
extraProperties = ''
offsets.topic.replication.factor = 1
zookeeper.session.timeout.ms = 600000
'' ;
} ;
services.confluent-schema-registry = {
enable = true ;
package = pkgs.confluent-platform ;
kafkas = [ "PLAINTEXT://localhost:9092" ] ;
} ;
}

View File

@ -1,40 +0,0 @@
# Metadata Ingestion
**LEGACY**
This is a legacy module. The examples here are not actively maintained and may not work as described. Please see the `metadata-ingestion` module for more up-to-date uses.
This directory contains example apps for ingesting data into DataHub.
You are more than welcome to use these examples directly, or use them as a reference for your own jobs.
See the READMEs of each example for more information on each.
### Common themes
All these examples ingest by firing MetadataChangeEvent Kafka events. They do not ingest directly into DataHub, though
this is possible. Instead, the mce-consumer-job should be running, listening for these events, and performing the ingestion
for us.
### A note on languages
We initially wrote these examples in Python (they still exist in `contrib`). The idea was that these were very small
example scripts that should have been easy to use. However, upon reflection, not all developers are familiar with Python,
and the lack of types can hinder development. So the decision was made to port the examples to Java.
You're more than welcome to extrapolate these examples into whatever languages you like. At LinkedIn, we primarily use
Java.
### Ingestion at LinkedIn
It is worth noting that we do not use any of these examples directly (in Java, Python, or anything else) at LinkedIn. We
have several different pipelines for ingesting data; it all depends on the source.
- Some pipelines are based off other Kafka events, where we'll transform some existing Kafka event to a metadata event.
- For example, we get Kafka events for Hive changes. We make MCEs out of those Hive events to ingest Hive data.
- For others, we've directly instrumented existing pipelines / apps / jobs to also emit metadata events.
- For others still, we've created a series of offline jobs to ingest data.
- For example, we have an Azkaban job to process our HDFS datasets.
For some sources of data, one of these example scripts may work fine. For others, it may make more sense to have some
custom logic, as in the list above. Note that all these examples today are one-off (they run, fire events, and then stop);
you may wish to build continuous ingestion pipelines instead.

View File

@ -1,29 +0,0 @@
plugins {
id 'java'
}
dependencies {
compile project(':metadata-dao-impl:kafka-producer')
compile externalDependency.javaxInject
compile externalDependency.kafkaAvroSerde
compile externalDependency.lombok
compile externalDependency.springBeans
compile externalDependency.springBootAutoconfigure
compile externalDependency.springCore
compile externalDependency.springKafka
compile externalDependency.zookeeper
annotationProcessor externalDependency.lombok
runtime externalDependency.logbackClassic
constraints {
implementation("org.apache.logging.log4j:log4j-core:2.17.0") {
because("previous versions are vulnerable to CVE-2021-45105")
}
implementation("org.apache.logging.log4j:log4j-api:2.17.0") {
because("previous versions are vulnerable to CVE-2021-45105")
}
}
}

View File

@ -1,42 +0,0 @@
package com.linkedin.metadata.examples.configs;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
private String kafkaBootstrapServers;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean(name = "kafkaProducer")
public Producer<String, GenericRecord> kafkaProducerFactory(KafkaProperties properties) {
KafkaProperties.Producer producerProps = properties.getProducer();
producerProps.setKeySerializer(StringSerializer.class);
producerProps.setValueSerializer(KafkaAvroSerializer.class);
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildProducerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
return new KafkaProducer<>(props);
}
}

View File

@ -1,19 +0,0 @@
package com.linkedin.metadata.examples.configs;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class SchemaRegistryConfig {
@Value("${SCHEMAREGISTRY_URL:http://localhost:8081}")
private String schemaRegistryUrl;
@Bean(name = "schemaRegistryClient")
public SchemaRegistryClient schemaRegistryFactory() {
return new CachedSchemaRegistryClient(schemaRegistryUrl, 512);
}
}

View File

@ -1,52 +0,0 @@
package com.linkedin.metadata.examples.configs;
import com.linkedin.mxe.TopicConvention;
import com.linkedin.mxe.TopicConventionImpl;
import com.linkedin.mxe.Topics;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Creates a {@link TopicConvention} to generate kafka metadata event topic names.
*
* <p>This allows you to easily override Kafka topic names within your organization.
*/
@Configuration
public class TopicConventionFactory {
public static final String TOPIC_CONVENTION_BEAN = "metadataKafkaTopicConvention";
@Value("${METADATA_CHANGE_EVENT_NAME:" + Topics.METADATA_CHANGE_EVENT + "}")
private String metadataChangeEventName;
@Value("${METADATA_AUDIT_EVENT_NAME:" + Topics.METADATA_AUDIT_EVENT + "}")
private String metadataAuditEventName;
@Value("${FAILED_METADATA_CHANGE_EVENT_NAME:" + Topics.FAILED_METADATA_CHANGE_EVENT + "}")
private String failedMetadataChangeEventName;
@Value("${METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.METADATA_CHANGE_PROPOSAL + "}")
private String metadataChangeProposalName;
@Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
private String metadataChangeLogVersionedTopicName;
@Value("${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}")
private String metadataChangeLogTimeseriesTopicName;
@Value("${FAILED_METADATA_CHANGE_PROPOSAL_TOPIC_NAME:" + Topics.FAILED_METADATA_CHANGE_PROPOSAL + "}")
private String failedMetadataChangeProposalName;
@Value("${PLATFORM_EVENT_TOPIC_NAME:" + Topics.PLATFORM_EVENT + "}")
private String platformEventTopicName;
@Bean(name = TOPIC_CONVENTION_BEAN)
protected TopicConvention createInstance() {
return new TopicConventionImpl(metadataChangeEventName, metadataAuditEventName, failedMetadataChangeEventName,
metadataChangeProposalName, metadataChangeLogVersionedTopicName, metadataChangeLogTimeseriesTopicName,
failedMetadataChangeProposalName, platformEventTopicName,
// TODO once we start rolling out v5 add support for changing the new event names.
TopicConventionImpl.DEFAULT_EVENT_PATTERN);
}
}

View File

@ -1,26 +0,0 @@
package com.linkedin.metadata.examples.configs;
import java.io.IOException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZooKeeperConfig {
@Value("${ZOOKEEPER:localhost:2181}")
private String zookeeper;
@Value("${ZOOKEEPER_TIMEOUT_MILLIS:3000}")
private int timeoutMillis;
@Bean(name = "zooKeeper")
public ZooKeeper zooKeeperFactory() throws IOException {
Watcher noopWatcher = event -> {
};
return new ZooKeeper(zookeeper, timeoutMillis, noopWatcher);
}
}

View File

@ -1,40 +0,0 @@
# Kafka ETL
A small application which reads existing Kafka topics from ZooKeeper, retrieves their schema from the schema registry,
and then fires an MCE for each schema.
## Running the Application
First, ensure that the services this application depends on (schema registry, ZooKeeper, mce-consumer-job, GMS, etc.) are
all running.
This application can be run via gradle:
```
./gradlew :metadata-ingestion-examples:kafka-etl:bootRun
```
Or by building and running the jar:
```
./gradlew :metadata-ingestion-examples:kafka-etl:build
java -jar metadata-ingestion-examples/kafka-etl/build/libs/kafka-etl.jar
```
### Environment Variables
See the files under `src/main/java/com/linkedin/metadata/examples/kafka/config` for a list of customizable spring
environment variables.
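For example, to point the job at a non-default broker, schema registry, and ZooKeeper, you could override those variables on the command line (hostnames below are placeholders; the variable names match those used in the example config classes):
```
KAFKA_BOOTSTRAP_SERVER=broker-1:9092 \
KAFKA_SCHEMAREGISTRY_URL=http://schema-registry:8081 \
ZOOKEEPER=zookeeper-1:2181 \
java -jar metadata-ingestion-examples/kafka-etl/build/libs/kafka-etl.jar
```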
### Common pitfalls
For events to be fired correctly, schemas must exist in the schema registry. If a topic was newly created, but no schema
has been registered for it yet, this application will fail to retrieve the schema for that topic. Check the output of
the application to see if this happens. If you see a message like
```
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
```
Then the odds are good that you need to register the schema for this topic.
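You can check whether a schema has been registered by querying the schema registry directly (assuming the default local registry URL; replace `my-topic` with your topic name):
```
# list all registered subjects
curl http://localhost:8081/subjects
# fetch the latest schema registered for a topic's value subject
curl http://localhost:8081/subjects/my-topic-value/versions/latest
```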

View File

@ -1,36 +0,0 @@
plugins {
id 'org.springframework.boot'
id 'java'
}
dependencies {
compile project(':metadata-utils')
compile project(':metadata-dao-impl:kafka-producer')
compile project(':metadata-events:mxe-schemas')
compile project(':metadata-ingestion-examples:common')
compile externalDependency.javaxInject
compile externalDependency.kafkaAvroSerde
compile externalDependency.lombok
compile externalDependency.springBeans
compile externalDependency.springBootAutoconfigure
compile externalDependency.springCore
compile externalDependency.springKafka
annotationProcessor externalDependency.lombok
runtime externalDependency.logbackClassic
constraints {
implementation("org.apache.logging.log4j:log4j-core:2.17.0") {
because("previous versions are vulnerable to CVE-2021-45105")
}
implementation("org.apache.logging.log4j:log4j-api:2.17.0") {
because("previous versions are vulnerable to CVE-2021-45105")
}
}
}
bootJar {
mainClassName = 'com.linkedin.metadata.examples.kafka.KafkaEtlApplication'
}

View File

@ -1,121 +0,0 @@
package com.linkedin.metadata.examples.kafka;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.FabricType;
import com.linkedin.common.urn.CorpuserUrn;
import com.linkedin.common.urn.DataPlatformUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.metadata.aspect.DatasetAspect;
import com.linkedin.metadata.dao.producer.KafkaMetadataEventProducer;
import com.linkedin.metadata.examples.configs.TopicConventionFactory;
import com.linkedin.metadata.snapshot.DatasetSnapshot;
import com.linkedin.mxe.MetadataChangeEvent;
import com.linkedin.mxe.TopicConvention;
import com.linkedin.schema.KafkaSchema;
import com.linkedin.schema.SchemaField;
import com.linkedin.schema.SchemaFieldArray;
import com.linkedin.schema.SchemaFieldDataType;
import com.linkedin.schema.SchemaMetadata;
import com.linkedin.schema.StringType;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
* Gathers Kafka topics from the local zookeeper instance and schemas from the schema registry, and then fires
* MetadataChangeEvents for their schemas.
*
* <p>This should cause DataHub to be populated with this information, assuming it and the mce-consumer-job are running
* locally.
*
* <p>Can be run with {@code ./gradlew :metadata-ingestion-examples:java:kafka-etl:bootRun}.
*/
@Slf4j
@Component
public final class KafkaEtl implements CommandLineRunner {
private static final DataPlatformUrn KAFKA_URN = new DataPlatformUrn("kafka");
@Inject
@Named("kafkaProducer")
private Producer<String, GenericRecord> _producer;
@Inject
@Named(TopicConventionFactory.TOPIC_CONVENTION_BEAN)
private TopicConvention _topicConvention;
@Inject
@Named("zooKeeper")
private ZooKeeper _zooKeeper;
@Inject
@Named("schemaRegistryClient")
private SchemaRegistryClient _schemaRegistryClient;
private SchemaMetadata buildDatasetSchema(String datasetName, String schema, int schemaVersion) {
final AuditStamp auditStamp = new AuditStamp();
auditStamp.setTime(System.currentTimeMillis());
auditStamp.setActor(new CorpuserUrn(System.getenv("USER")));
final SchemaMetadata.PlatformSchema platformSchema = new SchemaMetadata.PlatformSchema();
platformSchema.setKafkaSchema(new KafkaSchema().setDocumentSchema(schema));
return new SchemaMetadata().setSchemaName(datasetName)
.setPlatform(KAFKA_URN)
.setCreated(auditStamp)
.setLastModified(auditStamp)
.setVersion(schemaVersion)
.setHash("")
.setPlatformSchema(platformSchema)
.setFields(new SchemaFieldArray(new SchemaField().setFieldPath("")
.setDescription("")
.setNativeDataType("string")
.setType(new SchemaFieldDataType().setType(SchemaFieldDataType.Type.create(new StringType())))));
}
private void produceKafkaDatasetMce(SchemaMetadata schemaMetadata) {
MetadataChangeEvent.class.getClassLoader().getResource("avro/com/linkedin/mxe/MetadataChangeEvent.avsc");
// Kafka topics are considered datasets in the current DataHub metadata ecosystem.
final KafkaMetadataEventProducer<DatasetSnapshot, DatasetAspect, DatasetUrn> eventProducer =
new KafkaMetadataEventProducer<>(DatasetSnapshot.class, DatasetAspect.class, _producer, _topicConvention);
eventProducer.produceSnapshotBasedMetadataChangeEvent(
new DatasetUrn(KAFKA_URN, schemaMetadata.getSchemaName(), FabricType.PROD), schemaMetadata);
_producer.flush();
}
@Override
public void run(String... args) throws Exception {
log.info("Starting up");
final List<String> topics = _zooKeeper.getChildren("/brokers/topics", false);
for (String datasetName : topics) {
if (datasetName.startsWith("_")) {
continue;
}
final String topic = datasetName + "-value";
io.confluent.kafka.schemaregistry.client.SchemaMetadata schemaMetadata;
try {
schemaMetadata = _schemaRegistryClient.getLatestSchemaMetadata(topic);
} catch (Throwable t) {
log.error("Failed to get schema for topic " + datasetName, t);
log.error("Common failure: does this event schema exist in the schema registry?");
continue;
}
if (schemaMetadata == null) {
log.warn(String.format("Skipping topic without schema: %s", topic));
continue;
}
log.trace(topic);
produceKafkaDatasetMce(buildDatasetSchema(datasetName, schemaMetadata.getSchema(), schemaMetadata.getVersion()));
log.info("Successfully fired MCE for " + datasetName);
}
}
}

View File

@ -1,16 +0,0 @@
package com.linkedin.metadata.examples.kafka;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class}, scanBasePackages = {
"com.linkedin.metadata.examples.configs", "com.linkedin.metadata.examples.kafka"})
public class KafkaEtlApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaEtlApplication.class).web(WebApplicationType.NONE).run(args);
}
}

View File

@ -1,40 +0,0 @@
<configuration>
<property name="LOG_DIR" value="${LOG_DIR:- /tmp/datahub/logs}"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_DIR}/kafka-etl-java.log</file>
<append>true</append>
<encoder>
<pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<FileNamePattern>${LOG_DIR}/kafka-etl.%i.log</FileNamePattern>
<minIndex>1</minIndex>
<maxIndex>3</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>100MB</MaxFileSize>
</triggeringPolicy>
</appender>
<logger name="org.apache.kafka.clients" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</logger>
<logger name="com.linkedin.metadata.examples.kafka" level="info" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</logger>
<root level="warn">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</root>
</configuration>

View File

@ -1,55 +0,0 @@
# MCE CLI
A small application which can produce or consume [MetadataChangeEvents](../../docs/what/mxe.md).
## Running the Application
First, ensure that the services this application depends on (schema registry, ZooKeeper, mce-consumer-job, GMS, etc.) are
all running.
This application can be run via gradle:
```
./gradlew :metadata-ingestion-examples:mce-cli:bootRun
```
Or by building and running the jar:
```
./gradlew :metadata-ingestion-examples:mce-cli:build
java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar
```
### Consuming Events
Consuming MCEs may be useful to help debug other applications that are meant to produce them. You can easily see what
MCEs are being produced (or not) at a glance.
```
./gradlew :metadata-ingestion-examples:mce-cli:bootRun
# Alternatives
./gradlew :metadata-ingestion-examples:mce-cli:bootRun --args='consume'
java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar
java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar consume
```
### Producing Events
Producing events can be useful to help debug the MCE pipeline, or just to help make some fake data (ideally, don't do
this on your production stack!).
```
./gradlew :metadata-ingestion-examples:mce-cli:bootRun --args='-m produce my-file.json'
# Alternatively
java -jar metadata-ingestion-examples/mce-cli/build/libs/mce-cli.jar -m produce my-file.json
```
Where `my-file.json` is some file that contains a
[MetadataChangeEvents](./src/main/pegasus/com/linkedin/metadata/examples/cli/MetadataChangeEvents.pdl) JSON object.
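For illustration, a minimal events file could look like the snippet below (a sketch only; see the `example-bootstrap.json` file for a complete set of events):
```
{
  "events": [
    {
      "proposedSnapshot": {
        "com.linkedin.metadata.snapshot.CorpUserSnapshot": {
          "urn": "urn:li:corpuser:jdoe",
          "aspects": [
            {
              "com.linkedin.identity.CorpUserInfo": {
                "active": true,
                "displayName": "John Doe",
                "fullName": "John Doe",
                "email": "jdoe@linkedin.com",
                "title": "Software Engineer"
              }
            }
          ]
        }
      }
    }
  ]
}
```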
### Producing the Example Events with Docker
We have some example events in the `example-bootstrap.json` file, which can be produced using the commands above.

View File

@ -1,43 +0,0 @@
plugins {
id 'org.springframework.boot'
id 'java'
id 'pegasus'
}
dependencies {
compile project(':metadata-utils')
compile project(':metadata-dao-impl:kafka-producer')
compile project(':metadata-events:mxe-schemas')
compile project(':metadata-ingestion-examples:common')
dataModel project(':metadata-models')
compile spec.product.pegasus.restliServer
compile externalDependency.javaxInject
compile externalDependency.kafkaAvroSerde
compile externalDependency.lombok
compile externalDependency.picocli
compile externalDependency.springBeans
compile externalDependency.springBootAutoconfigure
compile externalDependency.springCore
compile externalDependency.springKafka
runtime externalDependency.logbackClassic
annotationProcessor externalDependency.lombok
annotationProcessor externalDependency.picocli
constraints {
implementation("org.apache.logging.log4j:log4j-core:2.17.0") {
because("previous versions are vulnerable to CVE-2021-45105")
}
implementation("org.apache.logging.log4j:log4j-api:2.17.0") {
because("previous versions are vulnerable to CVE-2021-45105")
}
}
}
bootJar {
mainClassName = 'com.linkedin.metadata.examples.cli.MceCliApplication'
}

View File

@ -1,602 +0,0 @@
{
"events": [
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:datahub",
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": true,
"displayName": "Data Hub",
"fullName": "Data Hub",
"email": "datahub@linkedin.com",
"title": "CEO"
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.CorpUserSnapshot": {
"urn": "urn:li:corpuser:jdoe",
"aspects": [
{
"com.linkedin.identity.CorpUserInfo": {
"active": true,
"displayName": "John Doe",
"fullName": "John Doe",
"email": "jdoe@linkedin.com",
"title": "Software Engineer"
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)",
"aspects": [
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "DATAOWNER"
},
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
},
{
"com.linkedin.common.InstitutionalMemory": {
"elements": [
{
"url": "https://www.linkedin.com",
"description": "Sample doc",
"createStamp": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
]
}
},
{
"com.linkedin.schema.SchemaMetadata": {
"schemaName": "SampleKafkaSchema",
"platform": "urn:li:dataPlatform:kafka",
"version": 0,
"created": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.KafkaSchema": {
"documentSchema": "{\"type\":\"record\",\"name\":\"SampleKafkaSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Kafka dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"
}
},
"fields": [
{
"fieldPath": "field_foo",
"description": "Foo field description",
"nativeDataType": "string",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"glossaryTerms": {
"terms": [{
"urn": "urn:li:glossaryTerm:instruments.FinancialInstrument_v1"
}],
"auditStamp": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
},
{
"fieldPath": "field_bar",
"description": "Bar field description",
"nativeDataType": "boolean",
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
}
}
]
}
},
{
"com.linkedin.common.GlossaryTerms": {
"terms": [{
"urn": "urn:li:glossaryTerm:instruments.FinancialInstrument_v1"
}],
"auditStamp": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)",
"aspects": [
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "DATAOWNER"
},
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
},
{
"com.linkedin.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)",
"type": "TRANSFORMED"
}
]
}
},
{
"com.linkedin.common.InstitutionalMemory": {
"elements": [
{
"url": "https://www.linkedin.com",
"description": "Sample doc",
"createStamp": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
]
}
},
{
"com.linkedin.schema.SchemaMetadata": {
"schemaName": "SampleHdfsSchema",
"platform": "urn:li:dataPlatform:hdfs",
"version": 0,
"created": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.KafkaSchema": {
"documentSchema": "{\"type\":\"record\",\"name\":\"SampleHdfsSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample HDFS dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"
}
},
"fields": [
{
"fieldPath": "field_foo",
"description": "Foo field description",
"nativeDataType": "string",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
}
},
{
"fieldPath": "field_bar",
"description": "Bar field description",
"nativeDataType": "boolean",
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
}
}
]
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
"aspects": [
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "DATAOWNER"
},
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
},
{
"com.linkedin.dataset.UpstreamLineage": {
"upstreams": [
{
"auditStamp": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)",
"type": "TRANSFORMED"
}
]
}
},
{
"com.linkedin.common.InstitutionalMemory": {
"elements": [
{
"url": "https://www.linkedin.com",
"description": "Sample doc",
"createStamp": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
]
}
},
{
"com.linkedin.schema.SchemaMetadata": {
"schemaName": "SampleHiveSchema",
"platform": "urn:li:dataPlatform:hive",
"version": 0,
"created": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.KafkaSchema": {
"documentSchema": "{\"type\":\"record\",\"name\":\"SampleHiveSchema\",\"namespace\":\"com.linkedin.dataset\",\"doc\":\"Sample Hive dataset\",\"fields\":[{\"name\":\"field_foo\",\"type\":[\"string\"]},{\"name\":\"field_bar\",\"type\":[\"boolean\"]}]}"
}
},
"fields": [
{
"fieldPath": "field_foo",
"description": "Foo field description",
"nativeDataType": "string",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
}
},
{
"fieldPath": "field_bar",
"description": "Bar field description",
"nativeDataType": "boolean",
"type": {
"type": {
"com.linkedin.schema.BooleanType": {}
}
}
}
]
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.DataProcessSnapshot": {
"urn": "urn:li:dataProcess:(sqoop,DEMO,PROD)",
"aspects": [
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:datahub"
}
}
},
{
"com.linkedin.dataprocess.DataProcessInfo": {
"inputs": [
"urn:li:dataset:(urn:li:dataPlatform:cassandra,barEarth,DEV)",
"urn:li:dataset:(urn:li:dataPlatform:cassandra,barMars,DEV)"
],
"outputs": [
"urn:li:dataset:(urn:li:dataPlatform:hbase,barSky,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:hbase,barOcean,PROD)"
]
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.MLModelSnapshot": {
"urn": "urn:li:mlmodel:(urn:li:dataPlatform:science,scienceModel,PROD)",
"aspects": [
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "DATAOWNER"
},
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:jdoe"
}
}
},
{
"com.linkedin.ml.metadata.MLModelProperties": {
"description": "A sample model for predicting some outcome.",
"date": 0,
"version": {
"versionTag": "1.5.3"
},
"tags": [
"science"
]
}
},
{
"com.linkedin.ml.metadata.TrainingData": {
"trainingData": [
{
"dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,pageViewsHive,PROD)",
"motivation": "For science!",
"preProcessing": [
"Aggregation"
]
}
]
}
},
{
"com.linkedin.ml.metadata.EvaluationData": {
"evaluationData": [
{
"dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,pageViewsHive,PROD)"
}
]
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.ChartSnapshot": {
"urn": "urn:li:chart:(Looker,1)",
"aspects": [
{
"com.linkedin.chart.ChartInfo": {
"title": "Sample Looker Chart",
"description": "This chart contains sample data from Kafka",
"lastModified": {
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:datahub"
},
"created": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
},
"inputs": [
{
"string": "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)"
}
],
"chartUrl": "https://www.looker.com",
"type": "BAR",
"access": "PUBLIC"
}
},
{
"com.linkedin.chart.ChartQuery": {
"rawQuery": "SELECT * FROM SampleTable",
"type": "SQL"
}
},
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "STAKEHOLDER"
},
{
"owner": "urn:li:corpuser:jdoe",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 1581407589000,
"actor": "urn:li:corpuser:datahub"
}
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.DashboardSnapshot": {
"urn": "urn:li:dashboard:(Looker,0)",
"aspects": [
{
"com.linkedin.dashboard.DashboardInfo": {
"title": "Sample Looker Dashboard",
"description": "This dashboard shows charts about user retention.",
"lastModified": {
"lastModified": {
"time": 1581407139000,
"actor": "urn:li:corpuser:datahub"
},
"created": {
"time": 1581404189000,
"actor": "urn:li:corpuser:jdoe"
}
},
"charts": [ "urn:li:chart:(Looker,1)" ],
"dashboardUrl": "https://www.looker.com",
"access": "PUBLIC"
}
},
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:datahub",
"type": "DATAOWNER"
},
{
"owner": "urn:li:corpuser:jdoe",
"type": "STAKEHOLDER"
}
],
"lastModified": {
"time": 1581407389000,
"actor": "urn:li:corpuser:jdoe"
}
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.GlossaryTermSnapshot": {
"urn": "urn:li:glossaryTerm:instruments.FinancialInstrument_v1",
"aspects": [
{
"com.linkedin.glossary.GlossaryTermInfo": {
"definition": "written contract that gives rise to both a financial asset of one entity and a financial liability of another entity",
"parentNode": "urn:li:glossaryNode:instruments",
"sourceRef": "FIBO",
"termSource": "EXTERNAL",
"sourceUrl": "https://spec.edmcouncil.org/fibo/ontology/FBC/FinancialInstruments/FinancialInstruments/FinancialInstrument",
"customProperties": {
"FQDN": "common.instruments.FinancialInstrument"
}
}
},
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
}
]
}
}
},
{
"proposedSnapshot": {
"com.linkedin.metadata.snapshot.GlossaryNodeSnapshot": {
"urn": "urn:li:glossaryNode:instruments",
"aspects": [
{
"com.linkedin.glossary.GlossaryNodeInfo": {
"definition": "Financial Instruments"
}
},
{
"com.linkedin.common.Ownership": {
"owners": [
{
"owner": "urn:li:corpuser:jdoe",
"type": "DATAOWNER"
}
],
"lastModified": {
"time": 1581407189000,
"actor": "urn:li:corpuser:jdoe"
}
}
}
]
}
}
}
]
}

View File

@ -1,127 +0,0 @@
package com.linkedin.metadata.examples.cli;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.linkedin.data.DataMap;
import com.linkedin.data.schema.validation.RequiredMode;
import com.linkedin.data.schema.validation.UnrecognizedFieldMode;
import com.linkedin.data.schema.validation.ValidateDataAgainstSchema;
import com.linkedin.data.schema.validation.ValidationOptions;
import com.linkedin.data.schema.validation.ValidationResult;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.metadata.EventUtils;
import com.linkedin.mxe.MetadataChangeEvent;
import com.linkedin.mxe.Topics;
import com.linkedin.restli.common.ContentType;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import javax.inject.Inject;
import javax.inject.Named;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import picocli.CommandLine;
@Slf4j
@Component
public class MceCli implements CommandLineRunner {
private enum Mode {
PRODUCE, CONSUME
}
private static final class Args {
@CommandLine.Option(names = {"-m", "--mode"}, defaultValue = "CONSUME")
Mode mode;
@CommandLine.Parameters(
paramLabel = "EVENT_FILE",
description = "MCE file; required if running 'producer' mode. See MetadataChangeEvents.pdl for schema.",
arity = "0..1"
)
File eventFile;
}
@Inject
@Named("kafkaProducer")
private Producer<String, GenericRecord> _producer;
@Inject
@Named("kafkaEventConsumer")
private Consumer<String, GenericRecord> _consumer;
private void consume() {
log.info("Consuming records.");
_consumer.subscribe(ImmutableList.of(Topics.METADATA_CHANGE_EVENT));
while (true) {
final ConsumerRecords<String, GenericRecord> records = _consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, GenericRecord> record : records) {
log.info(record.value().toString());
}
}
}
@VisibleForTesting
static MetadataChangeEvents readEventsFile(@Nonnull File eventsFile) throws IOException {
final DataMap dataMap = ContentType.JSON.getCodec().readMap(new FileInputStream(eventsFile));
final ValidationOptions options = new ValidationOptions();
options.setRequiredMode(RequiredMode.CAN_BE_ABSENT_IF_HAS_DEFAULT);
options.setUnrecognizedFieldMode(UnrecognizedFieldMode.DISALLOW);
final ValidationResult result =
ValidateDataAgainstSchema.validate(dataMap, DataTemplateUtil.getSchema(MetadataChangeEvents.class), options);
if (!result.isValid()) {
throw new IllegalArgumentException(
String.format("Error parsing metadata events file %s: \n%s", eventsFile.toString(),
Joiner.on('\n').join(result.getMessages())));
}
return DataTemplateUtil.wrap(dataMap, MetadataChangeEvents.class);
}
private void produce(@Nonnull File eventsFile) throws IOException, ExecutionException, InterruptedException {
final MetadataChangeEvents events = readEventsFile(eventsFile);
int record = 1;
for (MetadataChangeEvent mce : events.getEvents()) {
log.info("Producing record {} of {}", record++, events.getEvents().size());
_producer.send(new ProducerRecord(Topics.METADATA_CHANGE_EVENT, EventUtils.pegasusToAvroMCE(mce))).get();
log.info("Produced record.");
}
}
@Override
public void run(String... cmdLineArgs) throws Exception {
final Args args = new Args();
new CommandLine(args).setCaseInsensitiveEnumValuesAllowed(true).parseArgs(cmdLineArgs);
switch (args.mode) {
case CONSUME:
consume();
break;
case PRODUCE:
if (args.eventFile == null) {
throw new IllegalArgumentException("Event file is required when producing.");
}
produce(args.eventFile);
break;
default:
break;
}
}
}

View File

@ -1,16 +0,0 @@
package com.linkedin.metadata.examples.cli;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class}, scanBasePackages = {
"com.linkedin.metadata.examples.configs", "com.linkedin.metadata.examples.cli"})
public class MceCliApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MceCliApplication.class).web(WebApplicationType.NONE).run(args);
}
}

View File

@ -1,43 +0,0 @@
package com.linkedin.metadata.examples.cli.config;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaConsumerConfig {
@Value("${KAFKA_BOOTSTRAP_SERVER:localhost:9092}")
private String kafkaBootstrapServers;
@Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
private String kafkaSchemaRegistryUrl;
@Bean(name = "kafkaEventConsumer")
public Consumer<String, GenericRecord> kafkaConsumerFactory(KafkaProperties properties) {
KafkaProperties.Consumer consumerProps = properties.getConsumer();
consumerProps.setKeyDeserializer(StringDeserializer.class);
consumerProps.setValueDeserializer(KafkaAvroDeserializer.class);
consumerProps.setGroupId("mce-cli");
// KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
consumerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
} // else we rely on KafkaProperties which defaults to localhost:9092
Map<String, Object> props = properties.buildConsumerProperties();
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);
return new KafkaConsumer<>(props);
}
}

View File

@ -1,13 +0,0 @@
namespace com.linkedin.metadata.examples.cli
import com.linkedin.mxe.MetadataChangeEvent
/**
* Schema definition for the format of the input file to the CLI for producing events.
*/
record MetadataChangeEvents {
/**
* Events to produce.
*/
events: array[MetadataChangeEvent]
}

View File

@ -1,40 +0,0 @@
<configuration>
<property name="LOG_DIR" value="${LOG_DIR:- /tmp/datahub/logs}"/>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_DIR}/mce-cli-java.log</file>
<append>true</append>
<encoder>
<pattern>%d{HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<FileNamePattern>${LOG_DIR}/mce-cli.%i.log</FileNamePattern>
<minIndex>1</minIndex>
<maxIndex>3</maxIndex>
</rollingPolicy>
<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<MaxFileSize>100MB</MaxFileSize>
</triggeringPolicy>
</appender>
<logger name="org.apache.kafka.clients" level="warn" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</logger>
<logger name="com.linkedin.metadata.examples.cli" level="info" additivity="false">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</logger>
<root level="warn">
<appender-ref ref="STDOUT" />
<appender-ref ref="FILE"/>
</root>
</configuration>

View File

@ -1,30 +0,0 @@
package com.linkedin.metadata.examples.cli;
import com.linkedin.restli.common.ContentType;
import java.io.File;
import java.io.FileInputStream;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
* Simple test to help us keep our example file up to date with the MCE schema definition, in the event we change
* schemas, or change the file without manually testing it (which we shouldn't do, but can happen by mistake).
*/
public class TestExamples {
private static final File EXAMPLE_FILE = new File("example-bootstrap.json");
@Test
public void examplesAreValidJson() throws Exception {
assertTrue(EXAMPLE_FILE.exists());
// no exception = test passes
ContentType.JSON.getCodec().readMap(new FileInputStream(EXAMPLE_FILE));
}
@Test
public void examplesMatchSchemas() throws Exception {
// no exception = test passes
MceCli.readEventsFile(EXAMPLE_FILE);
}
}

View File

@ -19,9 +19,6 @@ include 'metadata-events:mxe-registration'
include 'metadata-events:mxe-schemas'
include 'metadata-events:mxe-utils-avro-1.7'
include 'metadata-ingestion'
include 'metadata-ingestion-examples:common'
include 'metadata-ingestion-examples:kafka-etl'
include 'metadata-ingestion-examples:mce-cli'
include 'metadata-jobs:mae-consumer'
include 'metadata-jobs:mce-consumer'
include 'metadata-jobs:pe-consumer'
@ -46,4 +43,4 @@ include 'metadata-integration:java:datahub-client'
include 'metadata-integration:java:datahub-protobuf'
include 'metadata-ingestion-modules:airflow-plugin'
include 'ingestion-scheduler'
include 'smoke-test'
include 'smoke-test'

View File

@ -1,174 +0,0 @@
#!/usr/bin/env python3
import os
from datetime import datetime
from typing import Optional, Generator, Tuple
# import hashlib
HOUR_IN_MS = 3600000
DAY_IN_MS = 86400000
START_DAY_IN_MS = int(datetime.now().timestamp() * 1000) - 5 * DAY_IN_MS
CounterType = Optional[int]
NameType = Optional[str]
IndexRowType = Tuple[
NameType,
CounterType,
CounterType,
NameType,
CounterType,
CounterType,
CounterType,
CounterType,
CounterType,
CounterType,
]
def day(n: int) -> int:
return START_DAY_IN_MS + n * DAY_IN_MS
class MockIndexGenerator:
INDEX_NAME = "mock_dataset_stats_aspect_v1"
INDEX_FIELD_NAMES = [
"urn",
"rowCount",
"columnCount",
"columnStats.key",
"columnStats.numNull",
"eventTimestampMillis",
"eventGranularity",
"partitionSpec.parition",
"partitionSpec.timeWindow.startTimeMillis",
"partitionSpec.timeWindow.granulatiry",
]
INDEX_FIELD_TYPES = [
"keyword",
"long",
"long",
"keyword",
"long",
"date",
"long",
"keyword",
"date",
"long",
]
def __init__(self, start_days_in_ms, num_recs, num_cols):
self._start_days_in_ms = start_days_in_ms
self._num_recs = num_recs
self._num_cols = num_cols
self._stat_num_rows_start = 10000
self._stat_num_cols_start = 50
self._stat_num_nulls = 100
def _get_num_rows(self, i: int):
return self._stat_num_rows_start + (100 * i)
def _get_num_cols(self, i: int):
return self._stat_num_cols_start + i
def _get_num_nulls(self, i: int, c: int):
return self._stat_num_nulls + c + (10 * i)
def _get_event_time_ms(self, i: int):
return self._start_days_in_ms + (i * HOUR_IN_MS)
@staticmethod
def _get_index_row_json(row: IndexRowType) -> str:
return ",".join(
[
f'"{field}" : "{value}"'
for field, value in zip(MockIndexGenerator.INDEX_FIELD_NAMES, row)
if value is not None
]
)
def get_records(self) -> Generator[IndexRowType, None, None]:
for i in range(self._num_recs):
# emit one table record
yield self._get_index_row_json((
"table_1",
self._get_num_rows(i),
self._get_num_cols(i),
None,
None,
self._get_event_time_ms(i),
HOUR_IN_MS,
None,
None,
None)
)
# emit one record per column
for c in range(self._num_cols):
yield self._get_index_row_json((
f"table_1",
None,
None,
f"col_{c}",
self._get_num_nulls(i, c),
self._get_event_time_ms(i),
HOUR_IN_MS,
None,
None,
None)
)
@staticmethod
def get_props_json() -> str:
return ",".join(
[
f'"{field}" : {{ "type" : "{type}" }}'
for field, type in zip(
MockIndexGenerator.INDEX_FIELD_NAMES,
MockIndexGenerator.INDEX_FIELD_TYPES,
)
]
)
def gen_index_schema() -> None:
properties_json = MockIndexGenerator.get_props_json()
index_schema_gen_cmd = (
f"curl -v -XPUT http://localhost:9200/{MockIndexGenerator.INDEX_NAME} -H 'Content-Type: application/json' -d '"
+ """
{
"settings":{},
"mappings":{
"properties":{ """
+ f"{properties_json}"
+ """
}
}
}'"""
)
print(index_schema_gen_cmd)
os.system(index_schema_gen_cmd)
def populate_index_data() -> None:
for id, row in enumerate(
MockIndexGenerator(START_DAY_IN_MS, 100, 20).get_records()
):
# id = hashlib.md5(row.encode("utf-8")).hexdigest()
index_row_gen_command = (
f"curl -v -XPUT http://localhost:9200/{MockIndexGenerator.INDEX_NAME}/_doc/{id} "
+ "-H 'Content-Type: application/json' -d '{ "
+ f"{row}"
+ " }'"
)
print(index_row_gen_command)
os.system(index_row_gen_command)
def generate() -> None:
#gen_index_schema()
populate_index_data()
if __name__ == "__main__":
generate()