每个公司的网络环境大都划分 办公网络、线上网络,之所以划分的主要原因是为了保证线上 *** 作安全;
对于外部用户而言也只能访问线上网络的特定开放端口,那么是什么控制了用户访问线上网络的呢?
防火墙过滤......!
对于内部员工而言对线上系统日常运维、代码部署如何安全访问线上业务系统呢?如何监控、记录技术人员的 *** 作记录?
堡垒机策略:
1.回收所有远程登录linux主机的用户名、密码;
2.中间设置堡垒机(保存所有线上linux主机的用户名、密码);
3.所有技术人员都要通过堡垒机去获取用户名、密码,然后在再去连接 线上系统,并记录 *** 作日志;
堡垒机策略优点:
1.记录用户 *** 作;
2.实现远程 *** 作权限集中管理;
一、堡垒机表结构设计
@H_404_57@
from django.db import modelsfrom django.contrib.auth.models import User# Create your models here.class IDC(models.Model): name = models.CharFIEld(max_length=64,unique=True) def __str__(self): return self.nameclass Host(models.Model): """存储所有主机信息""" hostname = models.CharFIEld(max_length=64,unique=True) ip_addr = models.GenericIPAddressFIEld(unique=True) port = models.IntegerFIEld(default=22) IDc = models.ForeignKey("IDC") #host_groups = models.ManyToManyFIEld("HostGroup") #host_users = models.ManyToManyFIEld("HostUser") enabled = models.BooleanFIEld(default=True) def __str__(self): return "%s-%s" %(self.hostname,self.ip_addr)class HostGroup(models.Model): """主机组""" name = models.CharFIEld(max_length=64,unique=True) host_user_binds = models.ManyToManyFIEld("HostUserBind") def __str__(self): return self.nameclass HostUser(models.Model): """存储远程主机的用户信息 root 123 root abc root sfsfs """ auth_type_choices = ((0,'ssh-password'),(1,'ssh-key')) auth_type = models.SmallintegerFIEld(choices=auth_type_choices) username = models.CharFIEld(max_length=32) password = models.CharFIEld(blank=True,null=True,max_length=128) def __str__(self): return "%s-%s-%s" %(self.get_auth_type_display(),self.username,self.password) class Meta: unique_together = ('username','password')class HostUserBind(models.Model): """绑定主机和用户""" host = models.ForeignKey("Host") host_user = models.ForeignKey("HostUser") def __str__(self): return "%s-%s" %(self.host,self.host_user) class Meta: unique_together = ('host','host_user')class SessionLog(models.Model): ''' 记录每个用户登录 *** 作,ID传给 shell生成文件命名 ''' account=models.ForeignKey('Account') host_user_bind=models.ForeignKey('HostUserBind') start_date=models.DateFIEld(auto_Now_add=True) end_date=models.DateFIEld(blank=True,null=True) def __str__(self): return '%s-%s'%(self.account,self.host_user_bind)class AuditLog(models.Model): """审计日志"""class Account(models.Model): """堡垒机账户 1. 扩展 2. 继承 user.account.host_user_bind """ user = models.OnetoOneFIEld(User) name = models.CharFIEld(max_length=64) host_user_binds = models.ManyToManyFIEld("HostUserBind",blank=True) host_groups = models.ManyToManyFIEld("HostGroup",blank=True)models.py
二、通过堡垒机远程登录linux主机
2种堡垒机登录方式:
命令行登录堡垒机方式:
方式1:通过 修改open_shh源码扩展-Z option生成唯一 ssh进程,使用linux的strace 命令对唯一 ssh进程进行检测生成日志文件;
0.用户执行audit_shell出现交互界面,提示用户输入机组和主机;
import sys,os,djangoos.environ.setdefault("DJANGO_SETTINGS_MODulE","zhanggen_audit.settings")django.setup() #在Django视图之外,调用Django功能设置环境变量!from audit.backend import user_interactiveif __name__ == '__main__': shell_obj=user_interactive.UserShell(sys.argv) shell_obj.start()audit_shell.py
from django.contrib.auth import authenticateclass UserShell(object): '''用户登录堡垒机,启动自定制shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 代表认证失败,返回用户对象认证成功! if not user: count+=1 print('无效的用户名或者,密码!') else: self.user=user return True else: print('输入次数超过3次!') def start(self): """启动交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s.t%s[%s]" % (index, group, group.host_user_binds.count())) print("%s.t未分组机器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_List = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_List = selected_group.host_user_binds.all() elif choice == len(host_groups): # 选择的未分组机器 # selected_group = self.user.account.host_user_binds.all() host_bind_List = self.user.account.host_user_binds.all() if host_bind_List: while True: for index, host in enumerate(host_bind_List): print("%s.t%s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_List): selected_host = host_bind_List[choice2] print("selected host", selected_host) elif choice2 == 'b': breakuser_interactive.py
知识点:
在Django视图之外,调用Django功能设置环境变量!(切记放在和Django manage.py 同级目录);
import sys,os,djangoos.environ.setdefault("DJANGO_SETTINGS_MODulE","zhanggen_audit.settings")
注意:在Django启动时会自动加载一些 文件,比如每个app中admin.py,不能在这些文件里面设置加载环境变量,因为已经加载完了,如果违反这个规则会导致Django程序启动失败;
1.实现ssh用户指令检测
1.0 修改open_shh源码,扩展 ssh -Z 唯一标识符;(这样每次ssh远程登录,都可以利用唯一标识符,分辨出 每个ssh会话进程;)
修改OpenSsh下的ssh.c文件的608和609行、935行增加; while ((opt = getopt(ac, av, "1246ab:c:e:fgi:kl:m:no:p:qstvxz:" "ACD:E:F:GI:J:KL:MNO:PQ:R:S:TVw:W:XYyZ:")) != -1) { case 'Z': break;ssh.c
知识点:
OpenSSH 是 SSH (Secure SHell) 协议的免费开源实现项目。
1.1 修改openssh之后,编译、安装
chmod 755 configure./configure --prefix=/usr/local/opensshmakechmod 755 mkinstalldirsmake installsshpass -p xxxxxx123 /usr/local/openssh/bin/ssh root@172.17.10.112 -Z s1123ssssd212
1.2 每个ssh会话进程可以唯一标识之后,在堡垒机使用会话脚本shell脚本检测 ssh会话进程;(strace命令进行监控,并生产 log日志文件);
#!/usr/bin/bashfor i in $(seq 1 30);do echo $i process_ID=`ps -ef | grep | grep -v 'ession_check.sh' | grep -v grep | grep -v sshpass | awk '{print }'` echo "process: $process_ID" if [ ! -z "$process_ID" ];then echo 'start run strace.....' strace -fp $process_ID -t -o .log; break; fi sleep 5done;ssh 会话检测脚本
知识点:
strace 检测进程的IO调用,监控用户shell输入的命令字符;
strace -fp 60864 -o /ssh.log cat /ssh.log |grep 'write(8' rz -E #从xshell上传文件
sshpass无需提示输入密码登录
[root@localhost sshpass-1.06]# sshpass -p wsnb ssh root@172.16.22.1 -o StrictHostKeyChecking=no Last login: Tue Jul 10 16:39:53 2018 from 192.168.113.84[root@ecdb ~]#
python生成唯一标识符
s=string.ascii_lowercase+string.digitsrandom_tag=''.join(random.sample(s,10))
解决普通用户,无法执行 strace命令;
方式1:执行文件 +s权限
chmod u+s `which strace`
方式2:修改sudo配置文件,使普通用户sudo时无需输入密码!
修改sudo配置文件,防止修改出错,一定要切换到root用户;%普通用户 ALL=(ALL) nopASSWD: ALLwq! #退出vim /etc/sudoers
#!/usr/bin/python3# -*- Coding: utf-8 -*from django.contrib.auth import authenticateimport subprocess,string,randomfrom audit import modelsfrom django.conf import settingsclass UserShell(object): '''用户登录堡垒机,启动自定制shell ''' def __init__(self,sys_argv): self.sys_argv=sys_argv self.user=None def auth(self): count=0 while count < 3: username=input('username:').strip() password=input('password:').strip() user=authenticate(username=username,password=password) #none 代表认证失败,返回用户对象认证成功! if not user: count+=1 print('无效的用户名或者,密码!') else: self.user=user return True else: print('输入次数超过3次!') def start(self): """启动交互程序""" if self.auth(): # print(self.user.account.host_user_binds.all()) #select_related() while True: host_groups = self.user.account.host_groups.all() for index, group in enumerate(host_groups): print("%s.t%s[%s]" % (index, group, group.host_user_binds.count())) print("%s.t未分组机器[%s]" % (len(host_groups), self.user.account.host_user_binds.count())) choice = input("select group>:").strip() if choice.isdigit(): choice = int(choice) host_bind_List = None if choice >= 0 and choice < len(host_groups): selected_group = host_groups[choice] host_bind_List = selected_group.host_user_binds.all() elif choice == len(host_groups): # 选择的未分组机器 # selected_group = self.user.account.host_user_binds.all() host_bind_List = self.user.account.host_user_binds.all() if host_bind_List: while True: for index, host in enumerate(host_bind_List): print("%s.t%s" % (index, host,)) choice2 = input("select host>:").strip() if choice2.isdigit(): choice2 = int(choice2) if choice2 >= 0 and choice2 < len(host_bind_List): selected_host = host_bind_List[choice2] s = string.ascii_lowercase + string.digits random_tag = ''.join(random.sample(s, 10)) session_obj=models.SessionLog.objects.create(account=self.user.account,host_user_bind=selected_host) session_tracker_scipt='/bin/sh %s %s %s'%(settings.SESSION_TRACKER_SCRIPT,random_tag,session_obj.pk) session_tracker_process=subprocess.Popen(session_tracker_scipt,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE) cmd='sshpass -p %s /usr/local/openssh/bin/ssh %s@%s -p %s -o stricthostkeychecking=no -Z %s' % (selected_host.host_user.password, selected_host.host_user.username, selected_host.host.ip_addr, selected_host.host.port,random_tag) subprocess.run(cmd,shell=True)#开启子进程交互 print(session_tracker_process.stdout.readlines(), session_tracker_process.stderr.readlines()) elif choice2 == 'b': break汇总
2.shell远程登录程序检查日志文件,分析;
tab补全的命令,需要搜素write(5,该脚本实现思路,按键去尝试,循环多种条件判断;
import reclass AuditLogHandler(object): '''分析audit log日志''' def __init__(self,log_file): self.log_file_obj = self._get_file(log_file) def _get_file(self,log_file): return open(log_file) def parse(self): cmd_List = [] cmd_str = '' catch_write5_flag = False #for tab complication for line in self.log_file_obj: #print(line.split()) line = line.split() try: pID,time_clock,io_call,char = line[0:4] if io_call.startswith('read(4'): if char == '"7",':#回退 char = '[1<-del]' if char == '"OB",': #vim中下箭头 char = '[down 1]' if char == '"OA",': #vim中下箭头 char = '[up 1]' if char == '"OC",': #vim中右移 char = '[->1]' if char == '"OD",': #vim中左移 char = '[1<-]' if char == '"33[2;2R",': #进入vim模式 continue if char == '"[>1;95;0c",': # 进入vim模式 char = '[----enter vim mode-----]' if char == '"[A",': #命令行向上箭头 char = '[up 1]' catch_write5_flag = True #取到向上按键拿到的历史命令 if char == '"[B",': # 命令行向上箭头 char = '[down 1]' catch_write5_flag = True # 取到向下按键拿到的历史命令 if char == '"[C",': # 命令行向右移动1位 char = '[->1]' if char == '"[D",': # 命令行向左移动1位 char = '[1<-]' cmd_str += char.strip('"",') if char == '"\t",': catch_write5_flag = True continue if char == '"\r",': cmd_List.append([time_clock,cmd_str]) cmd_str = '' # 重置 if char == '"':#space cmd_str += ' ' if catch_write5_flag: #to catch tab completion if io_call.startswith('write(5'): if io_call == '"7",': #空键,不是空格,是回退不了就是这个键 pass else: cmd_str += char.strip('"",') catch_write5_flag = False except ValueError as e: print("总结
以上是内存溢出为你收集整理的批量执行(Linux命令,上传/下载文件)全部内容,希望文章能够帮你解决批量执行(Linux命令,上传/下载文件)所遇到的程序开发问题。
如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)