# coding: utf8
import json
import base64
import hashlib
import requests
from typing import List, Optional, Union
from requests import Session
from airflow.exceptions import AirflowException
from airflow.configuration import conf
from airflow.providers.http.hooks.http import HttpHook


class WorkWeChatHook(HttpHook):
    """
    Send messages to a WorkWeChat (WeCom) group through a group-robot webhook.

    The robot key is read from the Airflow configuration (section ``robots``,
    option ``workwechat_key``) and is used to build both the message webhook
    URL and the media-upload URL. Requests are posted directly to those URLs
    with ``requests``; ``workwechat_conn_id`` is only forwarded to
    :class:`HttpHook` as ``http_conn_id``.

    :param workwechat_conn_id: Connection id forwarded to ``HttpHook``.
    :type workwechat_conn_id: str
    :param message_type: One of ``text``, ``markdown``, ``image``, ``news``
        or ``file``.
    :type message_type: str
    :param messages: Message bodies, one request per entry
        (used for ``text`` and ``markdown``).
    :type messages: list[str]
    :param images: Paths of image files, one request per entry
        (used for ``image``).
    :type images: list[str]
    :param files: Paths of files to upload and send, one request per entry
        (used for ``file``).
    :type files: list[str]
    :param artitles: Articles for a ``news`` message (the misspelled name is
        kept for backward compatibility). The value is placed unchanged under
        ``news.articles`` in the request body.
    :type artitles: list
    :param at_mobiles: Mobile numbers to mention in ``text`` messages.
    :type at_mobiles: list[str]
    :param at_all: Mention everyone in ``text`` messages. If True, overrides
        ``at_mobiles``.
    :type at_all: bool

    Example::

        from custom_hooks.workwechat import WorkWeChatHook

        WorkWeChatHook(message_type="text", messages=["hello"]).send()
    """

    conn_name_attr = 'workwechat_conn_id'
    default_conn_name = 'workwechat_default'
    conn_type = 'workwechat'
    hook_name = 'WorkWeChat'

    def __init__(
        self,
        workwechat_conn_id='workwechat_default',
        message_type: str = 'text',
        messages: Optional[List[str]] = None,
        images: Optional[List[str]] = None,
        files: Optional[List[str]] = None,
        artitles: Optional[List[dict]] = None,
        at_mobiles: Optional[List[str]] = None,
        at_all: bool = False,
        *args,
        **kwargs,
    ) -> None:
        """
        Store the message settings and build the webhook URLs.

        Args:
            workwechat_conn_id (str, optional): Forwarded to ``HttpHook`` as
                ``http_conn_id``. Defaults to 'workwechat_default'.
            message_type (str, optional): Kind of message to send.
                Defaults to 'text'.
            messages (Optional[List[str]], optional): Bodies for text/markdown
                messages. Defaults to None.
            images (Optional[List[str]], optional): Image file paths.
                Defaults to None.
            files (Optional[List[str]], optional): File paths to upload.
                Defaults to None.
            artitles (Optional[List[dict]], optional): Articles for a news
                message. Defaults to None.
            at_mobiles (Optional[List[str]], optional): Mobile numbers to
                mention. Defaults to None.
            at_all (bool, optional): Mention everyone. Defaults to False.
        """
        super().__init__(*args, http_conn_id=workwechat_conn_id, **kwargs)
        self.message_type = message_type
        self.messages = messages
        self.images = images
        self.files = files
        # The constructor argument is spelled ``artitles``; keep that
        # attribute for existing callers and expose the correctly spelled
        # ``articles`` that the payload builder reads.
        self.artitles = artitles
        self.articles = artitles
        self.at_mobiles = at_mobiles
        self.at_all = at_all

        workwechat_key = conf.get('robots', 'workwechat_key')
        self.web_hook = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={workwechat_key}"
        self.upload_hook = (
            f"https://qyapi.weixin.qq.com/cgi-bin/webhook/upload_media?key={workwechat_key}&type=file"
        )
        self.headers = {
            "Content-Type": "application/json;text/plain;charset=UTF-8",
            "Accept": "application/json;charset=UTF-8",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/79.0.3945.79 Safari/537.36 ",
        }

    def _build_text_payloads(self):
        """Build one ``text``/``markdown`` payload per entry in ``self.messages``."""
        # ``at_all`` wins over ``at_mobiles``; a missing list becomes [] so the
        # request never carries a JSON null for the mention list.
        mentions = ["@all"] if self.at_all else list(self.at_mobiles or [])
        payloads = []
        for msg in self.messages or []:
            body = {"content": msg}
            # Only plain-text messages get the mention list.
            if self.message_type == "text":
                body["mentioned_mobile_list"] = list(mentions)
            payloads.append({"msgtype": self.message_type, self.message_type: body})
        return payloads

    def _build_image_payloads(self):
        """Build one ``image`` payload per path in ``self.images``."""
        payloads = []
        for path in self.images or []:
            # Read the file once; base64 and MD5 are both computed from the
            # same raw bytes.
            with open(path, "rb") as fp:
                raw = fp.read()
            payloads.append({
                "msgtype": "image",
                "image": {
                    "base64": base64.b64encode(raw).decode("utf-8"),
                    "md5": hashlib.md5(raw).hexdigest(),
                },
            })
        return payloads

    def _build_news_payloads(self):
        """Build the single ``news`` payload from ``self.articles``."""
        return [{
            "msgtype": "news",
            "news": {"articles": self.articles or []},
        }]

    def _build_file_payloads(self):
        """Upload every path in ``self.files`` and build one ``file`` payload each."""
        payloads = []
        for path in self.files or []:
            with open(path, "rb") as fp:
                res = requests.post(self.upload_hook, files={"file": fp})
            media_id = res.json().get("media_id")
            if not media_id:
                # Surface the API answer instead of a bare KeyError.
                raise AirflowException(
                    f"Uploading {path!r} to WorkWeChat failed: {res.text}"
                )
            payloads.append({
                "msgtype": "file",
                "file": {"media_id": media_id},
            })
        return payloads

    def _build_message(self):
        """
        Build the list of request bodies for the configured ``message_type``.

        :return: A list of dicts, each one the JSON body of one webhook call.
        :raises AirflowException: If ``message_type`` is not supported, or a
            file upload does not return a ``media_id``.
        """
        if self.message_type in ["text", "markdown"]:
            return self._build_text_payloads()
        if self.message_type == "image":
            return self._build_image_payloads()
        if self.message_type == "news":
            return self._build_news_payloads()
        if self.message_type == "file":
            return self._build_file_payloads()
        raise AirflowException("`message_type` must in ['text', 'markdown', 'image', 'news', 'file']")

    def send(self):
        """
        Post every built payload to the webhook and log the outcome.

        A payload is considered delivered when the response's ``errmsg`` is
        ``"ok"``; anything else is logged as an error and the remaining
        payloads are still sent.
        """
        for payload in self._build_message():
            # TLS verification is left at the requests default (enabled): the
            # webhook key travels in the URL, and the upload call to the same
            # host already relies on default verification.
            res = requests.post(self.web_hook, json=payload)
            result = json.loads(res.text)
            if result.get("errmsg") == "ok":
                self.log.info("successful")
            else:
                self.log.error(
                    "failed: errcode=%s, errmsg=%s",
                    result.get("errcode"),
                    result.get("errmsg"),
                )
版权声明
本站仅做备份收录,仅供研究与教学参考之用。
读者将信息用于其他用途的,全部法律及连带责任由读者自行承担,本站不承担任何责任。









评论