在自动化运维领域,Ansible是一款非常强大的工具,它可以帮助我们管理和配置大量的服务器。为了让Ansible能够有效地管理这些服务器,我们需要一个hosts清单文件,该文件定义了Ansible要管理的目标主机。在实际应用中,我们可能会有一个包含大量服务器设备台账信息的文件,例如ip.txt,我们希望将这些IP地址转换为Ansible hosts清单的格式。本文将介绍如何使用Python脚本来实现这一目标。

场景描述

假设我们有一个ip.txt文件,其中包含了一系列需要管理的服务器IP地址。这些IP地址可能是分散的,也可能是连续的。我们希望将这些IP地址转换为Ansible hosts清单的格式,以便Ansible能够识别和管理这些服务器。具体来说,我们希望实现以下功能:

  • 读取ip.txt文件中的IP地址。

  • 对IP地址进行排序。

  • 将连续的IP地址范围合并为一个范围表示,例如192.168.1.[100:105]

  • 生成Ansible hosts清单文件,其中包含每个主机的IP地址或IP地址范围,以及相应的连接信息,如用户名、密码和端口号。

实现步骤

读取IP地址

首先,我们需要编写一个Python脚本来读取ip.txt文件中的IP地址。我们可以使用Python的内置函数open()来打开文件,并使用readlines()方法读取文件的每一行。以下是读取IP地址的代码示例:

def read_data_from_csv(file_path):
    data = []
    with open(file_path, 'r') as file:
        reader = csv.DictReader(file)
        for row in reader:
            data.append(row)
    return data

对IP地址进行排序

读取IP地址后,我们需要对它们进行排序。Python的内置函数sorted()可以很方便地对列表进行排序。以下是对IP地址进行排序的代码示例:

def sort_ips_in_group(grouped_data):
    for subnet, rows in grouped_data.items():
        grouped_data[subnet] = sorted(rows, key=lambda x: ip_to_int(x['IP']))
    return grouped_data

def ip_to_int(ip):
    parts = list(map(int, ip.split('.')))
    return (parts[0] << 24) + (parts[1] << 16) + (parts[2] << 8) + parts[3]

合并连续的IP地址范围

为了将连续的IP地址范围合并为一个范围表示,我们可以遍历排序后的IP地址列表,并检查每个IP地址是否与前一个IP地址连续。如果连续,则将它们合并为一个范围。以下是合并连续IP地址范围的代码示例:

def merge_ip_ranges(sorted_ips):
    merged_ranges = []
    current_range_start = None
    current_range_end = None

    for ip in sorted_ips:
        if current_range_start is None:
            current_range_start = ip
            current_range_end = ip
        elif int(ip.split('.')[-1]) == int(current_range_end.split('.')[-1]) + 1:
            current_range_end = ip
        else:
            if current_range_start == current_range_end:
                merged_ranges.append(f"{current_range_start}")
            else:
                merged_ranges.append(f"{current_range_start.split('.')[0]}.{current_range_start.split('.')[1]}.{current_range_start.split('.')[2]}.[{current_range_start.split('.')[-1]}:{current_range_end.split('.')[-1]}]")
            current_range_start = ip
            current_range_end = ip

    # 处理最后一个范围
    if current_range_start is not None:
        if current_range_start == current_range_end:
            merged_ranges.append(f"{current_range_start}")
        else:
            merged_ranges.append(f"{current_range_start.split('.')[0]}.{current_range_start.split('.')[1]}.{current_range_start.split('.')[2]}.[{current_range_start.split('.')[-1]}:{current_range_end.split('.')[-1]}]")

    return merged_ranges

生成Ansible hosts清单

为了简化IP地址管理,我们可以将连续的IP地址范围合并成一个更大的范围。具体来说,我们可以通过遍历已经排序的IP地址列表,并逐一检查每个IP地址是否与前一个地址连续。如果发现它们是连续的,我们就把它们合并在一起。这样做的好处是,不仅便于后续的管理和使用,还能让我们以每个IP段作为组名进行组织。最后,我们将这些合并后的IP地址范围转换成Ansible hosts清单的格式。Ansible hosts清单文件一般包括主机组、主机名或IP地址以及相关的连接信息等。下面是一个生成Ansible hosts清单的代码示例:

