服务器巡检不再头疼:一键脚本批量检查,高效生成巡检报告
手工巡检需要逐台登录服务器,耗费大量时间和精力,同时容易因操作疏漏而遗漏关键指标,导致数据不统一、风险增大;而自动化巡检利用脚本批量采集数据,实现实时监控和标准化输出,大幅提升效率并降低人为错误,从而为系统稳定运行提供坚实保障。
为了解决这些问题,我们可以使用 自动化脚本 来批量巡检服务器,并自动生成巡检表。如下图所示:
获取本文的脚本可以关注公众号【攻城狮成长日记】,私信回复
脚本链接
即可获取。
自动化巡检的优势
相比手工巡检,使用脚本进行自动化巡检具有以下优势:
- 批量执行:一次性检查所有服务器,提升巡检效率;
- 减少人工干预:降低人为错误,提高数据准确性;
- 标准化输出:巡检数据统一格式,方便存储和分析;
- 可扩展性:脚本可根据需求扩展,支持更多巡检项。
编写批量巡检脚本
为了让脚本更灵活,适应更多情况,我们换了个新思路:让用户自己定义要检查的项目,并用正则表达式来提取结果,同时并定义展示的模板。数据类似如下:
# 定义巡检配置项
inspection_configs=[
{
"name": "内核版本",
"command": "uname -r", # 修正命令为获取内核版本的正确命令
"regex_pattern": r"^(\d+\.\d+\.\d+-\d+-\w+)", # 精确匹配
"format_template": "内核版本: {0}"
},
....
]
[!info] 解析
command
: 是指定要执行的命令,例如上述的uname -r
。regex_pattern
:是指通过正则匹配想要的结果。format_template
:是指在巡检表展示的数据。例如内核版本: {0}
,其中{0}
会被填入匹配到的真实数据。
通过以下的ssh_connect
函数进行远程SSH
连接到服务,然后,再通过execute_inspection
函数执行巡检命令。
def ssh_connect(server):
"""建立SSH连接"""
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
try:
client.connect(
hostname=server["hostname"],
port=server["port"],
username=server["username"],
password=server["password"]
)
return client
except Exception as e:
print(f"连接{server['hostname']}失败: {str(e)}")
return None
def execute_inspection(client, config):
"""执行单个巡检项并解析结果"""
stdin, stdout, stderr = client.exec_command(config["command"])
output = stdout.read().decode()
error = stderr.read().decode()
if error:
print(f"命令执行错误: {config['command']}\n{error}")
return None
#使用正则表达式提取数据
match = re.search(config["regex_pattern"], output)
if match:
return match.groups()
else:
print(f"未匹配到数据: {config['name']}")
return None
通过以下函数进行数据格式化,
def format_result(config, result):
if "regex_groups" in config:
params = dict(zip(config["regex_groups"], result))
return config["format_template"].format(**params)
else:
return config["format_template"].format(*result)
通过generate_report
函数,生成巡检报告。
def generate_report(data, filename):
"""生成合并IP单元格的巡检报告"""
report_data = []
for ip, inspections in data.items():
ip_entry = {
"IP": ip,
"巡检项": [],
"结果": []
}
for item_name, result in inspections.items():
ip_entry["巡检项"].append(item_name)
ip_entry["结果"].append(
format_result(
next(c for c in inspection_configs if c["name"] == item_name),
result
)
)
report_data.append(ip_entry)
# 转换为二维表格(保留空值用于单元格合并)
max_items = max(len(entry["巡检项"]) for entry in report_data)
rows = []
for entry in report_data:
rows.append([entry["IP"], entry["巡检项"][0], entry["结果"][0]])
for i in range(1, len(entry["巡检项"])):
rows.append(["", entry["巡检项"][i], entry["结果"][i]])
df = pd.DataFrame(rows, columns=["IP", "巡检项", "结果"])
# 生成Excel文件
with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
df.to_excel(writer, index=False, sheet_name='巡检报告')
workbook = writer.book
worksheet = writer.sheets['巡检报告']
# 设置列宽
worksheet.set_column('A:A', 15) # IP列
worksheet.set_column('B:B', 25) # 巡检项列
worksheet.set_column('C:C', 45) # 结果列
# 标题格式
header_format = workbook.add_format({
'bold': True,
'bg_color': '#4CAF50',
'font_color': 'white',
'border': 1,
'valign': 'vcenter'
})
for col_num, value in enumerate(df.columns.values):
worksheet.write(0, col_num, value, header_format)
# 数据格式
data_format = workbook.add_format({
'border': 1,
'valign': 'top'
})
for row in range(1, len(df)+1):
for col in range(3):
worksheet.write(row, col, df.iat[row-1, col], data_format)
# 合并IP单元格
ip_col = 0
current_ip = None
merge_start = 1
for row_num in range(1, len(df)+1):
cell_value = df.iat[row_num-1, ip_col]
if cell_value:
if current_ip is not None and merge_start < row_num:
worksheet.merge_range(
merge_start, ip_col,
row_num-1, ip_col,
current_ip,
workbook.add_format({
'valign': 'vcenter',
'border': 1
})
)
current_ip = cell_value
merge_start = row_num
# 处理最后一个IP
if current_ip is not None and merge_start <= len(df):
worksheet.merge_range(
merge_start, ip_col,
len(df), ip_col,
current_ip,
workbook.add_format({
'valign': 'vcenter',
'border': 1
})
)
print(f"报告已生成: {os.path.abspath(filename)}")
脚本演示
- 通过定义需要巡检的设备,如下所示:
# 服务器配置列表
servers = [
{
"hostname": "192.168.31.100",
"port": 22,
"username": "root",
"password": "password" # 推荐使用密钥认证
},
{
"hostname": "192.168.31.101",
"port": 22,
"username": "root",
"password": "password" # 推荐使用密钥认证
},
{
"hostname": "192.168.31.102",
"port": 22,
"username": "root",
"password": "password" # 推荐使用密钥认证
},
# 添加更多服务器...
]
- 然后,再定义巡检项目,内容如下:
# 定义巡检配置项
inspection_configs = [
{
"name": "内存使用",
"command": "free -m",
"regex_pattern": r"Mem:\s+(\d+)\s+(\d+)\s+(\d+)",
"description": "获取内存总量、已用、空闲(单位:MB)",
"format_template": "总量: {0} MB, 已用: {1} MB, 空闲: {2} MB"
},
{
"name": "内核版本",
"command": "uname -r",
"regex_pattern": r"^(\d+\.\d+\.\d+-\d+-\w+)",
"format_template": "内核版本: {0}"
},
{
"name": "磁盘使用率",
"command": "df -h",
"regex_pattern": r"(\d+)%\s+/(?!.*snap)",
"description": "获取根分区使用率",
"format_template": "使用率: {0}%"
},
{
"name": "CPU核心数",
"command": "lscpu",
"regex_pattern": r"CPU\(s\):\s+(\d+)",
"description": "获取CPU核心数量",
"format_template": "CPU核心数: {0}%"
},
{
"name": "系统负载",
"command": "uptime",
"regex_pattern": r"load average:\s+([\d.]+),\s+([\d.]+),\s+([\d.]+)",
"description": "获取1分钟、5分钟、15分钟系统负载",
"format_template": "1分钟: {0} , 5分钟: {1} , 15分钟: {2} "
}
]
- 定义主函数方法,如下:
def main():
all_results = {}
for server in servers:
client = ssh_connect(server)
if client:
print(f"正在检查 {server['hostname']}...")
results = {}
for config in inspection_configs:
value = execute_inspection(client, config)
if value:
results[config["name"]] = value
all_results[server['hostname']] = results
client.close()
if all_results:
filename = f"server_inspection_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"
generate_report(all_results, filename)
else:
print("未获取到任何服务器数据")
执行该方法后,会在当前目录下生成一个巡检表。内容如下所示:
总结
通过编写Shell
脚本,我们可以高效地进行服务器巡检,并生成标准化的巡检表,极大提升运维效率。同时,结合日志分析、异常告警、定时任务等功能,可构建更完善的自动化运维体系,减少手工操作带来的负担。
推荐阅读
本文是原创文章,采用 CC BY-NC-ND 4.0 协议,完整转载请注明来自 攻城狮小林
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果