Python3 七牛云客户端模板

2023-04-23  本文已影响0人  Lyudmilalala
from qiniu import Auth, put_data, put_file

class Qiniu:
    """Singleton client for uploading files to Qiniu Cloud.

    ``__new__`` always returns the same instance. Note that Python still
    runs ``__init__`` on every ``Qiniu(...)`` call, so each construction must
    pass the full set of settings and reconfigures the shared instance.
    """
    __instance = None

    def __new__(cls, **kwargs):
        """Create the shared instance on first use and return it afterwards."""
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self, **kwargs):
        """Configure the client.

        :param access_key: Qiniu access key
        :param secret_key: Qiniu secret key
        :param domain: base URL prepended to a key when building download links
        :param bucket_name: bucket that uploads are written to
        :raises ValueError: if any of the four settings is missing or empty
        """
        need_key = ('access_key', 'secret_key', 'domain', 'bucket_name')
        settings = {}
        for key in need_key:
            val = kwargs.get(key)
            if not val:
                raise ValueError('{} is necessary.'.format(key))
            settings[key] = val

        # The instance is shared, so validate everything and build the Auth
        # object before touching any attribute: a rejected call must not
        # leave the singleton half-reconfigured.
        auth = Auth(settings['access_key'], settings['secret_key'])
        for key, val in settings.items():
            setattr(self, key, val)
        self._auth = auth

    def uploadData(self, data, save_file_name):
        """Upload in-memory data (or an IO stream) to the configured bucket.

        :param data: the content to upload, handed to ``put_data`` unchanged
        :param save_file_name: name (key) the content is stored under
        :return: the ``(ret, info)`` pair returned by ``put_data``
        """
        # The upload token is scoped to this key and requested with a
        # 3600-second expiry.
        token = self._auth.upload_token(self.bucket_name, save_file_name, 3600)
        ret, info = put_data(token, save_file_name, data)

        return ret, info

    def uploadFilepath(self, filepath, save_file_name):
        """Upload a local file, identified by its path, to the configured bucket.

        :param filepath: path of the source file, handed to ``put_file``
        :param save_file_name: name (key) the file is stored under
        :return: the ``(ret, info)`` pair returned by ``put_file``
        """
        # The upload token is scoped to this key and requested with a
        # 3600-second expiry.
        token = self._auth.upload_token(self.bucket_name, save_file_name, 3600)
        ret, info = put_file(token, save_file_name, filepath)

        return ret, info

    def generateDownloadLink(self, key, timeout=None):
        """Generate a signed download link for a stored file.

        :param key: name (key) of the file
        :param timeout: link lifetime in seconds; ``None`` or a non-positive
            value falls back to the default of ``private_download_url``
        :return: whatever ``private_download_url`` returns for the built URL
        """
        # Insert a separator only when neither side supplies one, so a domain
        # configured without a trailing slash still yields a valid URL while
        # every previously working combination is left untouched.
        if self.domain.endswith('/') or key.startswith('/'):
            url = self.domain + key
        else:
            url = self.domain + '/' + key

        if timeout is None or timeout <= 0:
            return self._auth.private_download_url(url)
        return self._auth.private_download_url(url, expires=timeout)
上一篇下一篇

猜你喜欢

热点阅读