1. 说明

Paramiko 实现 SFTP 连接一般有两种方式:

  1. 先建立 SSH 连接通道,在此通道上打开 SFTP 连接通道。

  2. 直接建立 SFTP 连接通道。

借鉴了 torndb 自动重连机制,封装一个 SFTP Client Class,实现了自动连接、可验证服务器公钥,封装了 ls 命令,除此外,以 “代理” 方式支持如下 SFTP commands(paramiko doc):

chdir, getcwd, mkdir, rmdir, open, get, getfo, put, putfo, listdir, listdir_attr, listdir_iter, remove, normalize, posix_rename, rename, stat

2. 依赖

  • python 3.6+
  • paramiko 3.0.0+

3.代码

# coding: utf8
#
# python 3.6+
# pip install paramiko
#

from paramiko import Transport, PKey, SFTPError
from time import time
from base64 import b64decode
from typing import Optional, List


class SFTP(object):
    """SFTP Client: AutoConnect(server), Verify(hostkey), Execute(command)."""

    _ALLOWED_FUNCS = (
        "chdir",
        "getcwd",
        "mkdir",
        "rmdir",
        "open",
        "get",
        "getfo",
        "put",
        "putfo",
        "listdir",
        "listdir_attr",
        "listdir_iter",
        "remove",
        "normalize",
        "posix_rename",
        "rename",
        "stat",
    )

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        *,
        max_idle_time: int = 600,
        server_public_key: Optional[str] = None,
    ):
        """Initiate SFTP client.

        :param str host: SFTP server IP address or FQDN
        :param int port: SFTP server port
        :param str username: the username for logging into the SFTP server
        :param str password: the password of the user logging into the SFTP server
        :param int max_idle_time: maximum idle connection waiting time(second)
        :param str server_public_key: the public key of the SFTP server, if provided, it will verify legality.
        """
        self._opts = dict(
            host=host,
            port=port,
            user=username,
            pwd=password,
            hkey=server_public_key,
        )
        self._transport = None
        self._sftp = None
        self._max_idle_time = float(max_idle_time)
        self._last_use_time = time()
        self.reconnect()

    def reconnect(self):
        if isinstance(self._transport, Transport):
            if self._transport.is_active():
                return
        self._transport = Transport((self._opts["host"], self._opts["port"]))
        self._transport.connect(
            hostkey=self._key_to_obj(self._opts["hkey"]),
            username=self._opts["user"],
            password=self._opts["pwd"],
        )
        self._sftp = self._transport.open_sftp_client()
        self._last_use_time = time()

    def close(self):
        """Close sftp connect and transport"""
        if self._sftp:
            self._sftp.close()
        if self._transport:
            self._transport.close()

    def is_active(self) -> bool:
        """Return true if this session is active (open)."""
        return self._transport.is_active()

    def _ensure_connected(self):
        """Referring to the MySQL idle waiting mechanism,
        dropzone can customize the maximum idle waiting time and
        automatically reconnect based on the operation interval time.
        """
        if (self.is_active() is False) or (
            time() - self._last_use_time > self._max_idle_time
        ):
            self.reconnect()
        self._last_use_time = time()

    def _key_to_obj(self, pubkey: Optional[str]) -> Optional[PKey]:
        """Load and verify host key.

        :param str pubkey: server public key, eg: [KeyType] <PubKey> [Comment]
        :raises paramiko.pkey.UnknownKeyType: if bad key type
        :raises paramiko.ssh_exception.SSHException: if bad key content
        """
        if pubkey:
            pk = pubkey.strip().split()
            if len(pk) == 1:
                keytype = "ssh-rsa"
                encrypted = pk[0]
            else:
                keytype = pk[0]
                encrypted = pk[1]
            return PKey.from_type_string(keytype, b64decode(encrypted))

    def ls(self, path: str, *, long: bool = False) -> List[str]:
        """Implement the sftp ls command.

        :param str path: the SFTP path to be queried
        :param bool long: longname format, same as: ls -la
        """
        self._ensure_connected()
        if long is True:
            return [obj.longname for obj in self._sftp.listdir_iter(path)]
        return self._sftp.listdir(path)

    @property
    def host_key(self):
        return self._transport.host_key

    def _unsupported_func(self, *args, **kw):
        raise SFTPError("Unsupported sftp command")

    def __getattr__(self, func):
        if func in self._ALLOWED_FUNCS:
            self._ensure_connected()
            return getattr(self._sftp, func, self._unsupported_func)
        raise AttributeError("Unsupport")

    def __str__(self):
        return f"<SFTP({self._opts['user']}@{self._opts['host']}:{self._opts['port']} {'Active' if self.is_active() else 'Inactive'}) at {hex(id(self))}>"

    __repr__ = __str__


if __name__ == "__main__":
    # examples
    pubkey = "ssh-rsa XYZ"

    sftp = SFTP("1.2.3.4", 22, "test", "test", server_public_key=pubkey)

    # 1. 查看文件或目录
    sftp.ls("/")
    sftp.ls("/", long=True)
    sftp.listdir("/")

    # 2. 创建目录
    sftp.mkdir("/demo")

    # 3. 改变目录
    sftp.chdir("/demo")

    # 4. 查看当前目录
    print(sftp.getcwd())  # /demo
    sftp.chdir("/")
    print(sftp.getcwd())  # /
    """
    如果不调用 sftp.chdir(path) 的时候, 那么打印 sftp.getcwd() 时结果为 None
    """

    # 5. 删除空目录(非空目录需要先删除其中的文件和子目录)
    sftp.rmdir("/demo")

    # 6. 删除文件
    sftp.remove("/remote/path/to/an-existing-file")

    # 7. 重命名文件或目录
    sftp.rename("/an-existing-file", "/newfilename")

    # 8. 查看文件或目录状态
    print(sftp.stat("/"))
    """
    drwxr-xr-x   1 0        0            4096 28 Nov 16:35 ?
    不使用 print 时,为 SFTPAttributes 对象。
    """

    # 9. 下载文件
    sftp.get("/remote/path/to/an-existing-file", "/local/path/to/filename")

    # 10. 上传文件
    sftp.put("/local/path/to/an-existing-file", "/remote/path/<filename>")

    # 最后关闭 sftp 和 transport
    sftp.close()

4. 使用

上述代码已经有参考示例,需要注意的是初始化 SFTP 类时传入的 max_idle_time 参数, 原生 Linux SFTP 一般不限超时,可以设置为 0 或忽略,因为连接一直是 Active, 不会自动重连。

from sftp import SFTP
host = "<new dropzone fqdn>
port = 22
user = "<account>"
passwd = "<your account password>"
pubkey = "ssh-rsa <server public key>"

sftp = SFTP(host, port, user, passwd, server_public_key=pubkey)
# usage with help(sftp) / dir(sftp)

·End·