手工巡检需要逐台登录服务器,耗费大量时间和精力,同时容易因操作疏漏而遗漏关键指标,导致数据不统一、风险增大;而自动化巡检利用脚本批量采集数据,实现实时监控和标准化输出,大幅提升效率并降低人为错误,从而为系统稳定运行提供坚实保障。

为了解决这些问题,我们可以使用 自动化脚本 来批量巡检服务器,并自动生成巡检表。如下图所示:

获取本文的脚本可以关注公众号【攻城狮成长日记】,私信回复脚本链接即可获取。

自动化巡检的优势

相比手工巡检,使用脚本进行自动化巡检具有以下优势:

  • 批量执行:一次性检查所有服务器,提升巡检效率;
  • 减少人工干预:降低人为错误,提高数据准确性;
  • 标准化输出:巡检数据统一格式,方便存储和分析;
  • 可扩展性:脚本可根据需求扩展,支持更多巡检项。

编写批量巡检脚本

为了让脚本更灵活,适应更多情况,我们换了个新思路:让用户自己定义要检查的项目,并用正则表达式来提取结果,同时并定义展示的模板。数据类似如下:

# 定义巡检配置项
inspection_configs=[
    {
    "name": "内核版本",
    "command": "uname -r",  # 修正命令为获取内核版本的正确命令
    "regex_pattern": r"^(\d+\.\d+\.\d+-\d+-\w+)",  # 精确匹配
    "format_template": "内核版本: {0}"
    },
    ....   
]

[!info] 解析

  • command: 是指定要执行的命令,例如上述的uname -r
  • regex_pattern:是指通过正则匹配想要的结果。
  • format_template:是指在巡检表展示的数据。例如内核版本: {0},其中{0}会被填入匹配到的真实数据。

通过以下的ssh_connect函数进行远程SSH连接到服务,然后,再通过execute_inspection函数执行巡检命令。

def ssh_connect(server):
    """建立SSH连接"""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        client.connect(
            hostname=server["hostname"],
            port=server["port"],
            username=server["username"],
            password=server["password"]
        )
        return client
    except Exception as e:
        print(f"连接{server['hostname']}失败: {str(e)}")
        return None

def execute_inspection(client, config):
    """执行单个巡检项并解析结果"""
    stdin, stdout, stderr = client.exec_command(config["command"])
    output = stdout.read().decode()
    error = stderr.read().decode()
    
    if error:
        print(f"命令执行错误: {config['command']}\n{error}")
        return None
    
    #使用正则表达式提取数据
    match = re.search(config["regex_pattern"], output)
    if match:
        return match.groups()
    else:
        print(f"未匹配到数据: {config['name']}")
        return None

通过以下函数进行数据格式化,

def format_result(config, result):
    if "regex_groups" in config:
        params = dict(zip(config["regex_groups"], result))
        return config["format_template"].format(**params)
    else:
        return config["format_template"].format(*result)

通过generate_report函数,生成巡检报告。

def generate_report(data, filename):
    """生成合并IP单元格的巡检报告"""
    report_data = []
    for ip, inspections in data.items():
        ip_entry = {
            "IP": ip,
            "巡检项": [],
            "结果": []
        }
        for item_name, result in inspections.items():
            ip_entry["巡检项"].append(item_name)
            ip_entry["结果"].append(
                format_result(
                    next(c for c in inspection_configs if c["name"] == item_name),
                    result
                )
            )
        report_data.append(ip_entry)

    # 转换为二维表格(保留空值用于单元格合并)
    max_items = max(len(entry["巡检项"]) for entry in report_data)
    rows = []
    for entry in report_data:
        rows.append([entry["IP"], entry["巡检项"][0], entry["结果"][0]])
        for i in range(1, len(entry["巡检项"])):
            rows.append(["", entry["巡检项"][i], entry["结果"][i]])

    df = pd.DataFrame(rows, columns=["IP", "巡检项", "结果"])

    # 生成Excel文件
    with pd.ExcelWriter(filename, engine='xlsxwriter') as writer:
        df.to_excel(writer, index=False, sheet_name='巡检报告')
        
        workbook = writer.book
        worksheet = writer.sheets['巡检报告']
        
        # 设置列宽
        worksheet.set_column('A:A', 15)  # IP列
        worksheet.set_column('B:B', 25)  # 巡检项列
        worksheet.set_column('C:C', 45)  # 结果列
        
        # 标题格式
        header_format = workbook.add_format({
            'bold': True, 
            'bg_color': '#4CAF50',
            'font_color': 'white',
            'border': 1,
            'valign': 'vcenter'
        })
        for col_num, value in enumerate(df.columns.values):
            worksheet.write(0, col_num, value, header_format)

        # 数据格式
        data_format = workbook.add_format({
            'border': 1,
            'valign': 'top'
        })
        for row in range(1, len(df)+1):
            for col in range(3):
                worksheet.write(row, col, df.iat[row-1, col], data_format)

        # 合并IP单元格
        ip_col = 0
        current_ip = None
        merge_start = 1
        
        for row_num in range(1, len(df)+1):
            cell_value = df.iat[row_num-1, ip_col]
            if cell_value:
                if current_ip is not None and merge_start < row_num:
                    worksheet.merge_range(
                        merge_start, ip_col,
                        row_num-1, ip_col,
                        current_ip, 
                        workbook.add_format({
                            'valign': 'vcenter',
                            'border': 1
                        })
                    )
                current_ip = cell_value
                merge_start = row_num
                
        # 处理最后一个IP
        if current_ip is not None and merge_start <= len(df):
            worksheet.merge_range(
                merge_start, ip_col,
                len(df), ip_col,
                current_ip,
                workbook.add_format({
                    'valign': 'vcenter',
                    'border': 1
                })
            )

    print(f"报告已生成: {os.path.abspath(filename)}")

