网站访问量统计工具,做电销用什么软件打电话,在excel表里做网站模板,成熟的国产crm系统场景
业务上有许多发送邮件的场景#xff0c;发送的邮件基本上都是自动发送的#xff0c;而且邮件内容是很重要的#xff0c;对于邮件发没发送#xff0c;发送的时间点对不对每次回归测试工作量太大了#xff0c;所以考虑把这部分内容加入到自动化测试中
工具 python g…场景
业务上有许多发送邮件的场景发送的邮件基本上都是自动发送的而且邮件内容是很重要的对于邮件发没发送发送的时间点对不对每次回归测试工作量太大了所以考虑把这部分内容加入到自动化测试中
工具 python gmail 要访问gmail先要去console.cloud.google.com创建访问凭证如图所示进行下载后获取一个json文件credentials.json
实现代码
其中DateFormat 和get_path是自定义方法分别是获取时间和获取文件
from __future__ import print_function
import base64
import os.path
import logging
import timefrom utils.get_file_path import get_path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from utils.date_format import DateFormat
from datetime import datetime
# If modifying these scopes, delete the file token.json.
SCOPES [https://www.googleapis.com/auth/gmail.readonly, https://www.googleapis.com/auth/gmail.modify]class GmailSearch:def __init__(self):get token.json.self.creds None# The file token.json stores the users access and refresh tokens, and is# created automatically when the authorization flow completes for the first# time.file_token get_path(token.json, utils)if os.path.exists(file_token):self.creds Credentials.from_authorized_user_file(file_token, SCOPES)# If there are no (valid) credentials available, let the user log in.if not self.creds or not self.creds.valid:if self.creds and self.creds.expired and self.creds.refresh_token:self.creds.refresh(Request())else:flow InstalledAppFlow.from_client_secrets_file(get_path(credentials.json, utils), SCOPES)self.creds flow.run_local_server(port0)# Save the credentials for the next runwith open(file_token, w) as token:token.write(self.creds.to_json())def search_email(self, **kwargs):search email by keywords ( from, sender , subject, before, after, unread, interval):param kwargs: 默认查询当前时间前后10分钟未读的邮件, 如果第一次未查询到默认会间隔10s钟查询一次共查询三次 如果找到的话会将找到的标记成已读查询主题参数 subject发送人参数 sender是否已读 : unread (bool类型查询时间段参数 before, after设置查询间隔 interval设置查询次数 count:return: the email, if no result is found, return nonecount kwargs.get(count) if kwargs.get(count) is not None else 3if count 0:return no email foundsearch_str [in:inbox]unread False if kwargs.get(unread) else Trueif unread:search_str.append(is:unread)if kwargs.get(attachment):search_str.append(has:attachment)if kwargs.get(sender): # value like atestemail.com, btestemail.comsearch_str.append(ffrom:({kwargs.get(sender)}))if kwargs.get(to): # value like atestemail.com, btestemail.comsearch_str.append(fto:({kwargs.get(to)}))if kwargs.get(subject): search_str.append(fsubject:({kwargs.get(subject)}))if kwargs.get(before):search_str.append(fbefore:{kwargs.get(before)})else: # default value is current 10.minutessearch_str.append(fbefore:{DateFormat.from_current_minutes(10)})if kwargs.get(after):search_str.append(fafter:{kwargs.get(after)})else: # default value is current - 10.minutessearch_str.append(fafter:{DateFormat.from_current_minutes(-10)})interval kwargs.get(interval) if kwargs.get(interval) else 10query .join(search_str)unread kwargs.get(unread)time.sleep(interval)logging.info(datetime.now().strftime(%H:%M:%S))logging.info(str(count - 1))try:# Call the Gmail APIservice build(gmail, v1, credentialsself.creds)results service.users().messages().list(userIdme, qquery).execute()messages results.get(messages)if not messages:logging.info(No email found, continue to search)kwargs.update({count: count-1})return self.search_email_content(**kwargs)# get the last email, mark read, return email body# body []last_email service.users().messages().get(userIdme, idmessages[0][id]).execute()payload last_email[payload]# internalDate last_email[internalDate]# The Body of the message is in Encrypted format. So, we have to decode it.# Get the data and decode it with base 64 decoder.parts payload.get(parts)[0][body] if payload.get(parts) else payload.get(body)data parts[data]data data.replace(-, ).replace(_, /)decoded_data base64.b64decode(data)body str(decoded_data, utf-8)# mark readif unread:logging.info( mark read);service.users().messages().modify(userIdme, idmessages[0][id],body{removeLabelIds: [UNREAD]}).execute()return bodyexcept HttpError as error:logging.info(fAn error occurred: {error})# testGmailSearch()
# test.search_email(before2023/10/3, after2023/9/25, totesttest.com, unreadTrue)
参考文档Gmail api文档