百度AI人脸注册源码详解
2019-07-30 本文已影响0人
一个无趣的人W
Summer
1、起点AppConfig.client.addUser(image, imageType, groupId, userId, options)
调用人脸注册api,从addUser进入
def addUser(self, image, image_type, group_id, user_id, options=None):
"""
人脸注册
"""
options = options or {}
data = {}
data['image'] = image
data['image_type'] = image_type
data['group_id'] = group_id
data['user_id'] = user_id
data.update(options)
return self._request(self.__userAddUrl, json.dumps(data, ensure_ascii=False), {
'Content-Type': 'application/json',
})
详解:由此可见我们需要传四个必要参数image, image_type, group_id, user_id,一个可选参数options;
接下来是 return self._request(self.__userAddUrl, json.dumps(data, ensure_ascii=False),根据__userAddUrl和序列化字段以及Content-Type来发送请求,看一下__userAddUrl是__userAddUrl = 'https://aip.baidubce.com/rest/2.0/face/v3/faceset/user/add',也就是说是向这个路由发送请求;
2、那就要看一下_request请求是怎么发送的,点进去看
def _request(self, url, data, headers=None):
"""
self._request('', {})
"""
try:
result = self._validate(url, data) # 判断路由和数据是否有效
if result != True:
return result
authObj = self._auth() # 实例化认证对象
params = self._getParams(authObj) # 实例化参数对象,传入的是认证对象,这里的参数就是access token
data = self._proccessRequest(url, params, data, headers) # 对请求进行处理,传入路由,参数,数据和报头
headers = self._getAuthHeaders('POST', url, params, headers) # 对报头进行认证,我猜测这里面有反爬措施
# 通过实例化的对象发送请求,携带的参数为路由,数据,access token参数,报头,认证,超时时间;
# 这里的__connectTimeout是自定义的连接超时时间,__socketTimeout是通过打开的连接传输数据的超时时间,单位都为ms
response = self.__client.post(url, data=data, params=params,
headers=headers, verify=False, timeout=(
self.__connectTimeout,
self.__socketTimeout,
), proxies=self._proxies
)
obj = self._proccessResult(response.content) # 最后返回的结果对象
# 如果云端未注册并且调用接口的返回错误码是110,表示Access Token失效,这时就要再次进行认证
if not self._isCloudUser and obj.get('error_code', '') == 110:
authObj = self._auth(True)
params = self._getParams(authObj)
response = self.__client.post(url, data=data, params=params,
headers=headers, verify=False, timeout=(
self.__connectTimeout,
self.__socketTimeout,
), proxies=self._proxies
)
obj = self._proccessResult(response.content)
# 抛异常,连接超时
except (requests.exceptions.ReadTimeout,requests.exceptions.ConnectTimeout) as e:
return {
'error_code': 'SDK108',
'error_msg': 'connection or read data timeout',
}
return obj
3、至此调用基本结束,接下来看里面用到的几个方法:
- ._auth()
def _auth(self, refresh=False):
"""
api access auth,获得access_token
"""
#未过期
if not refresh:
# 设置超时时间为一个月
tm = self._authObj.get('time', 0) + int(self._authObj.get('expires_in', 0)) - 30
if tm > int(time.time()):
return self._authObj
# 如果access token过期,重新获取,传入参数:grant_type,client_id,client_secret和设置超时时间
obj = self.__client.get(self.__accessTokenUrl, verify=False, params={
'grant_type': 'client_credentials',
'client_id': self._apiKey,
'client_secret': self._secretKey,
}, timeout=(
self.__connectTimeout,
self.__socketTimeout,
), proxies=self._proxies).json()
self._isCloudUser = not self._isPermission(obj)
obj['time'] = int(time.time()) # 按照当前时间重置access token超时时间
self._authObj = obj
return obj # 返回认证过access token的对象
- ._getParams(authObj)
# 得到的参数就是access_token,认证校验传输都要携带
def _getParams(self, authObj):
"""
api request http url params
"""
params = {}
if self._isCloudUser == False:
params['access_token'] = authObj['access_token']
return params
- ._proccessRequest(url, params, data, headers)
def _proccessRequest(self, url, params, data, headers):
"""
参数处理:在参数中加入语言、版本,比如python3
"""
params['aipSdk'] = 'python'
params['aipVersion'] = self.__version
return data
- ._getAuthHeaders('POST', url, params, headers)
def _getAuthHeaders(self, method, url, params=None, headers=None):
"""
api request http headers
"""
headers = headers or {}
params = params or {}
if self._isCloudUser == False: # 不是云端用户,返回报头
return headers
urlResult = urlparse(url) # 解析路由
for kv in urlResult.query.strip().split('&'): # 对路由进行切分,取出有用的值把它赋给参数
if kv:
k, v = kv.split('=')
params[k] = v
# UTC timestamp,获取时间戳,主机号,版本等信息,保证唯一性
timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
headers['Host'] = urlResult.hostname
headers['x-bce-date'] = timestamp
version, expire = '1', '1800'
# 1 Generate SigningKey
# 拼接签名钥匙,使用hmac(HMAC是密钥相关的哈希运算消息认证码,HMAC运算利用哈希算法,以一个密钥和一个消息为输入,生成一个消息摘要作为输出)
# 还要使用hashlib.sha256进行二次加密
val = "bce-auth-v%s/%s/%s/%s" % (version, self._apiKey, timestamp, expire)
signingKey = hmac.new(self._secretKey.encode('utf-8'), val.encode('utf-8'),
hashlib.sha256
).hexdigest()
# 2 Generate CanonicalRequest
# 2.1 Genrate CanonicalURI
canonicalUri = quote(urlResult.path)
# 2.2 Generate CanonicalURI: not used here
# 2.3 Generate CanonicalHeaders: only include host here
canonicalHeaders = []
for header, val in headers.items():
canonicalHeaders.append(
'%s:%s' % (
quote(header.strip(), '').lower(),
quote(val.strip(), '')
)
)
canonicalHeaders = '\n'.join(sorted(canonicalHeaders))
# 2.4 Generate CanonicalRequest
canonicalRequest = '%s\n%s\n%s\n%s' % (
method.upper(),
canonicalUri,
'&'.join(sorted(urlencode(params).split('&'))),
canonicalHeaders
)
# 3 Generate Final Signature
signature = hmac.new(signingKey.encode('utf-8'), canonicalRequest.encode('utf-8'),
hashlib.sha256
).hexdigest()
headers['authorization'] = 'bce-auth-v%s/%s/%s/%s/%s/%s' % (
version,
self._apiKey,
timestamp,
expire,
';'.join(headers.keys()).lower(),
signature
)
return headers # 最后头部携带信息进行传输,其中最重要的是signature,为此进行了两次大加密,大加密中又有两次小加密。不得不说,重要信息在传输过程中一定要慎重!!!
- ._proccessResult(response.content)
def _proccessResult(self, content):
"""
formate result,格式化返回结果
"""
if sys.version_info.major == 2:
return json.loads(content) or {}
else:
return json.loads(content.decode()) or {}