51Testing软件测试论坛

标题: python实现ssh多层登录跳转 [打印本页]

作者: ufoofuufoofu    时间: 7 小时前
标题: python实现ssh多层登录跳转
    python的第三方库paramiko可以极大地方便我们实现ssh的各种交互操作,网上这个库的教程也很多,但通常都是说明如何批量登录或者命令处理,很少涉及到多层跳转,碰到的大多数也是要求借用openssh服务的代理机制来实现,对于环境的要求过高。其实我们可以自己编写对应的代码来实现这一功能。
  1. class SSHclient:
  2.     def __init__(self):
  3.         #self.log=paramiko.util.log_to_file(logfile,level=logging.WARNING)
  4.         self.ssh=paramiko.SSHClient()
  5.         self.channel=None
  6.         self.ssh.load_system_host_keys()
  7.         self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  8.         self.sftp=None
  9.    
  10.     def login(self,ip:str,port:int,user:str,pwd:str)->bool:
  11.         try:
  12.             self.ssh.connect(hostname=ip,port=port,username=user,password=pwd)           
  13.             self.channel=self.ssh.invoke_shell()#为保证多层跳转,需要这里开窗口
  14.             return True
  15.         except Exception as e:
  16.             print('%s连接出错%s'%(ip,e))
  17.             return False
  18.             
  19.     def cmd(self,cmd:str,timeout:int=5)->str:
  20.         # stdin,stdout,stderr=self.ssh.exec_command(cmd,timeout=timeout)
  21.         # return stdout.read().decode('utf8','ignore'),stderr.read().decode('utf8','ignore')
  22.         try:#与上面等价,只不过可以更加灵活处理
  23.             #self.channel=self.ssh.invoke_shell()
  24.             self.channel.sendall(cmd)
  25.             data=''
  26.         except Exception as e:
  27.             print(e, flush=True)
  28.         try:
  29.             datatmp = ''
  30.             time.sleep(timeout)
  31.             datatmp = self.channel.recv(49600).decode('utf-8', 'ignore')
  32.             data += datatmp
  33.             return data
  34.         except Exception as ex:
  35.             print(ex, flush=True)
  36.             return ''
复制代码
   通常教程中都会写stdin,stdout,stderr=self.ssh.exec_command(cmd,timeout=timeout); return stdout.read().decode('utf8','ignore'),stderr.read().decode('utf8','ignore');这两行代码,但这两行要求所有的命令在新开的进程中执行,执行完毕后就会关掉子进程,无法实现进一步的跳转交互。此时用self.channel=self.ssh.invoke_shell()开窗口,self.channel.sendall(cmd)发送内部命令,就可以将本层ssh当成透明的来传输。后续再通过以下代码就可以进行多层登录了
  1. def g_login(ip,port=22,user='root',pwd='root'):
  2.     conn = SSHclient() if port==22 else TelnetClient()
  3.     try:
  4.     if conn.login(ip,port,user,pwd):
  5.         print('login %s success'%ip)           
  6.         yield conn
  7.     except Exception as e:
  8.         print('login %s fail:%s'%(ip,e))
  9.     finally:
  10.         conn.logout()
  11.         print('logout %s'%ip)
复制代码







欢迎光临 51Testing软件测试论坛 (http://bbs.51testing.com/) Powered by Discuz! X3.2