程式人生 > python利用paramiko實現ssh連線及遠端執行命令

python利用paramiko實現ssh連線及遠端執行命令

python3.6, windows下

1,安裝paramiko模組

依賴cryptography:使用命令easy_install安裝該模組時,如果報錯「...microsoft visual c++ 14.0 is required.」,可以先下載對應的.whl檔案,再使用命令pip install cryptography.whl進行安裝。

其他依賴模組bcrypt,pynacl類似安裝

2, 需求

a, 實現執行遠端命令,並列印其返回值

b,實現執行遠端指令碼,並列印其返回值

#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import paramiko
from functools import wraps
from datetime import datetime


def timethis(func):
    """
    Decorator that reports how long each call to ``func`` takes.

    After the wrapped callable returns, its name and the elapsed time
    (a ``datetime.timedelta``) are printed to stdout. Nothing is printed
    when the call raises, because the exception propagates first.

    :param func: the callable to time
    :return: a wrapper that forwards all arguments and returns func's result
    """
    @wraps(func)
    def timed(*args, **kwargs):
        began = datetime.now()
        outcome = func(*args, **kwargs)
        elapsed = datetime.now() - began
        print(func.__name__, elapsed)
        return outcome
    return timed


class SSHManager:
    """
    Holds one SSH session and one SFTP session to the same host (port 22,
    password authentication) and offers helpers to run a remote command or
    to upload and run a local shell script.

    Both connections are opened in ``__init__``; call :meth:`close` (or let
    the object be garbage-collected) to release them.
    """

    def __init__(self, host, usr, passwd):
        """
        Open the SFTP and SSH connections.

        :param host: remote host name or IP address
        :param usr: login user name
        :param passwd: login password
        :raises RuntimeError: if either connection cannot be established
        """
        self._host = host
        self._usr = usr
        self._passwd = passwd
        self._ssh = None
        self._sftp = None
        # Kept so the transport behind the SFTP client can be closed too.
        self._transport = None
        self._sftp_connect()
        self._ssh_connect()

    def __del__(self):
        # Best-effort cleanup: errors raised from a finalizer cannot be
        # handled by anyone, so they are deliberately ignored here.
        try:
            self.close()
        except Exception:
            pass

    def close(self):
        """
        Close the SFTP client, its transport and the SSH client.

        Safe to call more than once and on a partially initialised instance.
        """
        for attr in ('_sftp', '_transport', '_ssh'):
            conn = getattr(self, attr, None)
            if conn is not None:
                # Clear the reference first so a failing close() is not retried.
                setattr(self, attr, None)
                conn.close()

    def _sftp_connect(self):
        """
        Open a transport to the host on port 22 and start an SFTP client on it.

        :raises RuntimeError: if the transport or the SFTP client cannot be set up
        """
        transport = None
        try:
            transport = paramiko.Transport((self._host, 22))
            transport.connect(username=self._usr, password=self._passwd)
            self._sftp = paramiko.SFTPClient.from_transport(transport)
            self._transport = transport
        except Exception as e:
            # Do not leak the half-open transport when setup fails midway.
            if transport is not None:
                transport.close()
            raise RuntimeError("sftp connect failed [%s]" % str(e)) from e

    def _ssh_connect(self):
        """
        Open the SSH client connection (port 22, 5 second timeout).

        :raises RuntimeError: if the connection fails
        """
        try:
            # Create the SSH client object.
            self._ssh = paramiko.SSHClient()
            # Accept hosts that are not listed in the known_hosts file.
            self._ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            # Connect to the server.
            self._ssh.connect(hostname=self._host,
                              port=22,
                              username=self._usr,
                              password=self._passwd,
                              timeout=5)
        except Exception as e:
            # The password is intentionally left out of the message so it
            # does not end up in logs or tracebacks.
            raise RuntimeError("ssh connected to [host:%s, usr:%s] failed [%s]" %
                               (self._host, self._usr, str(e))) from e

    def ssh_exec_cmd(self, cmd, path='~'):
        """
        Run a command on the remote host and print its standard output.

        ``path`` and ``cmd`` are concatenated into one shell command line
        without quoting, so both are interpreted by the remote shell.

        :param cmd: the command to execute
        :param path: directory to ``cd`` into before running the command
        :return: the decoded standard output of the command
        :raises RuntimeError: if the command cannot be executed
        """
        try:
            result = self._exec_command('cd ' + path + ';' + cmd)
            print(result)
            return result
        except Exception as e:
            raise RuntimeError('exec cmd [%s] failed' % cmd) from e

    def ssh_exec_shell(self, local_file, remote_file, exec_path):
        """
        Upload a local shell script if needed, then run it on the remote host.

        :param local_file: path of the local ``.sh`` file
        :param remote_file: path of the script on the remote host
        :param exec_path: remote directory to ``cd`` into before running it
        :return: the decoded standard output of the script
        :raises RuntimeError: if the local file is missing or not a ``.sh``
            file, or if checking, uploading or executing fails
        """
        try:
            if not self.is_file_exist(local_file):
                raise RuntimeError('File [%s] not exist' % local_file)
            if not self.is_shell_file(local_file):
                raise RuntimeError('File [%s] is not a shell file' % local_file)

            self._check_remote_file(local_file, remote_file)

            # Note the space after "cd": without it the shell sees one
            # unknown word such as "cd/home/leo" and never changes directory.
            result = self._exec_command('chmod +x ' + remote_file +
                                        '; cd ' + exec_path +
                                        '; /bin/bash ' + remote_file)
            print('exec shell result: ', result)
            return result
        except Exception as e:
            raise RuntimeError('ssh exec shell failed [%s]' % str(e)) from e

    @staticmethod
    def is_shell_file(file_name):
        """Return True when ``file_name`` ends with ``.sh``."""
        return file_name.endswith('.sh')

    @staticmethod
    def is_file_exist(file_name):
        """Return True when ``file_name`` can be opened for reading, else False."""
        try:
            with open(file_name, 'r'):
                return True
        except (OSError, TypeError, ValueError):
            return False

    def _check_remote_file(self, local_file, remote_file):
        """
        Upload ``local_file`` when the remote copy is missing or differs in size.

        Only the byte sizes are compared; file contents are not.

        :param local_file: path of the local file
        :param remote_file: path of the file on the remote host
        :raises RuntimeError: if the check or the upload fails
        """
        try:
            # "find <path>" writes nothing to stdout when the path is absent.
            result = self._exec_command('find ' + remote_file)
            if not result:
                self._upload_file(local_file, remote_file)
            else:
                lf_size = os.path.getsize(local_file)
                # "du -b" prints "<bytes>\t<path>".
                result = self._exec_command('du -b ' + remote_file)
                rf_size = int(result.split('\t')[0])
                if lf_size != rf_size:
                    self._upload_file(local_file, remote_file)
        except Exception as e:
            raise RuntimeError("check error [%s]" % str(e)) from e

    @timethis
    def _upload_file(self, local_file, remote_file):
        """
        Upload a local file to the remote host over SFTP.

        :param local_file: path of the local file
        :param remote_file: destination path on the remote host
        :raises RuntimeError: if the transfer fails
        """
        try:
            self._sftp.put(local_file, remote_file)
        except Exception as e:
            raise RuntimeError('upload failed [%s]' % str(e)) from e

    def _exec_command(self, cmd):
        """
        Execute a command over the SSH connection.

        Only standard output is read; standard error is not inspected.

        :param cmd: the command line to run
        :return: the decoded standard output
        :raises RuntimeError: if the command cannot be executed or read
        """
        try:
            stdin, stdout, stderr = self._ssh.exec_command(cmd)
            return stdout.read().decode()
        except Exception as e:
            raise RuntimeError('Exec command [%s] failed' % str(cmd)) from e


if __name__ == '__main__':
    # Demo run: connect to a test host, list the home directory, then
    # upload (if needed) and execute a local shell script remotely.
    host, user, password = '192.168.159.142', 'leo', '123'
    manager = SSHManager(host, user, password)
    manager.ssh_exec_cmd('ls')
    manager.ssh_exec_shell('./test.sh', '/home/leo/test.sh', '/home/leo')

out: