AWS Lambda笔记-Lambda Upload Cloud

2020-09-01  本文已影响0人  lazy_zhu

CloudSearch支持SDK方式上传数据,可通过创建一个Lambda函数来读取SNS通知消息,并将值写入CloudSearch中。那么实现通过/search?q=newuser搜索到新注册的用户信息的功能,就可以在新用户注册发布到SNS中信息以JSON格式写入,再用一个Lambda订阅JSON格式数据并写入CloudSearch实现。

  1. User对象序列化为JSON格式
  2. 新用户注册Lambda,发布Json格式的User到SNS
  3. 订阅SNS消息并反序列化User对象发邮件
  4. 订阅SNS消息并上传Json格式User到CloudSearch

工程说明

工程主要是通过SNS发布订阅,lambda-userregistration-cloudsearch(工程)接受订阅信息并写入CloudSearch中,lambda-userregistration-welcomemail(工程)接受订阅信息并发送邮件给新注册用户。


工程关系配置

1. User对象序列化为JSON格式

为方便Lambda接收SNS的用户数据为JSON,所以在新用户注册成功后发布到SNS,就需要将User对象格式化为JSON。具体需要将service-user工程中的User类添加Json注释(@JsonProperty)。

/*
 User POJO类
*/
public class User {

    @DynamoDBHashKey(attributeName = "UserId")
    @JsonProperty("userid")
    private String id;

    @DynamoDBIndexHashKey(globalSecondaryIndexName = "UsernameIndex", attributeName = "Username")
    @JsonProperty("username")
    private String username;

    @DynamoDBIndexHashKey(globalSecondaryIndexName = "EmailIndex", attributeName = "Email")
    @JsonProperty("email")
    private String email;
    //getter/setter 方法
}

2. 新用户注册Lambda,发布Json格式的User到SNS

lambda-userregistration工程的Handler类,在用户注册成功后,将User对象格式为JSON格式,并发布到SNS中。重点关注notifySnsSubscribers的new ObjectMapper().writeValueAsString(user)代码,将序列化的User对象发布到UserRegistrationSnsTopic的主题中。

    private void notifySnsSubscribers(User user) {
      try {
        //发布UserRegistrationSnsTopic主题,内容序列化的User对象
        amazonSNSClient.publish(System.getenv("UserRegistrationSnsTopic"), new ObjectMapper().writeValueAsString(user));
        LOGGER.info("SNS notification sent for "+user.getEmail());
      } catch (Exception anyException) {
        LOGGER.info("SNS notification failed for "+user.getEmail(), anyException);
      }
    }

3. 订阅SNS消息并反序列化User对象发邮件

lambda-userregistration-welcomemail订阅SNS消息,并反序列化User对象,为方便后续其他Lambda(如:lambda-userregistration-cloudsearch工程的Lambda)接受SNS消息并反序列化的重复操作,我们创建一个接受SNS消息并反序列化的Lambda基类(SnsLambdaHandler)。

public abstract class SnsLambdaHandler<I> implements RequestHandler<SNSEvent, Void> {

    private static final Logger LOGGER = Logger.getLogger(SnsLambdaHandler.class);

    private final ObjectMapper objectMapper;

    protected SnsLambdaHandler() {
        objectMapper=new ObjectMapper();
    }
    //需要子类实现的方法
    public abstract void handleSnsRequest(I input, Context context);

    @SuppressWarnings("unchecked")
    private Class<I> getJsonType() {
        return (Class<I>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
    }

    @Override
    public Void handleRequest(SNSEvent input, Context context) {
        //接受SNS消息
        input.getRecords().forEach(snsMessage -> {
            try {
                //接受消息,并反序列化
                I deserializedPayload = objectMapper.readValue(snsMessage.getSNS().getMessage(), getJsonType());
                handleSnsRequest(deserializedPayload, context);
            } catch (IOException anyException) {
                LOGGER.error("JSON could not be deserialized", anyException);
            }
        });
        return null;
    }
}

订阅SNS的Lambda类修改为继承SnsLambdaHandler类,并在类中实现public abstract void handleSnsRequest(I input, Context context);方法。由于之前是Eamil文本,现将User格式化为JSON,影响之前发送邮件业务逻辑,稍作修改,如果不关注发邮件逻辑可以直接跳过。

public class Handler extends SnsLambdaHandler<User> {
  //.....
  private void sendEmail(final User user) {
    final String emailAddress = user.getEmail();
    //收件地址
    Destination destination = new Destination().withToAddresses(emailAddress);
    Message message = new Message()
        .withBody(new Body().withText(new Content("Welcome to our forum!")))
        .withSubject(new Content("Welcome!"));
    //发送邮件,发件地址从配置的环境变量中获取
    //......
  }