脚本演示

  • 通过定义需要巡检的设备,如下所示:
# 服务器配置列表
servers = [
    {
        "hostname": "192.168.31.100",
        "port": 22,
        "username": "root",
        "password": "password"  # 推荐使用密钥认证
    },
        {
        "hostname": "192.168.31.101",
        "port": 22,
        "username": "root",
        "password": "password"  # 推荐使用密钥认证
    },
        {
        "hostname": "192.168.31.102",
        "port": 22,
        "username": "root",
        "password": "password"  # 推荐使用密钥认证
    },
    # 添加更多服务器...
]
  • 然后,再定义巡检项目,内容如下:
# 定义巡检配置项
inspection_configs = [
    {
        "name": "内存使用",
        "command": "free -m",
        "regex_pattern": r"Mem:\s+(\d+)\s+(\d+)\s+(\d+)",
        "description": "获取内存总量、已用、空闲(单位:MB)",
        "format_template": "总量: {0} MB, 已用: {1} MB, 空闲: {2} MB"  
    },
    {
    "name": "内核版本",
    "command": "uname -r", 
    "regex_pattern": r"^(\d+\.\d+\.\d+-\d+-\w+)",  
    "format_template": "内核版本: {0}"
    },
    {
        "name": "磁盘使用率",
        "command": "df -h",
        "regex_pattern": r"(\d+)%\s+/(?!.*snap)",
        "description": "获取根分区使用率",
        "format_template": "使用率: {0}%"
    },
    {
        "name": "CPU核心数",
        "command": "lscpu",
        "regex_pattern": r"CPU\(s\):\s+(\d+)",
        "description": "获取CPU核心数量",
        "format_template": "CPU核心数: {0}%"
    },
    {
        "name": "系统负载",
        "command": "uptime",
        "regex_pattern": r"load average:\s+([\d.]+),\s+([\d.]+),\s+([\d.]+)",
        "description": "获取1分钟、5分钟、15分钟系统负载",
        "format_template": "1分钟: {0} , 5分钟: {1} , 15分钟: {2} " 
    }
]
  • 定义主函数方法,如下:
def main():
    all_results = {}
    
    for server in servers:
        client = ssh_connect(server)
        if client:
            print(f"正在检查 {server['hostname']}...")
            results = {}
            for config in inspection_configs:
                value = execute_inspection(client, config)
                if value:
                    results[config["name"]] = value
            all_results[server['hostname']] = results
            client.close()
    
    if all_results:
        filename = f"server_inspection_{datetime.now().strftime('%Y%m%d_%H%M')}.xlsx"
        generate_report(all_results, filename)
    else:
        print("未获取到任何服务器数据")

执行该方法后,会在当前目录下生成一个巡检表。内容如下所示:

总结

通过编写Shell脚本,我们可以高效地进行服务器巡检,并生成标准化的巡检表,极大提升运维效率。同时,结合日志分析、异常告警、定时任务等功能,可构建更完善的自动化运维体系,减少手工操作带来的负担。

推荐阅读