python监控文件并且发送告警邮件

  • Post category:Python

下面是Python监控文件并发送告警邮件的完整攻略。

主要思路

本文的主要思路是通过Python的第三方库watchdog监控指定文件夹下的文件变动,并根据设定的条件发送邮件。

具体来说,我们需要以下几步:

  1. 安装watchdog库。
  2. 编写监控程序并监听指定文件夹。
  3. 根据文件变动的类型进行不同的操作。
  4. 如果满足设定的条件则发送邮件。

下面我们详细步骤地介绍这些操作。

需要的库

首先,我们需要安装watchdog库。在命令行下运行以下命令即可:

pip install watchdog

编写监控程序

接下来,我们需要编写一个监控程序来监听指定文件夹。以下是一个简单的示例:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class MyHandler(FileSystemEventHandler):
    def on_any_event(self, event):
        # 根据文件变动类型进行相应的操作
        # ...

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, "/path/to/folder", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

上面的代码中,我们定义了一个MyHandler类继承自watchdog.events.FileSystemEventHandler,然后在其中实现了on_any_event方法。这个方法会在任何类型的文件变动时都被调用,我们可以在这个方法里面根据具体的变动类型来进行一些处理。

然后我们在__main__函数中创建一个MyHandler对象,并指定需要监控的文件夹的路径,然后启动Observer对象来监听文件夹。

处理文件变动

MyHandler类中的on_any_event方法里面,我们可以根据具体的文件变动类型来进行操作,以下是一些示例:

新建文件

def on_created(self, event):
    if not event.is_directory:
        print(f"Created file: {event.src_path}")

上面的代码中,我们使用on_created方法来处理文件的创建事件,如果不是文件夹的话我们就打印出文件路径。

修改文件

def on_modified(self, event):
    if not event.is_directory:
        print(f"Modified file: {event.src_path}")

上面的代码中,我们使用on_modified方法来处理文件的修改事件,如果不是文件夹的话我们就打印出文件路径。

删除文件

def on_deleted(self, event):
    if not event.is_directory:
        print(f"Deleted file: {event.src_path}")

上面的代码中,我们使用on_deleted方法来处理文件的删除事件,如果不是文件夹的话我们就打印出文件路径。

发送告警邮件

最后,我们需要根据设定的条件来判断是否需要发送邮件。以下是一个示例代码:

import smtplib
from email.mime.text import MIMEText

# 邮箱服务器配置
mail_host = "smtp.example.com"
mail_user = "user@example.com"
mail_pass = "password"
sender = "user@example.com"
receiver = "someone@example.com"

def send_mail(subject, content):
    # 邮件内容
    message = MIMEText(content, "plain", "utf-8")
    message["Subject"] = subject
    message["From"] = sender
    message["To"] = receiver

    # 发送邮件
    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receiver, message.as_string())
        smtpObj.quit()
        print("邮件发送成功")
    except smtplib.SMTPException as e:
        print("Error: 无法发送邮件", e)

class MyHandler(FileSystemEventHandler):
    def on_any_event(self, event):
        if event.is_directory:
            return

        if event.event_type == "created":
            send_mail("文件创建告警", f"文件创建:{event.src_path}")
        elif event.event_type == "modified":
            send_mail("文件修改告警", f"文件修改:{event.src_path}")
        elif event.event_type == "deleted":
            send_mail("文件删除告警", f"文件删除:{event.src_path}")

上面的代码中,我们定义了一个send_mail函数来发送邮件,需要提前配置好邮箱服务器地址和账号密码。然后我们在MyHandler类中的on_any_event方法中根据具体的事件类型来发送不同的邮件。

示例说明

以下是两个示例说明,分别是:

  1. 监控一个文件夹,当有文件创建时发送告警邮件。
  2. 监控一个文件夹,当有文件修改,并且修改时间超过1小时时发送告警邮件。

示例1

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import smtplib
from email.mime.text import MIMEText

# 邮箱服务器配置
mail_host = "smtp.example.com"
mail_user = "user@example.com"
mail_pass = "password"
sender = "user@example.com"
receiver = "someone@example.com"

def send_mail(subject, content):
    # 邮件内容
    message = MIMEText(content, "plain", "utf-8")
    message["Subject"] = subject
    message["From"] = sender
    message["To"] = receiver

    # 发送邮件
    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receiver, message.as_string())
        smtpObj.quit()
        print("邮件发送成功")
    except smtplib.SMTPException as e:
        print("Error: 无法发送邮件", e)

class MyHandler(FileSystemEventHandler):
    def on_created(self, event):
        if not event.is_directory:
            send_mail("文件创建告警", f"文件创建:{event.src_path}")

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, "/path/to/folder", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

上面的代码中,我们只需要修改MyHandler类中的on_created方法来发送告警邮件即可。

示例2

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import smtplib
from email.mime.text import MIMEText
import os.path
import datetime

# 邮箱服务器配置
mail_host = "smtp.example.com"
mail_user = "user@example.com"
mail_pass = "password"
sender = "user@example.com"
receiver = "someone@example.com"

def send_mail(subject, content):
    # 邮件内容
    message = MIMEText(content, "plain", "utf-8")
    message["Subject"] = subject
    message["From"] = sender
    message["To"] = receiver

    # 发送邮件
    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mail_host, 25)
        smtpObj.login(mail_user, mail_pass)
        smtpObj.sendmail(sender, receiver, message.as_string())
        smtpObj.quit()
        print("邮件发送成功")
    except smtplib.SMTPException as e:
        print("Error: 无法发送邮件", e)

class MyHandler(FileSystemEventHandler):
    def on_modified(self, event):
        if not event.is_directory:
            now = datetime.datetime.now()
            mtime = os.path.getmtime(event.src_path)
            diff = now - datetime.datetime.fromtimestamp(mtime)
            if diff.seconds > 3600:
                send_mail("文件修改告警", f"文件修改:{event.src_path}")

if __name__ == "__main__":
    event_handler = MyHandler()
    observer = Observer()
    observer.schedule(event_handler, "/path/to/folder", recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()

上面的代码中,我们在MyHandler类中使用on_modified方法来判断文件修改时间是否超过了1小时,如果超过则发送告警邮件。

综上所述,以上是一个Python监控文件并发送告警邮件的完整攻略,希望能够对您有所帮助!