1. 说明
Paramiko 实现 SFTP 连接一般有两种方式:
- 先建立 SSH 连接通道,在此通道上打开 SFTP 连接通道。
- 直接建立 SFTP 连接通道。
借鉴了 torndb 的自动重连机制,封装了一个 SFTP Client 类,实现了自动连接,可验证服务器公钥,并封装了 ls 命令;除此之外,还以 “代理” 方式支持如下 SFTP 命令(参见 paramiko 文档):
chdir, getcwd, mkdir, rmdir, open, get, getfo, put, putfo, listdir, listdir_attr, listdir_iter, remove, normalize, posix_rename, rename, stat
2. 依赖
- python 3.6+
- paramiko 3.2.0+(代码中使用的 PKey.from_type_string 自 3.2 版本起提供)
3. 代码
# coding: utf8
#
# python 3.6+
# pip install paramiko
#
from paramiko import Transport, PKey, SFTPError
from time import time
from base64 import b64decode
from typing import Optional, List
class SFTP(object):
    """SFTP client: auto-connect to the server, verify its host key, run commands.

    A password-authenticated transport and an SFTP session are opened when the
    object is constructed.  Before :meth:`ls` runs, and before any proxied
    command listed in ``_ALLOWED_FUNCS`` is looked up, the connection is
    checked and re-established if the transport is no longer active or if it
    has been idle for longer than ``max_idle_time`` seconds.

    The object can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    # Names of the underlying SFTP client methods that __getattr__ forwards.
    _ALLOWED_FUNCS = (
        "chdir",
        "getcwd",
        "mkdir",
        "rmdir",
        "open",
        "get",
        "getfo",
        "put",
        "putfo",
        "listdir",
        "listdir_attr",
        "listdir_iter",
        "remove",
        "normalize",
        "posix_rename",
        "rename",
        "stat",
    )

    def __init__(
        self,
        host: str,
        port: int,
        username: str,
        password: str,
        *,
        max_idle_time: int = 600,
        server_public_key: Optional[str] = None,
    ):
        """Initiate the SFTP client and connect immediately.

        :param str host: SFTP server IP address or FQDN
        :param int port: SFTP server port
        :param str username: the username for logging into the SFTP server
        :param str password: the password of the user logging into the SFTP server
        :param int max_idle_time: maximum idle time in seconds before the next
            command forces a fresh connection; ``0`` (or a negative value)
            disables the idle check, so only an inactive transport triggers
            a reconnect
        :param str server_public_key: the public key of the SFTP server; if
            provided, it is handed to the transport as the expected host key
        """
        self._opts = dict(
            host=host,
            port=port,
            user=username,
            pwd=password,
            hkey=server_public_key,
        )
        self._transport = None
        self._sftp = None
        self._max_idle_time = float(max_idle_time)
        self._last_use_time = time()
        self.reconnect()

    def __enter__(self):
        """Return the client itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on leaving the ``with`` block.

        Returns ``None`` so an exception raised inside the block propagates.
        """
        self.close()

    def reconnect(self, force: bool = False):
        """(Re)establish the transport and the SFTP session.

        :param bool force: when false (the default) nothing happens if the
            current transport is still active; when true the current session
            is torn down and a new one is opened regardless
        :raises Exception: whatever host-key parsing, transport creation,
            ``connect`` or ``open_sftp_client`` raise; a transport created
            during the failed attempt is closed before re-raising
        """
        if not force and self.is_active():
            return

        # Parse the host key first so a malformed key fails before a socket
        # is opened.
        hostkey = self._key_to_obj(self._opts["hkey"])

        # Release the stale session instead of just dropping the references.
        try:
            self.close()
        except Exception:
            # Best effort: the old connection is being discarded anyway, and
            # a failure to shut it down must not block the new connection.
            pass

        transport = Transport((self._opts["host"], self._opts["port"]))
        try:
            transport.connect(
                hostkey=hostkey,
                username=self._opts["user"],
                password=self._opts["pwd"],
            )
            sftp = transport.open_sftp_client()
        except Exception:
            # Do not leak the half-open transport when connecting fails.
            transport.close()
            raise

        # Publish the new objects only once both are ready.
        self._transport = transport
        self._sftp = sftp
        self._last_use_time = time()

    def close(self):
        """Close the SFTP session and then the transport.

        The transport is closed even if closing the SFTP session raises.
        The references are kept, so nothing is reset to ``None`` here.
        """
        try:
            if self._sftp:
                self._sftp.close()
        finally:
            if self._transport:
                self._transport.close()

    def is_active(self) -> bool:
        """Return true if a transport exists and reports itself active."""
        transport = self._transport
        return transport is not None and transport.is_active()

    def _ensure_connected(self):
        """Reconnect when needed, then refresh the last-use timestamp.

        A reconnect is triggered when the transport is not active, or when a
        positive ``max_idle_time`` has been exceeded since the last use.  In
        the idle case the reconnect is forced, because the transport may
        still report itself active.
        """
        idle_for = time() - self._last_use_time
        expired = self._max_idle_time > 0 and idle_for > self._max_idle_time
        if expired or not self.is_active():
            self.reconnect(force=expired)
        self._last_use_time = time()

    def _key_to_obj(self, pubkey: Optional[str]) -> "Optional[PKey]":
        """Turn a textual public key into a key object.

        :param str pubkey: server public key, eg: [KeyType] <PubKey> [Comment].
            With a single field the key type is taken to be ``ssh-rsa``.
        :returns: the key object, or ``None`` when ``pubkey`` is ``None``,
            empty or only whitespace
        :raises paramiko.pkey.UnknownKeyType: if bad key type
        :raises paramiko.ssh_exception.SSHException: if bad key content
        """
        if not pubkey:
            return None
        fields = pubkey.strip().split()
        if not fields:
            # Whitespace-only input carries no key; treat it like "".
            return None
        if len(fields) == 1:
            keytype, encoded = "ssh-rsa", fields[0]
        else:
            keytype, encoded = fields[0], fields[1]
        return PKey.from_type_string(keytype, b64decode(encoded))

    def ls(self, path: str, *, long: bool = False) -> List[str]:
        """Implement the sftp ls command.

        :param str path: the SFTP path to be queried
        :param bool long: longname format, same as: ls -la
        :returns: the ``longname`` of every entry when ``long`` is set,
            otherwise the result of ``listdir(path)``
        """
        self._ensure_connected()
        if long:
            return [entry.longname for entry in self._sftp.listdir_iter(path)]
        return self._sftp.listdir(path)

    @property
    def host_key(self):
        """Return the ``host_key`` attribute of the current transport."""
        return self._transport.host_key

    def _unsupported_func(self, *args, **kw):
        """Fallback used when the SFTP session lacks an allowed command."""
        raise SFTPError("Unsupported sftp command")

    def __getattr__(self, func):
        """Forward the allowed command names to the SFTP session.

        Python calls this only when normal attribute lookup fails.  For a
        name in ``_ALLOWED_FUNCS`` the connection is checked first, then the
        session's attribute of that name is returned (or the "unsupported"
        fallback if the session has none).  Any other name raises
        ``AttributeError``.
        """
        if func in self._ALLOWED_FUNCS:
            self._ensure_connected()
            return getattr(self._sftp, func, self._unsupported_func)
        raise AttributeError("Unsupport")

    def __str__(self):
        """Describe the client: user, host, port, connection state, object id."""
        state = "Active" if self.is_active() else "Inactive"
        return (
            f"<SFTP({self._opts['user']}@{self._opts['host']}:"
            f"{self._opts['port']} {state}) at {hex(id(self))}>"
        )

    __repr__ = __str__
if __name__ == "__main__":
    # Examples. The address, credentials and key below are placeholders.
    pubkey = "ssh-rsa XYZ"
    sftp = SFTP("1.2.3.4", 22, "test", "test", server_public_key=pubkey)
    try:
        # 1. List a file or directory
        sftp.ls("/")
        sftp.ls("/", long=True)
        sftp.listdir("/")

        # 2. Create a directory
        sftp.mkdir("/demo")

        # 3. Change directory
        sftp.chdir("/demo")

        # 4. Show the current directory
        print(sftp.getcwd())  # /demo
        sftp.chdir("/")
        print(sftp.getcwd())  # /
        # Note: if sftp.chdir(path) has never been called, sftp.getcwd()
        # returns None.

        # 5. Remove an empty directory (for a non-empty one, delete the files
        #    and sub-directories inside it first)
        sftp.rmdir("/demo")

        # 6. Remove a file
        sftp.remove("/remote/path/to/an-existing-file")

        # 7. Rename a file or directory
        sftp.rename("/an-existing-file", "/newfilename")

        # 8. Show the status of a file or directory
        print(sftp.stat("/"))
        # Printed form, e.g.:
        #   drwxr-xr-x 1 0 0 4096 28 Nov 16:35 ?
        # Without print, the value is an SFTPAttributes object.

        # 9. Download a file
        sftp.get("/remote/path/to/an-existing-file", "/local/path/to/filename")

        # 10. Upload a file
        sftp.put("/local/path/to/an-existing-file", "/remote/path/<filename>")
    finally:
        # Close the SFTP session and the transport even if an example call
        # above raised.
        sftp.close()
4. 使用
上述代码已经有参考示例,需要注意的是初始化 SFTP 类时传入的 max_idle_time 参数, 原生 Linux SFTP 一般不限超时,可以设置为 0 或忽略,因为连接一直是 Active, 不会自动重连。
from sftp import SFTP

# Placeholders: replace with the real server address, account and host key.
host = "<new dropzone fqdn>"
port = 22
user = "<account>"
passwd = "<your account password>"
pubkey = "ssh-rsa <server public key>"

sftp = SFTP(host, port, user, passwd, server_public_key=pubkey)
# usage with help(sftp) / dir(sftp)