网络设备配置批量备份

2023-10-13 10:05:09

背景需求

日常网络运维,为了审计和防止设备故障导致设备配置丢失而造成长时间网络故障,需要对网络设备的配置做策略备份。

开源软件

oxidized - https://github.com/ytti/oxidized

Bacula - https://www.bacula.org/source-download-center

开源软件场景:跨平台,支持多厂商系统,UI界面友好,功能强大,备份策略可自定义。

关于oxidized的搭建过程,可参考:

瞿同学(Darren):网络设备配置备份-Oxidized8 赞同 · 4 评论文章

自写脚本

  1. Python脚本
  2. Go脚本

自写脚本场景:自定义强,适应度高,可根据自身的需求不断优化迭代。

Python脚本备份

需求说明:

公司内部企业网交换机和防火墙以及其他配套设备需要做配置备份,需要满足以下几点需求:

  1. 支持华为、思科、华三、锐捷等主流厂商交换机系统
  2. 支持密码加密和解密
  3. 支持成功备份邮件通知
  4. 支持TFTP和FTP登录上传备份文件
  5. 支持可自定义备份策略

思路拓展:

1、首先是如何备份思考?

  • 无外乎就是执行查看全局配置的命令,然后抓取回显,保存成配置文件。
  • TFTP/FTP登录网元,拷贝配置文件,或者反过来网元作为客户端,FTP/TFTP搭建在Linux/Windows主机中。
  • 现在很多厂商新设备都是支持自动备份配置命令,这也是一种手段。

2、多厂商交换机系统支持,如果采用FTP就不需要考虑太多,因为基本上每家厂商都支持FTP。

3、每台设备的密码都不尽相同,需要针对密码做加密和解密操作,执行命令的时候不直接显示明文密码。

4、备份成功后,需通过阿里云邮箱发送邮件通知。

5、自定义备份策略,需要单独开启模块。

开源参考:

GitHub - BarryCui/NetDevOps: 网络设备配置备份。可以备份cisco ios交换机,华为quidway交换机以及fortigate防火墙的配置。

蜗牛勇士/网络设备自动备份。

Feiinbeer:python脚本自动备份网络设备配置文件

自我实现:

1、前期完成基础的需求,并正式运用在测试环境中进行测试,并不断改善,后续应用在生产环境中。

2、中期思考如何结合Web UI进行监控和操作。

3、后期可以考虑形成一个比较完善的网络设备备份系统平台,进行总体闭环,并开源到Github和Gitee。

程序实例

网络设备配置备份程序 摘要

  • 网络设备的自动化实现方式:SNMP/NetConf + NAPALM/RestConf/OpenConfig
  • 本程序是通过最原始的ssh协议来备份网络设备的配置。
  • 使用场景为:不支持API的网络设备或运维人员不会调用API的场景。
  • 备份说明:备份方式是通过ssh协议结合ftp协议进行,读取网络配置文件,通过paramiko模块连接网络设备发送put命令,把配置文件上传FTP服务器,完成网络设备配置文件的备份。
  • 目前支持备份:目前只支持华为、华三系列设备,后续将进行适配支持更多其他厂商设备。

程序结构说明

当前目录依赖包生成: pip install pipreqs pipreqs . --encoding=utf8 --force

目录结构

  • AutoBakCfg.yml 是存放FTP服务器配置信息(登录用户、登录密码、FTP服务器IP、网络设备明文密码加密开关)
  • requirements.txt 当前程序python依赖库
  • dev_info.xlsx 网络设备信息(登录用户、登录密码、设备管理IP、配置文件路径)
  • password_aes.py 对明文密码进行AES加密和解密处理
  • configuration_rw.py 对YAML配置文件的读写处理
  • write_excels.py 对xlsx文本数据的读写处理
  • ftp_server.py 模拟FTP客户端进行FTP服务器验证
  • utils.py 本程序依赖的一些工具函数
  • run_backup.py 程序主入口

环境说明

  • Python 3.11.0
  • Windows 10、11系统
  • Pycharm 2022.1.2专业版

Linux发行版暂未测试,后续发出Linux版本上的运行环境说明。

网络设备支持

  • HUAWEI
  • H3C

后续进行优化迭代,加入思科、锐捷和其他主流厂商的支持。

FTP服务器

  • Universal FTP Server(Microsoft Store可直接下载)

Universal FTP Server

程序运行结果说明

  • 程序会自动把网络设备配置文件备份到服务器的根目录下

代码示例

示例

AutoBakCfg.yml 测试文件:FTP服务器信息和加密开关参数

base-info: ftp_password: '123456' ftp_server: 10.10.10.2 ftp_user: python is_encrypt: '1'

requirements.txt 项目依赖库

