forked from bin862324915/serv00-automation
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun.py
84 lines (75 loc) · 2.88 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import paramiko
import requests
import json
from datetime import datetime, timezone, timedelta
def ssh_multiple_connections(hosts_info, command):
users = []
hostnames = []
for host_info in hosts_info:
hostname = host_info['hostname']
username = host_info['username']
password = host_info['password']
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname=hostname, port=22, username=username, password=password)
stdin, stdout, stderr = ssh.exec_command(command)
user = stdout.read().decode().strip()
users.append(user)
hostnames.append(hostname)
ssh.close()
except Exception as e:
print(f"用户:{username},连接 {hostname} 时出错: {str(e)}")
return users, hostnames
ssh_info_str = os.getenv('SSH_INFO', '[]')
hosts_info = json.loads(ssh_info_str)
command = 'whoami'
user_list, hostname_list = ssh_multiple_connections(hosts_info, command)
user_num = len(user_list)
content = "SSH服务器登录信息:\n"
for user, hostname in zip(user_list, hostname_list):
content += f"用户名:{user},服务器:{hostname}\n"
beijing_timezone = timezone(timedelta(hours=8))
time = datetime.now(beijing_timezone).strftime('%Y-%m-%d %H:%M:%S')
menu = requests.get('https://api.zzzwb.com/v1?get=tg').json()
loginip = requests.get('https://api.ipify.org?format=json').json()['ip']
content += f"本次登录用户共: {user_num} 个\n登录时间:{time}\n登录IP:{loginip}"
push = os.getenv('PUSH')
def mail_push(url):
data = {
"body": content,
"email": os.getenv('MAIL')
}
response = requests.post(url, json=data)
try:
response_data = json.loads(response.text)
if response_data['code'] == 200:
print("推送成功")
else:
print(f"推送失败,错误代码:{response_data['code']}")
except json.JSONDecodeError:
print("连接邮箱服务器失败了")
def telegram_push(message):
url = f"https://api.telegram.org/bot{os.getenv('TELEGRAM_BOT_TOKEN')}/sendMessage"
payload = {
'chat_id': os.getenv('TELEGRAM_CHAT_ID'),
'text': message,
'parse_mode': 'HTML',
'reply_markup': json.dumps({
"inline_keyboard": menu,
"one_time_keyboard": True
})
}
headers = {
'Content-Type': 'application/json'
}
response = requests.post(url, json=payload, headers=headers)
if response.status_code != 200:
print(f"发送消息到Telegram失败: {response.text}")
if push == "mail":
mail_push('https://zzzwb.us.kg/test')
elif push == "telegram":
telegram_push(content)
else:
print("推送失败,推送参数设置错误")