本次实验将使用Django 3.0 配合 dwebsocket websocket组件,实现一个网页版的SSH命令行工具,其支持 vim , 支持 ping等交互命令,唯一的一个小缺点是略卡,不知道是我电脑问题还是这个socket框架不稳定呢,如果做项目建议不要手撸代码,其实已经有非常好的解决方案了 https://github.com/huashengdun/webssh
基本用法
命令执行
from django.shortcuts import render,HttpResponse
import subprocess,json
def term(request):
if request.method == "POST":
data = request.body.decode("utf-8")
if data == "ok":
proc = subprocess.Popen("ipconfig",stdout=subprocess.PIPE,shell=True)
cc = str(proc.stdout.readlines())
return HttpResponse(json.dumps({"cmd":cc}))
return render(request, "index.html")
光标问题已经解决了,找了很久的, *** 蛋啊。
from django.shortcuts import render,HttpResponse
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,cmd):
ssh.connect("192.168.1.20",port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
if not result:
result=stderr.read()
ssh.close()
return result.decode()
def term(request):
if request.method == "POST":
data = request.body.decode("utf-8")
if data == "ok":
a = ssh_cmd("root","123","22","ifconfig")
return HttpResponse(a)
return render(request, "index.html")
来我们继续,加上边框,进行定制。
{% extends "admin/base_site.html" %}
{% load i18n static %}
{% block content %}
批量命令执行CMD工具
{% endblock %}
from django.shortcuts import render,HttpResponse
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_shell(address,username,password,port,command):
ssh.connect(address,port=port,username=username,password=password)
stdin, stdout, stderr = ssh.exec_command(command)
result = stdout.read()
if not result:
result=stderr.read()
ssh.close()
return result.decode()
def term(request):
if request.method == "POST":
data = request.body.decode("utf-8")
if data == "ok":
a = ssh_shell("192.168.1.20","root","123","22","yum")
print(a)
return HttpResponse(a)
return render(request, "index.html")
继续改造啊,先玩着,后期上websocket.
views.py
from django.shortcuts import render,HttpResponse
import paramiko,json,time
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_shell(address,username,password,port,command):
try:
ssh.connect(address,port=port,username=username,password=password)
stdin, stdout, stderr = ssh.exec_command(command)
result = stdout.read()
if not result:
result=stderr.read()
ssh.close()
return result.decode()
except Exception:
ssh.close()
def term(request):
if request.method == "POST":
data = request.body.decode("utf-8")
json_data = json.loads(data)
address = json_data.get("address")
command = json_data.get("command")
if len(address) >=2 and len(command) >=2:
ret = ssh_shell(address,"root","123","22",command)
if ret !=None:
times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
times = "---> \x1B[1;3;32m 执行时间: [ {} ] \x1B[0m".format(times)
address = "\x1B[1;3;33m 主机地址: [ {} ] \x1B[0m".format(address)
command = "\x1B[1;3;35m 执行命令: [ {} ] \x1B[0m".format(command)
retn = times + address + command + "\x1B[1;3;25m 回执: [ok] \x1B[0m"
return HttpResponse(retn)
else:
times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
times = "---> \x1B[1;3;32m 执行时间: [ {} ] \x1B[0m".format(times)
address = "\x1B[1;3;33m 主机地址: [ {} ] \x1B[0m".format(address)
command = "\x1B[1;3;35m 执行命令: [ {} ] \x1B[0m".format(command)
retn = times + address + command + "\x1B[1;3;20m 回执: [Error] \x1B[0m"
return HttpResponse(retn)
else:
return HttpResponse("主机地址或命令行不能为空...")
return render(request, "index.html")
{% extends "admin/base_site.html" %}
{% load i18n static %}
{% block content %}
批量命令执行CMD工具
{% endblock %}
from MyWeb import views
urlpatterns = [
path('admin/', admin.site.urls),
path('term/',views.term)
]
Dwebsocket
网上找到一个老版本的,改造了一下 可以支持 django 3.0 。
index.html
django-websocket
Received Messages
views.py
from django.shortcuts import render,HttpResponse
from dwebsocket.decorators import accept_websocket,require_websocket
def index(request):
return render(request,"index.html")
@accept_websocket
def echo(request):
if not request.is_websocket():#判断是不是websocket连接
try:#如果是普通的http方法
message = request.GET['message']
return HttpResponse(message)
except:
return render(request,'index.html')
else:
for message in request.websocket:
request.websocket.send(message)#发送消息到客户端
from MyWeb import views
urlpatterns = [
path('', views.index),
path('echo/', views.echo),
]
页面加载后自动连接
from django.shortcuts import render,HttpResponse
from dwebsocket.decorators import accept_websocket
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd(user,passwd,port,cmd):
ssh.connect("192.168.1.20",port=port,username=user,password=passwd)
cmd=cmd
stdin, stdout, stderr = ssh.exec_command(cmd)
result = stdout.read()
if not result:
result=stderr.read()
ssh.close()
return result.decode()
def index(request):
return render(request,"index.html")
@accept_websocket
def echo(request):
if not request.is_websocket():#判断是不是websocket连接
try:
message = request.GET['message']
a = ssh_cmd("root","123","22","ls -lh")
print(a)
return HttpResponse(a)
except:
return render(request,'index.html')
else:
for message in request.websocket:
request.websocket.send(message)#发送消息到客户端
继续尝试,ssh隧道怎末开?
from django.shortcuts import render,HttpResponse
from dwebsocket.decorators import accept_websocket
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd():
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect("192.168.1.20", username="root", password="123")
ssh_session = client.get_transport().open_session()
return ssh_session
@accept_websocket
def echo(request):
sessions = ssh_cmd()
if not request.is_websocket():
return render(request,'index.html')
else:
for message in request.websocket:
sessions.exec_command(message)
ret = sessions.recv(2048)
request.websocket.send(ret)
简单的隧道。
import paramiko
tran = paramiko.Transport(('192.168.1.20', 22,))
tran.start_client()
tran.auth_password('root', '123')
chan = tran.open_session()
chan.get_pty()
chan.invoke_shell()
while True:
a = chan.recv(1024)
print(a)
chan.send('ls -lh\r')
from django.shortcuts import render,HttpResponse
from dwebsocket.decorators import accept_websocket
import paramiko
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_cmd():
tran = paramiko.Transport(('192.168.1.20', 22,))
tran.start_client()
tran.auth_password('root', '123')
chan = tran.open_session()
chan.get_pty()
chan.invoke_shell()
return chan
@accept_websocket
def echo(request):
sessions = ssh_cmd()
if not request.is_websocket():
return render(request,'index.html')
else:
for message in request.websocket:
while True:
ret = sessions.recv(2048)
request.websocket.send(ret)
完成了 webssh 代码还不太稳定,完善后放出来。
代码,其实比想象中的简单得多,我尝试了很久。
from django.shortcuts import render,HttpResponse
from dwebsocket.decorators import accept_websocket
import paramiko
def ssh_cmd():
tran = paramiko.Transport(('192.168.1.20', 22,))
tran.start_client()
tran.auth_password('root', '123')
chan = tran.open_session()
chan.get_pty(height=492,width=1312)
chan.invoke_shell()
return chan
sessions = ssh_cmd()
@accept_websocket
def echo(request):
if not request.is_websocket():
return render(request,'index.html')
else:
try:
for message in request.websocket:
ret = sessions.recv(10240)
request.websocket.send(ret)
sessions.send(message)
except Exception:
print("error")
最后加一个在线编辑器代码。
dwebsocket 最终代码: 通过使用dwebsocket插件实现一个简单的WebSSH命令行工具.
# name: views.py
from django.shortcuts import render,HttpResponse
from dwebsocket.decorators import accept_websocket
import paramiko
def ssh_cmd():
tran = paramiko.Transport(('192.168.1.20', 22,))
tran.start_client()
tran.auth_password('root', '123')
chan = tran.open_session()
chan.get_pty(height=492,width=1312)
chan.invoke_shell()
return chan
sessions = ssh_cmd()
@accept_websocket
def echo(request):
if not request.is_websocket():
return render(request,'index.html')
else:
try:
for message in request.websocket:
ret = sessions.recv(4096)
request.websocket.send(ret)
sessions.send(message)
except Exception:
print("error")
from MyWeb import views
urlpatterns = [
path('admin/', admin.site.urls),
path('echo/',views.echo)
]
批量CMD执行工具: 利用DjangoAdmin与Socket通信实现的主机批量执行并回显.
{% extends "admin/base_site.html" %}
{% load i18n static %}
{% block content %}
批量命令执行CMD工具
{% endblock %}
# name:views.py
from django.shortcuts import render
from dwebsocket.decorators import accept_websocket
import paramiko,time
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
def ssh_shell(address,username,password,port,command):
try:
ssh.connect(address,port=port,username=username,password=password,timeout=1)
stdin, stdout, stderr = ssh.exec_command(command)
result = stdout.read()
if not result:
result=stderr.read()
ssh.close()
return result.decode()
except Exception:
return 0
def CalculationIP(Addr_Count):
ret = []
try:
IP_Start = str(Addr_Count.split("-")[0]).split(".")
IP_Heads = str(IP_Start[0] + "." + IP_Start[1] + "." + IP_Start[2] +".")
IP_Start_Range = int(Addr_Count.split(".")[3].split("-")[0])
IP_End_Range = int(Addr_Count.split("-")[1])
for item in range(IP_Start_Range,IP_End_Range+1):
ret.append(IP_Heads+str(item))
return ret
except Exception:
return 0
@accept_websocket
def echo(request):
if not request.is_websocket():
return render(request, "index.html")
else:
for message in request.websocket:
data = eval(message)
Addr_list = CalculationIP(data['address'])
command = data['command']
for ip in Addr_list:
ret = ssh_shell(ip,"root","123","22",command)
if ret != 0:
times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retn = "[Suces] ---> " + str(times) + " " + command + " " + ip
request.websocket.send(retn)
else:
times = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
retn = "[Error] ---> " + str(times) + " " + command + " " + ip
request.websocket.send(retn)
# name:urls.py
from MyWeb import views
urlpatterns = [
path('admin/', admin.site.urls),
path("echo/",views.echo)
]
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)