pandas==1.5.3 paramiko==3.0.0 pycryptodome==3.17 PyYAML==6.0

dev_info.xlsx 网络设备信息

设备信息

password_aes.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Linux下和Windows下安装pycryptodome 把读取的xlsx文档的密码进行加密和解密处理 # pip install pycryptodome from Crypto.Cipher import AES from Crypto.Util.Padding import pad from Crypto.Util.Padding import unpad from binascii import b2a_hex, a2b_hex import re class CryptPassword: """ AES.new 初始化AES对象 """ def __init__(self, key, init_vector): """ :param key: 初始化AES加密密钥 AES.new传入参数必须是bytes类型 :param init_vector: 初始化iv偏移量 AES.new传入参数必须是bytes类型 """ self.key = key self.init_vector = init_vector self.mode = AES.MODE_CBC # self.cryptor = AES.new(self.key, self.mode, self.init_vector) # AES加密 待加密的字符串为str类型并返回str类型数据 def encrypt_str(self, pass_str) -> str: # encode('ascii') 字符串转字节串,把字符串:abc,转为字节串:b'abc' cryptor = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii')) # pass_str str类型 pad的data_to_pad必须为bytes类型 故pass_str需要转换为字节串 padtext = pad(pass_str.encode('ascii'), 16, style='pkcs7') # 调用加密方法 cipher_text = cryptor.encrypt(padtext) # 字符串 --> 十六进制 str_cipher = b2a_hex(cipher_text) return str_cipher.decode('ascii') # AES解密 待解密的字符串为str类型并返回str类型数据 def decode_str(self, text_str) -> str: if not CryptPassword.is_hex(text_str) or (len(text_str) < 32): return "错误的16进制加密字符串" else: # 十六进制 --> 字符串 cipher_text = a2b_hex(text_str) decrypter = AES.new(self.key.encode('ascii'), self.mode, self.init_vector.encode('ascii')) # 调用解密方法 plain_text = decrypter.decrypt(cipher_text) # 不够16位,补全到16位 try: unpadtext = unpad(plain_text, 16, 'pkcs7') # decode('ascii') 字节串转字符串,把字节串:b'abc',转为字符串:abc return unpadtext.decode('ascii') except ValueError: return "请输入正确的加密字符串" # 16进制检查 @staticmethod def is_hex(hex_str) -> bool: # 16进制的正则表达式 regex = "[A-Fa-f0-9]+#34; result = re.match(regex, hex_str) if result: return True else: return False # 测试用例 if __name__ == '__main__': key_word = 'QWErtyuio@098765' init_vector_word = 'MEIYuanTian--Xia' password = 'T5fx2L3aFt' data = CryptPassword(key_word, init_vector_word) print(data.encrypt_str(password)) print(CryptPassword.is_hex(data.encrypt_str(password))) text = 'c0cf24e87b328810c0b5711b2f326c8f' print(data.decode_str(text))