def generate_ansible_hosts(grouped_data):
    ansible_hosts = []
    added_ips = set()  # 使用集合来存储已经添加的IP地址

    def add_ip_range(start, end):
        start_str = ip_to_str(start)
        end_str = ip_to_str(end)
        start_parts = start_str.split('.')
        end_parts = end_str.split('.')
        
        if start_parts[:3] == end_parts[:3]:
            if start == end:
                # 如果 start 和 end 相同,直接添加单个 IP 地址
                ansible_hosts.append(start_str)
            else:
                # 合并相同的前三个部分
                ansible_hosts.append(f"{start_parts[0]}.{start_parts[1]}.{start_parts[2]}.[{start_parts[3]}:{end_parts[3]}]")
        else:
            ansible_hosts.append(f"{start_str}")

    for subnet, rows in grouped_data.items():
        if not rows:
            continue  # 跳过空的子网

        ansible_hosts.append(f"[{subnet}]")
        current_range_start = None
        current_range_end = None

        for row in rows:
            try:
                ip = row['IP']
                ip_int = ip_to_int(ip)

                if ip_int not in added_ips:  # 检查IP地址是否已经添加过
                    if current_range_start is None:
                        current_range_start = ip_int
                        current_range_end = ip_int
                    elif ip_int == current_range_end + 1:
                        current_range_end = ip_int
                    else:
                        add_ip_range(current_range_start, current_range_end)
                        current_range_start = ip_int
                        current_range_end = ip_int
                    added_ips.add(ip_int)  # 将IP地址添加到集合中
                elif current_range_start is None or not (current_range_start <= ip_int <= current_range_end):
                    ansible_hosts.append(ip)

            except KeyError as e:
                print(f"Missing key in row data: {e}")
                continue

        if current_range_start is not None:
            add_ip_range(current_range_start, current_range_end)

        ansible_hosts.append(f"[{subnet}:vars]")
        try:
            ansible_hosts.append(f"ansible_ssh_user={rows[0]['Username']}")
            ansible_hosts.append(f"ansible_ssh_pass={rows[0]['Password']}")
            ansible_hosts.append(f"ansible_ssh_port={rows[0]['Port']}")
        except (KeyError, IndexError) as e:
            print(f"Error accessing vars for subnet {subnet}: {e}")
            continue

        ansible_hosts.append("")

    return ansible_hosts

脚本使用演示

首先,通过一个脚本生成一下测试IP数据,脚本如下:


import random

def generate_random_ip(subnet):
    """生成一个随机的IP地址"""
    parts = subnet.split('.')
    if len(parts) < 4:
        parts.extend(['0'] * (4 - len(parts)))
    parts[3] = str(random.randint(1, 254))
    return '.'.join(parts)

def generate_test_data(file_path, num_records):
    """生成测试数据并写入CSV文件"""
    with open(file_path, 'w') as file:
        file.write("IP,Port,Username,Password\n")
        for _ in range(num_records):
            subnet = random.choice(['192.168.31', '192.168.32', '192.168.33', '192.168.34'])
            ip = generate_random_ip(subnet)
            port = 10022
            if subnet == "192.168.31":
                username = "root"
                password = "pass@123"
            elif subnet == "192.168.33":
                username = "root"
                password = "P@ssw0rd"
            else:
                username = "admin"
                password = "password"
            file.write(f"{ip},{port},{username},{password}\n")

# 示例文件路径
file_path = 'test_data.csv'
num_records = 800

# 生成测试数据并写入文件
generate_test_data(file_path, num_records)

执行上述脚本后,会在当前目录生成一个IP数据,这些IP数据,有连续的和不连续的。如下图所示:

然后,通过执行generate_ansible_hosts.py即可生成如下图的结果:

总结

通过以上步骤,我们成功地使用Python脚本将ip.txt文件转换为Ansible hosts清单。这个脚本可以帮助我们自动化生成Ansible hosts清单,提高运维效率。在实际应用中,我们可以根据需要对脚本进行进一步的扩展和优化,例如支持更多的连接信息、处理不同格式的IP地址等。希望本文对你有所帮助,如果你有任何问题或建议,欢迎留言讨论。

推荐阅读