python实现自动发送报警监控邮件

  • Post category:Python

当我们需要监控特定数据和系统状态时,我们通常需要设置报警机制。报警机制可用于在系统或应用程序出现错误或在特定条件触发时发送警报通知。在Python中,我们可以使用smtplib模块和电子邮件功能库email来实现自动发送报警邮件。

以下是Python实现自动发送报警监控邮件的步骤:

1.设置SMTP服务器

要使用smtplib库发送电子邮件,您需要首先选择要使用的SMTP服务器。许多电子邮件服务提供商都提供了SMTP服务器免费服务,例如,Gmail的SMTP服务器地址为smtp.gmail.com。此外,您还需要使用端口号587进行身份验证。

例如:

import smtplib
import ssl

smtp_server = "smtp.gmail.com"
port = 587  # For starttls
sender_email = "xxxxx@gmail.com"
receiver_email = "yyyyy@gmail.com"
password = input("Type your password and press enter: ")

# Create a secure SSL context
context = ssl.create_default_context()

# Try to log in to server and send email
server = smtplib.SMTP(smtp_server,port)
server.starttls(context=context) # Secure the connection
server.login(sender_email, password)

2. 构建邮件内容

您需要使用email模块创建并构建电子邮件对象。您可以在电子邮件中设置主题,正文和附件等。以下代码提供了一个简单示例:

from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

message = MIMEMultipart()
message["From"] = sender_email
message["To"] = receiver_email
message["Subject"] = "Website Down Alert"

# Add body to email
body = "This is an alert email. Your website is down."
message.attach(MIMEText(body, "plain"))

3. 发送电子邮件

构建邮件后,您可以使用sendmail方法和smtplib库发送邮件。以下代码演示如何使用sendmail方法发送邮件:

# Send email
server.sendmail(sender_email, receiver_email, message.as_string())

示例

以下是一个基本示例,演示如何使用上述步骤监视网站并发送警报电子邮件。使用python的requests库,我们向我们的服务器发送一个GET请求,并检查响应代码是否为200。如果响应代码为200,则网站连接正常,否则触发警报电子邮件。完整代码如下:

import requests
import smtplib
import ssl
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

smtp_server = "smtp.gmail.com"
port = 587  # For starttls
sender_email = "xxxxx@gmail.com"
receiver_email = "yyyyy@gmail.com"
password = input("Type your password and press enter: ")

# Create a secure SSL context
context = ssl.create_default_context()

def send_alert():
    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = "Website Down Alert"

    # Add message body to email
    body = "This is an alert email. Your website is down."
    message.attach(MIMEText(body, "plain"))

    # Send email
    with smtplib.SMTP(smtp_server, port) as server:
        server.starttls(context=context)
        server.login(sender_email, password)
        server.sendmail(sender_email, receiver_email, message.as_string())
    print("Alert sent successfully.")

while True:
    r = requests.get("https://example.com")
    if r.status_code != 200:
        send_alert()
    else:
        print("Website is up.")

在上面的示例中,我们使用requests模块向网站发送GET请求。如果响应的状态码不是200,则发送报警邮件。如果一切正常,则打印“网站正常”。

示例2

下一个示例是一个Python程序,用于定期查询Google Drive中的文件夹并检查文件夹中是否有新文件。如果有新文件,则发送电子邮件指示用户下载新文件。代码如下:

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from httplib2 import Http
from oauth2client import file, client, tools
import datetime
import time
import pandas as pd
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib
import os

# Constants for Google Drive
DRIVE_SCOPE = 'https://www.googleapis.com/auth/drive'
SPREADSHEET_MIMETYPE = 'application/vnd.google-apps.spreadsheet'
FOLDER_MIMETYPE = 'application/vnd.google-apps.folder'
LOOKBACK_TIME = datetime.datetime.now() - datetime.timedelta(hours=24)

# Constants for Email
FROM_EMAIL = 'example_email@gmail.com'
SMTP_SERVER = 'smtp.gmail.com'
SMTP_PORT = 587
TO_EMAIL = 'example_email_destination@gmail.com'
EMAIL_SUBJECT = 'New files in Google Drive'

# Definition for sendEmail
def sendEmail(to,subject,bodySpecific):
    """ Sends email to recipient with desired subject and body """
    message = MIMEMultipart('alternative')
    message['Subject'] = subject
    message['From'] = FROM_EMAIL
    message['To'] = to
    parts = MIMEText(bodySpecific, 'html')
    message.attach(parts)
    smtp = smtplib.SMTP(SMTP_SERVER, SMTP_PORT)
    smtp.starttls()
    smtp.login(FROM_EMAIL, os.environ['GMAIL_PASS'])
    smtp.sendmail(FROM_EMAIL, to, message.as_string())
    smtp.quit()

# Adding necessary scopes for Google OAuth
SCOPES = 'https://www.googleapis.com/auth/drive'

# Indepedent function to use dateutil
def parseDate(createdDate):
    return parser.parse(createdDate)

def main():

  # Creating Service object
  store = file.Storage('token.json')
  creds = store.get()
  if not creds or creds.invalid:
      flow = client.flow_from_clientsecrets('credentials.json', SCOPES)
      creds = tools.run_flow(flow, store)
  service = build('drive', 'v3', http=creds.authorize(Http()))

  # Look for folders in Google Drive shared with me
  shared_folder_query = "mimeType='{}' and trashed = false and sharedWithMe".format(FOLDER_MIMETYPE)

  try:
      # Searching for Folders that match Query
      response = service.files().list(q=shared_folder_query,
                                      orderBy='modifiedTime desc',
                                      pageSize=5,
                                      fields='nextPageToken, files(id, name, createdTime)').execute()

      # If at least one folder is found
      if response['files']:
          files_data = []

          # For every folder in response, list the files present in that folder
          for folder in response.get('files', []):
              folder_id = folder.get('id')
              name = folder.get('name', [])
              file_query = "parents = '{}' and trashed = false and createdTime > '{}'".format(folder_id, LOOKBACK_TIME.isoformat())

              # Searching for Files that match Query
              response = service.files().list(q=file_query,
                                              spaces='drive',
                                              orderBy='createdTime desc',
                                              pageSize=100,
                                              fields='nextPageToken, files(id, name, createdTime)').execute()

              # If at least one file is found, append it to a list called "files_data"
              if response['files']:
                  for file in response.get('files', []):
                      files_data.append(('{}'.format(name),
                                          '{}'.format(file.get('name', [])),
                                          '{}'.format(parseDate(file.get('createdTime', [])))))
      # If no folders are found
      else:
          print("No folders were found.")
          return
  except HttpError as error:
      print('An error occurred: %s' % error)
      return


  # Convert data to Pandas DataFrame, sort DataFrames by createdDate, and concatenate those DataFrames
  data_all = pd.DataFrame(data=files_data, columns=['Folder Name', 'File Name', 'Created Date'])
  data_all.sort_values(by='Created Date', inplace=True, ascending=False)
  bodytext = data_all.to_html(index=False)

  # Send email to recipients
  sendEmail(TO_EMAIL, EMAIL_SUBJECT, bodytext)

if __name__ == '__main__':
    main()

这个示例使用googleapiclient库检索GoogleDrive上的文件和文件夹,pandas库用于排序文件和文件夹,smtplib库用于电子邮件通知。一个标准的Python脚本用于检查文件系统和扫描新文件夹的最好的部分是,您可以根据需要更改检查时间。