configuration_rw.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 针对yaml/yml等配置文件进行读取和写入的实现类 """ import yaml import os import pathlib class YamlReadWrite: # 初始化 def __init__(self, filepath): self.filepath = filepath def yaml_read(self, file_name): # filename = str(file_name) # print(filename) if os.path.isfile(file_name): with open(self.filepath + '\\' + file_name, 'r', encoding='utf-8') as fp: line_data = yaml.load(fp, Loader=yaml.FullLoader) return line_data else: print("配置文件不存在") def yaml_write(self, file_name, encrypt_str): # 读取yaml文件数据 if os.path.isfile(file_name): old_file_data = YamlReadWrite(self.filepath).yaml_read(file_name) # 修改读取数据 old_file_data['base-info']['is_encrypt'] = encrypt_str with open(self.filepath + '\\' + file_name, 'w', encoding='utf-8') as fp: yaml.dump(old_file_data, fp) return "密码已加密,配置文件加密参数修改为0" else: print("配置文件不存在") if __name__ == '__main__': path = os.path.dirname(__file__) read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml') print(read_data['base-info']['ftp_password']) write_data = YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0') print(write_data) print("修改完成")

write_excels.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 利用pandas模块读写excel文本数据,并对文本数据中的密码进行判断,如果不是16进制字符串,那就进行加密并重新写入,读取16进制字符串解密进行ftp登录 依赖模块: pandas,openpyxl """ import pandas as pd from password_aes import CryptPassword class MachiningText: def __init__(self, xlsx_file_name): self.xlsx_file_name = xlsx_file_name # 更新格式化后的密码并写入xlsx文件中 def write_xlsx_file(self, dev_name, dev_ip, dev_user, dev_pw, file_path): data_df = pd.DataFrame() data_df["设备名称"] = dev_name data_df["设备IP"] = dev_ip data_df["账号"] = dev_user data_df["密码"] = dev_pw data_df["文件路径"] = file_path try: writer = pd.ExcelWriter(self.xlsx_file_name) data_df.to_excel(writer, sheet_name='dev_info', index=False) writer.close() except PermissionError: print("请先关闭文档,再次尝试") def update_password(self, key, init_vector): dev_name = [] dev_ip = [] dev_user = [] dev_pw = [] file_path = [] dev_pd = pd.read_excel(self.xlsx_file_name, sheet_name='dev_info') # 赋予i=0 密码已加密,i=1 把明文密码转为加密密码 i = 0 for line_str in dev_pd.values: dev_name = dev_name + [line_str[0].strip()] dev_ip = dev_ip + [line_str[1].strip()] dev_user = dev_user + [line_str[2].strip()] # try: # dev_pw = dev_pw + [line_str[3].strip()] # except AttributeError: # dev_pw = dev_pw + [line_str[3]] file_path = file_path + [line_str[4].strip()] # 如果密码是单纯的数字 len(123)会出错,需要转换为str类型 try: if int(line_str[3]): password_len = len(str(line_str[3])) passwd_str = str(line_str[3]) if not CryptPassword.is_hex(passwd_str) or (password_len < 32): new_password = CryptPassword(key, init_vector).encrypt_str(passwd_str) # print(new_password) # print(dev_pw) dev_pw = dev_pw + [new_password] i = 1 elif str(line_str[3].strip()): password_len = len(line_str[3].strip()) if not CryptPassword.is_hex(line_str[3].strip()) or (password_len < 32): new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].encode('ascii')) dev_pw = dev_pw + [new_password] i = 1 else: return "不存在的类型" except ValueError: # 密码不是16进制,则进行加密 password_len = len(line_str[3].strip()) if (not CryptPassword.is_hex(line_str[3].strip())) or (password_len < 32): new_password = CryptPassword(key, init_vector).encrypt_str(line_str[3].strip()) dev_pw = dev_pw + [new_password] i = 1 else: new_password = [line_str[3]] # 如果密码是16进制,new_password 取表中密码,赋予Dev_pw dev_pw = dev_pw + new_password # 密码有加密,则重新写xls表,否则不需要重写 # print(dev_ip) # print(dev_user) # print(dev_pw) # print(file_path) if i == 1: MachiningText(self.xlsx_file_name).write_xlsx_file(dev_name, dev_ip, dev_user, dev_pw, file_path) if __name__ == '__main__': key_word = 'QWErtyuio@098765' init_vector_word = 'MEIYuanTian--Xia' MachiningText(r'./dev_info.xlsx').update_password(key_word, init_vector_word) print("完成")

ftp_server.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # 模拟客户端进行FTP服务器验证 import ftplib import utils class FTPServer: def __init__(self, ftp_server): self.ftp_server = ftp_server def connect_ftp_server(self, ftp_user, ftp_password) -> bool: print("正在检测FTP服务器,请稍候......") try: ftp = ftplib.FTP(self.ftp_server) print(ftp.getwelcome()) ftp.login(ftp_user, ftp_password) ftp.encoding = 'utf-8' # 230 Logged in 230代表登录成功 login_response = ftp.login(ftp_user, ftp_password) # 通过指定分隔符对字符串进行切片 login_response.split(" ")[0] if int(login_response.split(" ")[0]) == 230: print("FTP服务器登陆成功!") utils.writelog("FTP服务器登陆成功!\n") return True else: return False except ConnectionRefusedError: print("FTP服务器无法登录,请检查服务是否启动!") utils.writelog("FTP服务器无法登录,请检查服务是否启动!\n") return False if __name__ == '__main__': user = 'python' password = '123456' server = '10.10.10.2' FTPServer(server).connect_ftp_server(user, password) print("完成")

utils.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # 代码运行日志生成器 import datetime import msvcrt def writelog(logstr): try: f = open('./result.log', 'a') out = f.write(str(datetime.datetime.now()) + ":-->" + logstr) f.close() return out except Exception as _result: return print(_result) # 如果不打包exe不需要 借鉴前人经验 def prompt_msg(tmep_str): print("------------------------------------------------------------------") print("如发现程序有问题,请联系作者,QQ:1450561049@qq.com\n") print("按任意键退出......") print("------------------------------------------------------------------") return ord(msvcrt.getch())

