百度AI人脸注册源码详解

2019-07-30  本文已影响0人  一个无趣的人W
Summer

1、起点AppConfig.client.addUser(image, imageType, groupId, userId, options)调用人脸注册api,从addUser进入

def addUser(self, image, image_type, group_id, user_id, options=None):
    """
        人脸注册
    """
    options = options or {}

    data = {}
    data['image'] = image
    data['image_type'] = image_type
    data['group_id'] = group_id
    data['user_id'] = user_id

    data.update(options)
    return self._request(self.__userAddUrl, json.dumps(data, ensure_ascii=False), {
        'Content-Type': 'application/json',
    })

详解:由此可见我们需要传四个必要参数image, image_type, group_id, user_id,一个可选参数options;
     接下来是 return self._request(self.__userAddUrl, json.dumps(data, ensure_ascii=False),根据__userAddUrl和序列化字段以及Content-Type来发送请求,看一下__userAddUrl是__userAddUrl = 'https://aip.baidubce.com/rest/2.0/face/v3/faceset/user/add',也就是说是向这个路由发送请求;

2、那就要看一下_request请求是怎么发送的,点进去看

def _request(self, url, data, headers=None):
    """
       self._request('', {})
    """
    try:
        result = self._validate(url, data)  # 判断路由和数据是否有效
        if result != True:
            return result

        authObj = self._auth()  # 实例化认证对象
        params = self._getParams(authObj)  # 实例化参数对象,传入的是认证对象,这里的参数就是access token

        data = self._proccessRequest(url, params, data, headers)  # 对请求进行处理,传入路由,参数,数据和报头
        headers = self._getAuthHeaders('POST', url, params, headers)  # 对报头进行认证,我猜测这里面有反爬措施
         # 通过实例化的对象发送请求,携带的参数为路由,数据,access token参数,报头,认证,超时时间;
         # 这里的__connectTimeout是自定义的连接超时时间,__socketTimeout是通过打开的连接传输数据的超时时间,单位都为ms
        response = self.__client.post(url, data=data, params=params,
                        headers=headers, verify=False, timeout=(
                            self.__connectTimeout,
                            self.__socketTimeout,
                        ), proxies=self._proxies
                    ) 
        obj = self._proccessResult(response.content)  # 最后返回的结果对象
        # 如果云端未注册并且调用接口的返回错误码是110,表示Access Token失效,这时就要再次进行认证
        if not self._isCloudUser and obj.get('error_code', '') == 110:  
            authObj = self._auth(True)
            params = self._getParams(authObj)
            response = self.__client.post(url, data=data, params=params,
                            headers=headers, verify=False, timeout=(
                                self.__connectTimeout,
                                self.__socketTimeout,
                             ), proxies=self._proxies
                        )
            obj = self._proccessResult(response.content)
    # 抛异常,连接超时
    except (requests.exceptions.ReadTimeout,requests.exceptions.ConnectTimeout) as e:
        return {
            'error_code': 'SDK108',
            'error_msg': 'connection or read data timeout',
        }

    return obj

3、至此调用基本结束,接下来看里面用到的几个方法:

def _auth(self, refresh=False):
    """
        api access auth,获得access_token
    """

    #未过期
    if not refresh:
        # 设置超时时间为一个月
        tm = self._authObj.get('time', 0) + int(self._authObj.get('expires_in', 0)) - 30
        if tm > int(time.time()):
            return self._authObj
    # 如果access token过期,重新获取,传入参数:grant_type,client_id,client_secret和设置超时时间
    obj = self.__client.get(self.__accessTokenUrl, verify=False, params={
        'grant_type': 'client_credentials',
        'client_id': self._apiKey,
        'client_secret': self._secretKey,
    }, timeout=(
        self.__connectTimeout,
        self.__socketTimeout,
    ), proxies=self._proxies).json()

    self._isCloudUser = not self._isPermission(obj)
    obj['time'] = int(time.time())  # 按照当前时间重置access token超时时间
    self._authObj = obj

    return obj  # 返回认证过access token的对象
# 得到的参数就是access_token,认证校验传输都要携带
    def _getParams(self, authObj):
        """
            api request http url params
        """

        params = {}

        if self._isCloudUser == False:
            params['access_token'] = authObj['access_token']

        return params
    def _proccessRequest(self, url, params, data, headers):
        """
            参数处理:在参数中加入语言、版本,比如python3
        """

        params['aipSdk'] = 'python'
        params['aipVersion'] = self.__version

        return data
    def _getAuthHeaders(self, method, url, params=None, headers=None):
        """
            api request http headers
        """

        headers = headers or {}
        params = params or {}

        if self._isCloudUser == False:  # 不是云端用户,返回报头
            return headers

        urlResult = urlparse(url)  # 解析路由
        for kv in urlResult.query.strip().split('&'):  # 对路由进行切分,取出有用的值把它赋给参数
            if kv:
                k, v = kv.split('=')
                params[k] = v

        # UTC timestamp,获取时间戳,主机号,版本等信息,保证唯一性
        timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
        headers['Host'] = urlResult.hostname
        headers['x-bce-date'] = timestamp
        version, expire = '1', '1800'

        # 1 Generate SigningKey
        # 拼接签名钥匙,使用hmac(HMAC是密钥相关的哈希运算消息认证码,HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出)
        # 还要使用hashlib.sha256进行二次加密
        val = "bce-auth-v%s/%s/%s/%s" % (version, self._apiKey, timestamp, expire)
        signingKey = hmac.new(self._secretKey.encode('utf-8'), val.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        # 2 Generate CanonicalRequest
        # 2.1 Genrate CanonicalURI
        canonicalUri = quote(urlResult.path)
        # 2.2 Generate CanonicalURI: not used here
        # 2.3 Generate CanonicalHeaders: only include host here

        canonicalHeaders = []
        for header, val in headers.items():
            canonicalHeaders.append(
                '%s:%s' % (
                    quote(header.strip(), '').lower(),
                    quote(val.strip(), '')
                )
            )
        canonicalHeaders = '\n'.join(sorted(canonicalHeaders))

        # 2.4 Generate CanonicalRequest
        canonicalRequest = '%s\n%s\n%s\n%s' % (
            method.upper(),
            canonicalUri,
            '&'.join(sorted(urlencode(params).split('&'))),
            canonicalHeaders
        )

        # 3 Generate Final Signature
        signature = hmac.new(signingKey.encode('utf-8'), canonicalRequest.encode('utf-8'),
                        hashlib.sha256
                    ).hexdigest()

        headers['authorization'] = 'bce-auth-v%s/%s/%s/%s/%s/%s' % (
            version,
            self._apiKey,
            timestamp,
            expire,
            ';'.join(headers.keys()).lower(),
            signature
        )

        return headers  # 最后头部携带信息进行传输,其中最重要的是signature,为此进行了两次大加密,大加密中又有两次小加密。不得不说,重要信息在传输过程中一定要慎重!!!
    def _proccessResult(self, content):
        """
            formate result,格式化返回结果
        """

        if sys.version_info.major == 2:
            return json.loads(content) or {}
        else:
            return json.loads(content.decode()) or {}
上一篇下一篇

猜你喜欢

热点阅读