AWS 服务从入门到精通 | Amazon Kinesis 服务之 Firehose

2018-01-31  本文已影响0人  黑客和白帽子的故事

kinesis简介(什么是Kinesis)

Amazon Kinesis 可以轻松收集、处理和分析实时视频和数据流

由于作者英语太烂了,所以附上亚马逊的原文的英文如下:

Amazon Kinesis makes it easy to collect, process, and analyze video and data streams in real time.

kinesis 操作界面简介:

控制面板

很显然,Kinesis 由四大部分组成:

图片

以上就是 Kinesis 提供的全部四项服务,读者可以根据实际场景选取对应的服务进行操作。我今天主要讲的服务是 Firehose。

Firehose的界面操作部分:

-2、点击 create delivery stream 按钮,出现以下界面:

image.png image.png image.png 界面
image.png image.png

Firehose代码操作:

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>1.7.5</version>
        </dependency>
 /**
     * 初始化流
     */
    @SuppressWarnings("deprecation")
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }
    /**
     * 
     * 单个写流
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }
  /**
     * 批量添加数据到流里面
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("写流错误" + e);
        }
    }


    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

    /**
     * 更新流配置
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }

完整代码:

package com.sdk.wifi.util.aws.firehose;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.DeliveryStreamDescription;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamRequest;
import com.amazonaws.services.kinesisfirehose.model.DescribeDeliveryStreamResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;
import com.amazonaws.services.kinesisfirehose.model.S3DestinationUpdate;
import com.amazonaws.services.kinesisfirehose.model.UpdateDestinationRequest;

public class FireHoseUtil {

    private static final Log LOGGER = LogFactory.getLog(FireHoseUtil.class);

    // DeliveryStream properties
    private static AmazonKinesisFirehoseClient firehoseClient;
    private static final String FIRE_HOSE_REGION = "us-west-2";

    /**
     * 批量添加数据到流里面
     */
    public static void addBatchFireHose(List<Record> records,
            String deliveryStreamName) {
        try {
            putRecordBatch(records, deliveryStreamName);
        } catch (Exception e) {
            LOGGER.error("写流错误" + e);
        }
    }

    /**
     * 
     * 单个写流
     */
    public static void addFireHose(Record record, String deliveryStreamName) {
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordRequest.setRecord(record);
        firehoseClient.putRecord(putRecordRequest);
    }

    /**
     * 字符串边record
     * 
     * @param data
     * @return
     * @throws UnsupportedEncodingException 
     */
    public static Record createRecord(String data) throws UnsupportedEncodingException {
        return new Record().withData(ByteBuffer.wrap(data.getBytes("UTF-8")));
    }

    /**
     * 初始化流
     */
    public static void initClients() {
        AWSCredentials credentials = null;
        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        } catch (Exception e) {
            throw new AmazonClientException(
                    "Cannot load the credentials from the credential profiles file. "
                            + "Please make sure that your credentials file is at the correct "
                            + "location (~/.aws/credentials), and is in valid format.",
                    e);
        }

        // Firehose client
        firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        firehoseClient.setRegion(RegionUtils.getRegion(FIRE_HOSE_REGION));

    }

    /**
     * 批量写流
     * 
     * @param recordList
     * @return
     */
    private static PutRecordBatchResult putRecordBatch(List<Record> recordList,
            String deliveryStreamName) {
        PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest();
        putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName);
        putRecordBatchRequest.setRecords(recordList);
        return firehoseClient.putRecordBatch(putRecordBatchRequest);
    }

    /**
     * 更新流配置
     *
     * @throws Exception
     */
    public static void updateDeliveryStream(String deliveryOpenStreamName,
            String s3DestinationUpdateName) throws Exception {
        DeliveryStreamDescription deliveryStreamDescription = describeDeliveryStream(deliveryOpenStreamName);
        LOGGER.info("Updating DeliveryStream Destination: "
                + deliveryOpenStreamName + " with new configuration options");
        // get(0) -> DeliveryStream currently supports only one destination per
        // DeliveryStream
        UpdateDestinationRequest updateDestinationRequest = new UpdateDestinationRequest()
                .withDeliveryStreamName(deliveryOpenStreamName)
                .withCurrentDeliveryStreamVersionId(
                        deliveryStreamDescription.getVersionId())
                .withDestinationId(
                        deliveryStreamDescription.getDestinations().get(0)
                                .getDestinationId());

        S3DestinationUpdate s3DestinationUpdate = new S3DestinationUpdate();
        s3DestinationUpdate.withPrefix(s3DestinationUpdateName);

        updateDestinationRequest.setS3DestinationUpdate(s3DestinationUpdate);

        firehoseClient.updateDestination(updateDestinationRequest);
    }

    /**
     * Method to describe the delivery stream.
     *
     * @param deliveryStreamName
     *            the delivery stream
     * @return the delivery description
     */
    private static DeliveryStreamDescription describeDeliveryStream(
            String deliveryStreamName) {
        DescribeDeliveryStreamRequest describeDeliveryStreamRequest = new DescribeDeliveryStreamRequest();
        describeDeliveryStreamRequest
                .withDeliveryStreamName(deliveryStreamName);
        DescribeDeliveryStreamResult describeDeliveryStreamResponse = firehoseClient
                .describeDeliveryStream(describeDeliveryStreamRequest);
        return describeDeliveryStreamResponse.getDeliveryStreamDescription();
    }

}
上一篇下一篇

猜你喜欢

热点阅读