run_backup.py

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # 利用paramiko进行网络设备ssh连接和备份命令操作 from configuration_rw import YamlReadWrite from write_excels import MachiningText from ftp_server import FTPServer from password_aes import CryptPassword import pandas as pd import paramiko import time import utils import os def connect_device_ssh(dev_name, dev_ip, dev_username, dev_pw, filename_path): try: # 创建一个SSH客户端 ssh_client = paramiko.SSHClient() # AutoAddPolicy: 自动添加主机名和主机密钥 # set_missing_host_key_policy: 一个连接主机的策略(没有本地主机秘钥或者HostKeys对象时) ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh_client.connect(hostname=dev_ip, port=22, username=dev_username, password=dev_pw) # 打开连接到的终端,使用ssh shell通道,相当于使用ssh远程到了主机上 command = ssh_client.invoke_shell() # send发送内容,然后将字符串进行编码,编码后中文仍乱码 command.send(("ftp " + _FTP_HOST).encode()) command.send(b"\n") command.send(_FTP_USER + "\n") time.sleep(1) command.send(_FTP_PW + "\n") time.sleep(1) command.send(b"\n") time.sleep(1) command.send(b"\n") # 备份flash:/vrpcfg.zip,并提取配置名称 cfg_name = filename_path.split("/")[1].split(".") expanded_name = "." + cfg_name[1] # 拼接备份文件名称 bak_file_name = cfg_name[0] + "-" + dev_ip + "-" + time.strftime("%Y-%m-%d") + expanded_name # print (Bak_File_Name) command.send("put " + filename_path + " " + bak_file_name + "\n") # print("put " + filename_path + " " + Bak_File_Name + "\n") time.sleep(1) command.send(b"\n") received = command.recv(65535).decode() # 备份成功recv 包含“226 File received ok” if "226 File has been recieved" in received: print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!") utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份成功!\n") ssh_client.close() except: print(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。") utils.writelog(dev_name + "_" + dev_ip + "-" + filename_path + " --> 备份失败。######\n") # recv = command.recv(65535).decode() # recv接收响应,字节为65535,并进行解码 # result = recv.splitlines() # print(recv) if __name__ == '__main__': # key: 16或16的倍数 key = 'QWErtyuio@098765' # init_vector: 16位 init_vector = 'MEIYuanTian--Xia' # 读取程序当前目录路径 path = os.path.dirname(__file__) read_data = YamlReadWrite(path).yaml_read(file_name='AutoBakCfg.yml') if os.path.exists('./result.log'): os.remove('./result.log') # 私有常量 _IS_ENCRYPT = int(read_data['base-info']['is_encrypt']) _FTP_HOST = read_data['base-info']['ftp_server'] _FTP_USER = read_data['base-info']['ftp_user'] _FTP_PW = read_data['base-info']['ftp_password'] # _IS_ENCRYPT = 1, 把'./dev_info.xlsx中密码字段的明文密码改为密文(AES) if _IS_ENCRYPT == 1: xlsx_file_name = r'./Dev_info.xlsx' MachiningText(xlsx_file_name).update_password(key, init_vector) # 加密后把 _IS_ENCRYPT = 0 YamlReadWrite(path).yaml_write(file_name='AutoBakCfg.yml', encrypt_str='0') utils.writelog("把当前明文密码修改成功为密文密码并更改配置文件为is_encrypt参数为0") else: utils.writelog("dev_info.xlsx文件中密码字段已加密!") print("dev_info.xlsx文件中密码字段已加密!") if FTPServer(_FTP_HOST).connect_ftp_server(_FTP_USER, _FTP_PW): dev_file_name = './dev_info.xlsx' dev_pd = pd.read_excel(dev_file_name, sheet_name='dev_info') dev_info = dev_pd.values # Dev_Info.head() for line_str in dev_info: # print(line_str[0], line_str[1], line_str[2], line_str[3], line_str[4]) dev_name_str = line_str[0] dev_ip_str = line_str[1] dev_username_str = line_str[2] dev_pw_str = CryptPassword(key, init_vector).decode_str(line_str[3]) # print (PassWord) filename_path_str = line_str[4] connect_device_ssh(dev_name_str, dev_ip_str, dev_username_str, dev_pw_str, filename_path_str) # prompt_msg("备份完成,备份文件保存在FTP服务器的目录下") utils.writelog("备份完成,备份文件保存在FTP服务器的目录下") else: # prompt_msg("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。") utils.writelog("连接FTP服务器失败,请检查FTP服务是否启动,服务器IP,账号、密码是否正确。")


目前代码仅仅是刚刚成型,发现bug的地方还请评论反馈。

利用paramiko进行连接,可以对华为和华三的网络设备配文件进行自动化备份。后续将使用netmiko进行重构,添加对其他厂商网络设备的支持。

后续将重构和分享项目中使用到的模块和基础知识,以实践来驱动知识的学习和巩固,欢迎有兴趣的小伙伴一起加入,可新建一个群进行共同探讨。

再次感谢各位开源贡献者们和免费分享NetDevOps相关技术的前辈们,如 @弈心 @朱嘉盛 以及整个圈子的大佬们的分享。