TA的每日心情 | 慵懒 2024-9-26 17:55 |
---|
签到天数: 2 天 连续签到: 1 天 [LV.1]测试小兵
|
python的第三方库paramiko可以极大地方便我们实现ssh的各种交互操作,网上这个库的教程也很多,但通常都是说明如何批量登录或者命令处理,很少涉及到多层跳转,碰到的大多数也是要求借用openssh服务的代理机制来实现,对于环境的要求过高。其实我们可以自己编写对应的代码来实现这一功能。- class SSHclient:
- def __init__(self):
- #self.log=paramiko.util.log_to_file(logfile,level=logging.WARNING)
- self.ssh=paramiko.SSHClient()
- self.channel=None
- self.ssh.load_system_host_keys()
- self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.sftp=None
-
- def login(self,ip:str,port:int,user:str,pwd:str)->bool:
- try:
- self.ssh.connect(hostname=ip,port=port,username=user,password=pwd)
- self.channel=self.ssh.invoke_shell()#为保证多层跳转,需要这里开窗口
- return True
- except Exception as e:
- print('%s连接出错%s'%(ip,e))
- return False
-
- def cmd(self,cmd:str,timeout:int=5)->str:
- # stdin,stdout,stderr=self.ssh.exec_command(cmd,timeout=timeout)
- # return stdout.read().decode('utf8','ignore'),stderr.read().decode('utf8','ignore')
- try:#与上面等价,只不过可以更加灵活处理
- #self.channel=self.ssh.invoke_shell()
- self.channel.sendall(cmd)
- data=''
- except Exception as e:
- print(e, flush=True)
- try:
- datatmp = ''
- time.sleep(timeout)
- datatmp = self.channel.recv(49600).decode('utf-8', 'ignore')
- data += datatmp
- return data
- except Exception as ex:
- print(ex, flush=True)
- return ''
复制代码 通常教程中都会写stdin,stdout,stderr=self.ssh.exec_command(cmd,timeout=timeout); return stdout.read().decode('utf8','ignore'),stderr.read().decode('utf8','ignore');这两行代码,但这两行要求所有的命令在新开的进程中执行,执行完毕后就会关掉子进程,无法实现进一步的跳转交互。此时用self.channel=self.ssh.invoke_shell()开窗口,self.channel.sendall(cmd)发送内部命令,就可以将本层ssh当成透明的来传输。后续再通过以下代码就可以进行多层登录了- def g_login(ip,port=22,user='root',pwd='root'):
- conn = SSHclient() if port==22 else TelnetClient()
- try:
- if conn.login(ip,port,user,pwd):
- print('login %s success'%ip)
- yield conn
- except Exception as e:
- print('login %s fail:%s'%(ip,e))
- finally:
- conn.logout()
- print('logout %s'%ip)
复制代码
|
|