From 45511644e75e70ce4f8efa0fce2aaa4aaf8b0e4e Mon Sep 17 00:00:00 2001 From: Aseem Bansal Date: Thu, 8 Feb 2024 14:05:26 +0530 Subject: [PATCH] feat(ingest/slack): source to get user info from slack (#9776) --- metadata-ingestion/setup.py | 8 + .../ingestion/source/slack/__init__.py | 0 .../datahub/ingestion/source/slack/slack.py | 181 ++++++++++++++++++ 3 files changed, 189 insertions(+) create mode 100644 metadata-ingestion/src/datahub/ingestion/source/slack/__init__.py create mode 100644 metadata-ingestion/src/datahub/ingestion/source/slack/slack.py diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index bbbab73fd1..74dcde5e06 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -245,6 +245,10 @@ delta_lake = { powerbi_report_server = {"requests", "requests_ntlm"} +slack = { + "slack-sdk==3.18.1" +} + databricks = { # 0.1.11 appears to have authentication issues with azure databricks "databricks-sdk>=0.9.0", @@ -367,6 +371,7 @@ plugins: Dict[str, Set[str]] = { "snowflake": snowflake_common | usage_common | sqlglot_lib, "sqlalchemy": sql_common, "sql-queries": usage_common | sqlglot_lib, + "slack": slack, "superset": { "requests", "sqlalchemy", @@ -503,6 +508,7 @@ base_dev_requirements = { "redshift", "s3", "snowflake", + "slack", "tableau", "teradata", "trino", @@ -543,6 +549,7 @@ full_test_dev_requirements = { "kafka-connect", "ldap", "mongodb", + "slack", "mssql", "mysql", "mariadb", @@ -597,6 +604,7 @@ entry_points = { "postgres = datahub.ingestion.source.sql.postgres:PostgresSource", "redash = datahub.ingestion.source.redash:RedashSource", "redshift = datahub.ingestion.source.redshift.redshift:RedshiftSource", + "slack = datahub.ingestion.source.slack.slack:SlackSource", "snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source", "superset = datahub.ingestion.source.superset:SupersetSource", "tableau = datahub.ingestion.source.tableau:TableauSource", diff --git a/metadata-ingestion/src/datahub/ingestion/source/slack/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/slack/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py b/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py new file mode 100644 index 0000000000..ed425cc25d --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py @@ -0,0 +1,181 @@ +import logging +import textwrap +from dataclasses import dataclass +from typing import Iterable, Optional + +from pydantic import Field, SecretStr +from slack_sdk import WebClient + +from datahub.configuration.common import ConfigModel +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SupportStatus, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import ( + SourceReport, + TestableSource, + TestConnectionReport, +) +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.schema_classes import CorpUserEditableInfoClass +from datahub.utilities.urns.urn import Urn + +logger: logging.Logger = logging.getLogger(__name__) + + +@dataclass +class CorpUser: + urn: Optional[str] = None + email: Optional[str] = None + slack_id: Optional[str] = None + title: Optional[str] = None + image_url: Optional[str] = None + phone: Optional[str] = None + + +class SlackSourceConfig(ConfigModel): + bot_token: SecretStr = Field( + description="Bot token for the Slack workspace. Needs `users:read`, `users:read.email` and `users.profile:read` scopes.", + ) + + +@platform_name("Slack") +@config_class(SlackSourceConfig) +@support_status(SupportStatus.TESTING) +class SlackSource(TestableSource): + def __init__(self, ctx: PipelineContext, config: SlackSourceConfig): + self.ctx = ctx + self.config = config + self.report = SourceReport() + + @classmethod + def create(cls, config_dict, ctx): + config = SlackSourceConfig.parse_obj(config_dict) + return cls(ctx, config) + + @staticmethod + def test_connection(config_dict: dict) -> TestConnectionReport: + raise NotImplementedError("This class does not implement this method") + + def get_slack_client(self) -> WebClient: + return WebClient(token=self.config.bot_token.get_secret_value()) + + def get_workunits_internal( + self, + ) -> Iterable[MetadataWorkUnit]: + assert self.ctx.graph is not None + auth_resp = self.get_slack_client().auth_test() + logger.info("Successfully connected to Slack") + logger.info(auth_resp.data) + for user_obj in self.get_user_to_be_updated(): + self.populate_slack_id_from_email(user_obj) + if user_obj.slack_id is None: + continue + self.populate_user_profile(user_obj) + if user_obj.urn is None: + continue + logger.info(f"User: {user_obj}") + corpuser_editable_info = ( + self.ctx.graph.get_aspect( + entity_urn=user_obj.urn, aspect_type=CorpUserEditableInfoClass + ) + or CorpUserEditableInfoClass() + ) + corpuser_editable_info.email = user_obj.email + corpuser_editable_info.slack = user_obj.slack_id + corpuser_editable_info.title = user_obj.title + if user_obj.image_url: + corpuser_editable_info.pictureLink = user_obj.image_url + if user_obj.phone: + corpuser_editable_info.phone = user_obj.phone + yield MetadataWorkUnit( + id=f"{user_obj.urn}", + mcp=MetadataChangeProposalWrapper( + entityUrn=user_obj.urn, + aspect=corpuser_editable_info, + ), + ) + + def populate_user_profile(self, user_obj: CorpUser) -> None: + try: + # https://api.slack.com/methods/users.profile.get + user_profile_res = self.get_slack_client().users_profile_get( + user=user_obj.slack_id + ) + user_profile = user_profile_res.get("profile", {}) + user_obj.title = user_profile.get("title") + user_obj.image_url = user_profile.get("image_192") + user_obj.phone = user_profile.get("phone") + except Exception as e: + if "missing_scope" in str(e): + raise e + return + + def populate_slack_id_from_email(self, user_obj: CorpUser) -> None: + if user_obj.email is None: + return + try: + # https://api.slack.com/methods/users.lookupByEmail + user_info_res = self.get_slack_client().users_lookupByEmail( + email=user_obj.email + ) + user_info = user_info_res.get("user", {}) + user_obj.slack_id = user_info.get("id") + except Exception as e: + if "users_not_found" in str(e): + return + raise e + + def get_user_to_be_updated(self) -> Iterable[CorpUser]: + graphql_query = textwrap.dedent( + """ + query listUsers($input: ListUsersInput!) { + listUsers(input: $input) { + total + users { + urn + editableProperties { + email + slack + } + } + } + } + """ + ) + start = 0 + count = 10 + total = count + + assert self.ctx.graph is not None + + while start < total: + variables = {"input": {"start": start, "count": count}} + response = self.ctx.graph.execute_graphql( + query=graphql_query, variables=variables + ) + list_users = response.get("listUsers", {}) + total = list_users.get("total", 0) + users = list_users.get("users", []) + for user in users: + user_obj = CorpUser() + editable_properties = user.get("editableProperties", {}) + user_obj.urn = user.get("urn") + if user_obj.urn is None: + continue + if editable_properties is not None: + user_obj.email = editable_properties.get("email") + if user_obj.email is None: + urn_id = Urn.from_string(user_obj.urn).get_entity_id_as_string() + if "@" in urn_id: + user_obj.email = urn_id + if user_obj.email is not None: + yield user_obj + start += count + + def get_report(self) -> SourceReport: + return self.report