Python3 七牛云客户端模板
2023-04-23 本文已影响0人
Lyudmilalala
from qiniu import Auth, put_data, put_file
class Qiniu:
""" 单例模式七牛云文件上传 """
__instance = None
def __new__(cls, **kwargs):
if not cls.__instance:
cls.__instance = super().__new__(cls)
return cls.__instance
def __init__(self, **kwargs):
# print('kwargs = ', kwargs)
need_key = ('access_key', 'secret_key', 'domain', 'bucket_name')
for key in need_key:
val = kwargs.get(key, None)
if not val:
raise ValueError('{} is necessary.'.format(key))
setattr(self, key, val)
self._auth = Auth(self.access_key, self.secret_key)
def uploadData(self, data, save_file_name):
"""
Upload data from an IO stream
:param source_file_path: 源文件路径
:param save_file_name: 保存至七牛云的文件名
:return:
"""
token = self._auth.upload_token(self.bucket_name, save_file_name, 3600)
ret, info = put_data(token, save_file_name, data)
return ret, info
def uploadFilepath(self, filepath, save_file_name):
"""
Upload a file by its path
:param source_file_path: 源文件路径
:param save_file_name: 保存至七牛云的文件名
:return:
"""
token = self._auth.upload_token(self.bucket_name, save_file_name, 3600)
# print('filepath = ', filepath)
ret, info = put_file(token, save_file_name, filepath)
return ret, info
def generateDownloadLink(self, key, timeout=None):
"""
Generate download link for a file
:key: 七牛文件名
:timeout 下载链接多久超时【秒】
:return:
"""
if timeout is None or timeout <= 0:
return self._auth.private_download_url(self.domain + key)
else:
return self._auth.private_download_url(self.domain + key, expires=timeout)