下面是Python监控文件并发送告警邮件的完整攻略。
主要思路
本文的主要思路是通过Python的第三方库watchdog
监控指定文件夹下的文件变动,并根据设定的条件发送邮件。
具体来说,我们需要以下几步:
- 安装
watchdog
库。 - 编写监控程序并监听指定文件夹。
- 根据文件变动的类型进行不同的操作。
- 如果满足设定的条件则发送邮件。
下面我们详细步骤地介绍这些操作。
需要的库
首先,我们需要安装watchdog
库。在命令行下运行以下命令即可:
pip install watchdog
编写监控程序
接下来,我们需要编写一个监控程序来监听指定文件夹。以下是一个简单的示例:
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyHandler(FileSystemEventHandler):
def on_any_event(self, event):
# 根据文件变动类型进行相应的操作
# ...
if __name__ == "__main__":
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, "/path/to/folder", recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
上面的代码中,我们定义了一个MyHandler
类继承自watchdog.events.FileSystemEventHandler
,然后在其中实现了on_any_event
方法。这个方法会在任何类型的文件变动时都被调用,我们可以在这个方法里面根据具体的变动类型来进行一些处理。
然后我们在__main__
函数中创建一个MyHandler
对象,并指定需要监控的文件夹的路径,然后启动Observer
对象来监听文件夹。
处理文件变动
在MyHandler
类中的on_any_event
方法里面,我们可以根据具体的文件变动类型来进行操作,以下是一些示例:
新建文件
def on_created(self, event):
if not event.is_directory:
print(f"Created file: {event.src_path}")
上面的代码中,我们使用on_created
方法来处理文件的创建事件,如果不是文件夹的话我们就打印出文件路径。
修改文件
def on_modified(self, event):
if not event.is_directory:
print(f"Modified file: {event.src_path}")
上面的代码中,我们使用on_modified
方法来处理文件的修改事件,如果不是文件夹的话我们就打印出文件路径。
删除文件
def on_deleted(self, event):
if not event.is_directory:
print(f"Deleted file: {event.src_path}")
上面的代码中,我们使用on_deleted
方法来处理文件的删除事件,如果不是文件夹的话我们就打印出文件路径。
发送告警邮件
最后,我们需要根据设定的条件来判断是否需要发送邮件。以下是一个示例代码:
import smtplib
from email.mime.text import MIMEText
# 邮箱服务器配置
mail_host = "smtp.example.com"
mail_user = "user@example.com"
mail_pass = "password"
sender = "user@example.com"
receiver = "someone@example.com"
def send_mail(subject, content):
# 邮件内容
message = MIMEText(content, "plain", "utf-8")
message["Subject"] = subject
message["From"] = sender
message["To"] = receiver
# 发送邮件
try:
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host, 25)
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receiver, message.as_string())
smtpObj.quit()
print("邮件发送成功")
except smtplib.SMTPException as e:
print("Error: 无法发送邮件", e)
class MyHandler(FileSystemEventHandler):
def on_any_event(self, event):
if event.is_directory:
return
if event.event_type == "created":
send_mail("文件创建告警", f"文件创建:{event.src_path}")
elif event.event_type == "modified":
send_mail("文件修改告警", f"文件修改:{event.src_path}")
elif event.event_type == "deleted":
send_mail("文件删除告警", f"文件删除:{event.src_path}")
上面的代码中,我们定义了一个send_mail
函数来发送邮件,需要提前配置好邮箱服务器地址和账号密码。然后我们在MyHandler
类中的on_any_event
方法中根据具体的事件类型来发送不同的邮件。
示例说明
以下是两个示例说明,分别是:
- 监控一个文件夹,当有文件创建时发送告警邮件。
- 监控一个文件夹,当有文件修改,并且修改时间超过1小时时发送告警邮件。
示例1
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import smtplib
from email.mime.text import MIMEText
# 邮箱服务器配置
mail_host = "smtp.example.com"
mail_user = "user@example.com"
mail_pass = "password"
sender = "user@example.com"
receiver = "someone@example.com"
def send_mail(subject, content):
# 邮件内容
message = MIMEText(content, "plain", "utf-8")
message["Subject"] = subject
message["From"] = sender
message["To"] = receiver
# 发送邮件
try:
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host, 25)
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receiver, message.as_string())
smtpObj.quit()
print("邮件发送成功")
except smtplib.SMTPException as e:
print("Error: 无法发送邮件", e)
class MyHandler(FileSystemEventHandler):
def on_created(self, event):
if not event.is_directory:
send_mail("文件创建告警", f"文件创建:{event.src_path}")
if __name__ == "__main__":
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, "/path/to/folder", recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
上面的代码中,我们只需要修改MyHandler
类中的on_created
方法来发送告警邮件即可。
示例2
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import smtplib
from email.mime.text import MIMEText
import os.path
import datetime
# 邮箱服务器配置
mail_host = "smtp.example.com"
mail_user = "user@example.com"
mail_pass = "password"
sender = "user@example.com"
receiver = "someone@example.com"
def send_mail(subject, content):
# 邮件内容
message = MIMEText(content, "plain", "utf-8")
message["Subject"] = subject
message["From"] = sender
message["To"] = receiver
# 发送邮件
try:
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host, 25)
smtpObj.login(mail_user, mail_pass)
smtpObj.sendmail(sender, receiver, message.as_string())
smtpObj.quit()
print("邮件发送成功")
except smtplib.SMTPException as e:
print("Error: 无法发送邮件", e)
class MyHandler(FileSystemEventHandler):
def on_modified(self, event):
if not event.is_directory:
now = datetime.datetime.now()
mtime = os.path.getmtime(event.src_path)
diff = now - datetime.datetime.fromtimestamp(mtime)
if diff.seconds > 3600:
send_mail("文件修改告警", f"文件修改:{event.src_path}")
if __name__ == "__main__":
event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, "/path/to/folder", recursive=True)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
上面的代码中,我们在MyHandler
类中使用on_modified
方法来判断文件修改时间是否超过了1小时,如果超过则发送告警邮件。
综上所述,以上是一个Python监控文件并发送告警邮件的完整攻略,希望能够对您有所帮助!