• 正文
  • 相关推荐
申请入驻 产业图谱

Python 实战 构建轻量级远程服务器批量控制平台

07/02 10:17
450
加入交流群
扫码加入
获取工程师必备礼包
参与热点资讯讨论

一、背景与目标

在企业或个人 DevOps 场景中,我们常常需要:

  • 同时向多台服务器下发命令(如同步脚本、重启服务)

  • 实时查看每台服务器的执行结果

  • 支持主机分组(按环境或角色划分)

  • 不借助 Ansible 等大型工具,保持轻量灵活

本项目使用纯 Python 实现一个“迷你 SSH 命令控制台”,并具备以下能力:

  • 支持多台主机批量命令下发

  • 支持主机配置、分组、备注

  • 支持并发执行命令与结果汇总

  • 支持失败重连、错误提示、执行日志保存

二、项目结构设计

bash
ssh_center/
├── main.py # 主入口,控制台界面
├── config.yaml # 主机配置文件(YAML 格式)
├── ssh_client.py # SSH 客户端封装
├── manager.py # 控制逻辑,广播命令
├── logs/
│ └── exec_log.txt # 执行日志记录

三、主机配置文件(config.yaml)

yaml
groups:
dev:
- name: dev1
host: 192.168.1.101
port: 22
user: root
password: password1
- name: dev2
host: 192.168.1.102
port: 22
user: root
password: password2
prod:
- name: prod1
host: 10.0.0.11
port: 22
user: admin
password: secret

四、SSH 客户端封装(ssh_client.py)

python

import paramiko

class SSHClient:
def __init__(self, host, port, user, password):
self.info = f"{user}@{host}:{port}"
self.client = paramiko.SSHClient()
self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
self.client.connect(host, port=port, username=user, password=password, timeout=5)

def exec_command(self, cmd):
stdin, stdout, stderr = self.client.exec_command(cmd)
out = stdout.read().decode()
err = stderr.read().decode()
return out.strip(), err.strip()

def close(self):
self.client.close()

五、批量执行管理器(manager.py)

python
import yaml
import threading
from ssh_client import SSHClient
from datetime import datetime
from pathlib import Path
def load_config(path="config.yaml"):
with open(path, 'r') as f:
return yaml.safe_load(f)

def save_log(entry):
Path("logs").mkdir(exist_ok=True)
with open("logs/exec_log.txt", "a") as f:
f.write(f"[{datetime.now()}] {entry}n")

def exec_on_host(info, cmd, results):
try:
ssh = SSHClient(info['host'], info['port'], info['user'], info['password'])
out, err = ssh.exec_command(cmd)
ssh.close()
msg = f"{info['name']} ({info['host']})n>>> {cmd}n{out or err}"
results.append(msg)
save_log(msg)
except Exception as e:
results.append(f"{info['name']} ERROR: {e}")
save_log(f"{info['name']} ERROR: {e}")

def broadcast_command(group_name, cmd):
config = load_config()
targets = config['groups'].get(group_name, [])
threads = []
results = []

for host in targets:
t = threading.Thread(target=exec_on_host, args=(host, cmd, results))
t.start()
threads.append(t)

for t in threads:
t.join()
return results

六、控制台交互入口(main.py)

python

from manager import broadcast_command, load_config

def main():
config = load_config()
print("远程主机分组:")
for group in config['groups']:
print(f" - {group}")

group = input("选择组名:")
if group not in config['groups']:
print("组名不存在")
return

while True:
cmd = input(f"[{group}] 请输入要执行的命令(输入 exit 退出):n> ")
if cmd.strip().lower() == "exit":
break
results = broadcast_command(group, cmd)
print("n=== 执行结果 ===")
for res in results:
print(res)
print("-" * 40)

if __name__ == "__main__":
main()

七、运行示例

bash

$ python main.py

远程主机分组:
- dev
- prod
选择组名:dev
[dev] 请输入要执行的命令(输入 exit 退出):
> uptime

=== 执行结果 ===
dev1 (192.168.1.101)
>>> uptime
14:23:12 up 3 days, 2 users, load average: 0.01, 0.05, 0.03
----------------------------------------
dev2 (192.168.1.102)
>>> uptime
14:23:13 up 1 day, 1 user, load average: 0.00, 0.01, 0.00
----------------------------------------

八、扩展方向建议

方向 描述
主机连接超时管理 添加每台主机的连接超时时间限制,避免阻塞
执行成功统计 成功 / 失败主机数量展示
支持密钥登录 可选择使用 ssh key 而非密码连接
命令别名支持 定义一批常用命令别名,如 restart_nginx
执行日志按日期归档 每天生成独立的日志文件
支持 YAML 外部任务定义 提前配置要运行的命令脚本集合

九、项目总结与价值

本项目实现了一个具备以下特性的“远程服务器批量控制平台”:

  • ✅ YAML 配置简洁可维护,支持分组管理

  • ✅ 使用标准库 + Paramiko,无需额外平台依赖

  • ✅ 线程并发处理任务,性能良好

  • ✅ 结构清晰,便于未来扩展为 GUI、Web 平台

可用于:

  • 小型企业内部服务器维护

  • 云主机 SSH 命令分发

  • 运维日常操作自动化

相关推荐