  @Override
  public void handleSnsRequest(User input, Context context){
    //收到的是标准的SNSEvent事件
    //getRecords()返回的是一个列表,表示Lambda可能一次收多条SNS消息。
    //input.getRecords().forEach(snsMessage -> sendEmail(snsMessage.getSNS().getMessage()));
    //return null;
    sendEmail(input);
  }
}

4. 订阅SNS消息并上传Json格式User到CloudSearch

写入CloudSearch的Lambda同样需要订阅SNS并且需要将User反序列化,所以继承SnsLambdaHandler减少重复接受订阅消息和反序列化。
JSON格式数据写入CloudSearch,我们需要CloudSearch的AmazonCloudSearchDomainClient类帮忙,创建该类的同时需要设置Endpoint(即某个CloudSearch的Search Endpoint)的值。

Search Endpoint的值
]
还有一点需要注意,在uploadDocument时JSON格式需要如下方式(支持批量),id,type,fields都是必填字段。id是用来表示唯一性字段,type字段有add和delete分别是用来新增和删除文档内容。fields则是需要搜索的字段内容。下面类的uploadDocument方法主要功能是组装并写入CloudSearch。
[{
    "id": "1234-1234-1234",
    "type": "add",
    "fields": {
        "userid": "1234-1234-1234",
        "eamil": "abc@abc.com",
        "username": "testtest"
    }
}]

写入CloudSearch的Lambda函数:

public class Handler extends SnsLambdaHandler<User> {
    private static final Injector INJECTOR = Guice.createInjector();
    private static final Logger LOGGER = Logger.getLogger(Handler.class);
    private AmazonCloudSearchDomainClient amazonCloudSearchDomainClient;
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Inject
    public Handler setAmazonCloudSearchDomainClient(AmazonCloudSearchDomainClient amazonCloudSearchDomainClient) {
        //获取CloudSearch的端点
        this.amazonCloudSearchDomainClient = amazonCloudSearchDomainClient;
        this.amazonCloudSearchDomainClient.setEndpoint(System.getenv("CloudSearchDomain"));
        return this;
    }

    public Handler() {
        INJECTOR.injectMembers(this);
        Objects.nonNull(amazonCloudSearchDomainClient);
    }

    //更新CloudSearch文档
    private void uploadDocument(User user) {
        try {
            //创建CloudSearchAPI需要的数据格式,add,id,fields键是必须的。
            final Map<String, Object> documentRequest = new HashMap<>();
            documentRequest.put("type", "add");
            documentRequest.put("id", user.getId());
            documentRequest.put("fields", user);
            LOGGER.info("User with id " + user.getId() + " is being uploaded to CloudSearch");
            //documentRequest对象转为byte数组
            byte[] jsonAsByteStream = objectMapper.writeValueAsBytes(new Map[]{documentRequest});
            if (jsonAsByteStream != null) {
                ByteArrayInputStream document = new ByteArrayInputStream(jsonAsByteStream);
                amazonCloudSearchDomainClient.uploadDocuments(new UploadDocumentsRequest()
                        .withDocuments(document)
                        .withContentLength((long) document.available())
                        .withContentType(ContentType.Applicationjson)
                );
            }
        } catch (JsonProcessingException jsonProcessingException) {
            LOGGER.error("Object could not be converted to JSON", jsonProcessingException);
        } catch (Exception anyException) {
            LOGGER.error("Upload was failing", anyException);
        }
    }

    @Override
    public void handleSnsRequest(User input, Context context) {
        uploadDocument(input);
    }
}

该类中重点的依赖包:aws-java-sdk-cloudsearch是cloudsearch的SDK,具体依赖关系配置详见该工程下的build.gradle文件。
同样配置Lambda的cloudformation与其他Lambda配置相同,详见cloudformation.template中的UserRegistrationCloudSearchLambda,UserRegistrationCloudSearchLambdaPermission
最后一步:./gradlew deploy部署工程,部署成功后。
通过https://<youdomain>/users,body数据
{"username":"testuser24","email":"lazy24@163.com"}提交注册信息。
通过https://<youdomain>/search?q=testuser24检索到新注册用户数据。

检索到的数据

异常一

com.amazonaws.services.cloudsearchdomain.model.AmazonCloudSearchDomainException: User: arn:aws:sts::083845954160:assumed-role/serverlessbook-LambdaExecutionRole-1CQQ1SF5ASHEB/serverlessbook-UserRegistrationCloudSearchLambda-WI941096GZTW is not authorized to perform: cloudsearch:document on resource: serverlessbook (Service: AmazonCloudSearchDomain; Status Code: 403; Error Code: AccessDenied; Request ID: ebb327dc-6ff3-4a3f-8e92-65986e76babd; Proxy: null)

提示主要是Lambda在uploaddocument的时候没有权限。
解决方案:在对应的Lambda的Role中添加"arn:aws:iam::aws:policy/CloudSearchFullAccess"。该工程cloudformation.tempalte中涉及的是LambdaExecutionRole

"LambdaExecutionRole": {
    "Type": "AWS::IAM::Role",
    "Properties": {
        "Path": "/",
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17",
            "Statement": [{
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "lambda.amazonaws.com",
                        "apigateway.amazonaws.com"
                    ]
                },
                "Action": [
                    "sts:AssumeRole"
                ]
            }]
        },
        "ManagedPolicyArns": [
            "arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole",
            "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
            "arn:aws:iam::aws:policy/AWSLambdaFullAccess",
            "arn:aws:iam::aws:policy/CloudSearchFullAccess"
        ]
    }
}

Github代码地址:https://github.com/zhujinhuant/serverlessbook/tree/master/serverlessbook-15

上一篇 下一篇

猜你喜欢

热点阅读