Azure Synapse 04

2023-08-11  本文已影响0人  山猪打不过家猪

WORDS

  1. 用requires 代替need 表示 依靠,依赖,需要
  2. persisted 持久化

1 What's Synapse?

Synapse 主要有几个功能:

  1. 集成分析,可以将gen2里的各类数据源整合在一起,如data lake, Nosql,使得用户可以从多个数据源中获取数据
  2. 数据仓库,提供了大规模数据存储和查询,采用了分布式架构
  3. 大数据分析:集成了apache spark引擎
  4. 实时数据处理:集成了kafka
  5. 高级的安全性和身份信息:数据加密,身份验证,访问控制


    image.png

2 Sql data warehouse

Host a data warehouse via the azure synapse service。
sql database 和azure sql warehouse的区别:

  1. database适用于传统的web类似于 服务器上托管的sql2008,对于大数据查询性能不好,data warehouse则是分布式的,对大数据友好。
  2. 如果只是web的功能或者只是想进行表格的增删改查,sql database是个很好的选择
  3. 但是除了想增删改查,但是又想进行数据分析,这会给传统的database造成压力,所以data warehouse就是最好的选择
  4. 总而言之,只是进行简单的业务操作使用Sql, 但是既然想业务操作,又想数据分析则warehouse。
image.png

3 Create an Azure synapse

创建synapse需要已经部署好的azure data lake gen2 storage account的账号,我们可以使用现有的,也可以创建一个新的


image.png

4 使用synapse链接外部文件(09,10)

回顾:首先你需要

  1. 一个azure sql database名字为sqldatabase01lg
  2. 创建一个data lake gen2 account 名字为datalake01lg,并且container里面创建两个containers 分别csv用来存放.csv文件,parquert用来存放二进制的qarquert文件,上穿Log.csv Log.qarquert


    image.png

创建一个synapse(06,07)

image.png

use synapse (09,10)

  1. 选择添加外部数据源


    image.png
  2. 选择添加外部数据源的类型,这里是最前面的gen2 container里的数据


    image.png
    image.png
  3. 成功后,我们在Linked里面就可以找到,刚才添加的datalake01lg的外部数据源


    image.png
  4. Then, we clicked the csv and executed to query top 100 data, there was an error occurred,it means we do not have the authority to use this container.


    image.png
  5. datalake02lg needs to be authorized by datalake01lg to use the containers. Now go back to the datalake01lg , find the Access Control and add a role assessment.(give permissions to datalake02lg)


    image.png
给外部account授权使用datalake01lg的权限
  1. 当我们授权给datalake02lg可以读取01里的Container的成功后,在执行上的query,就可以查询成功


    image.png

使用外部数据的重要指令

  1. 创建外部数据源, Create external data source
  2. 指定外部数据源格式, Create external file format
  3. 创建外部数据表, Create external table

读取外部CSV文件(10)

CREATE DATABASE [appdb]; --创建appdb数据库

CREATE MASTER KEY ENCRYPTION BY PASSWORD = 'P@ssword@123' --主加密 密钥

CREATE DATABASE SCOPED CREDENTIAL SasToken  --数据库范围凭证
WITH IDENTITY= 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2022-11-02&ss=b&srt=sco&sp=rlx&se=2023-08-11T09:42:48Z&st=2023-08-11T01:42:48Z&spr=https&sig=TBRMpm7332EjuSYSFC2LKMUTkcjB8dfi%2B3NTNsPhiRU%3D'


CREATE EXTERNAL DATA SOURCE log_data -- 外部数据源
WITH(LOCATION='https://datalake01lg.blob.core.windows.net/csv',  --读取外部数据源的位置,文件夹
    CREDENTIAL = SasToken ) -- 获取SasToken
--创建外部文件源
CREATE EXTERNAL FILE FORMAT TextFileFormat WITH(
    FORMAT_TYPE = DELIMITEDTEXT,
    FORMAT_OPTIONS(
        FIELD_TERMINATOR=',',
        FIRST_ROW = 2
    )
)

--创建外部表
CREATE EXTERNAL TABLE [logdata]
(
    [Correlation id] [varchar](200) NULL, -- 必须和csv文件里的表头一致
    [Operation name] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Event category] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [datetime] NULL,
    [Subscription] [varchar](200) NULL,
    [Event initiated by] [varchar](1000) NULL,
    [Resource type] [varchar](1000) NULL,
    [Resource group] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL
)
WITH(
    LOCATION ='/Log.csv',  --上面地址的根目录下的文件
    DATA_SOURCE = log_data, --上面的创建的data source名
    FILE_FORMAT = TextFileFormat --上面指定的文件格式
)

SELECT * FROM [logdata]

image.png

读取外部parquet 文件

  1. drop 外部表
  2. drop 外部数据源
  3. drop 过期的token
  4. 用上面的方法重新生成新的sastoken
DROP EXTERNAL TABLE [logdata]

DROP EXTERNAL DATA SOURCE log_data

DROP DATABASE SCOPED CREDENTIAL SasToken

CREATE DATABASE SCOPED CREDENTIAL SasToken  --创建数据库代码凭证
WITH IDENTITY= 'SHARED ACCESS SIGNATURE',
SECRET = 'sv=2022-11-02&ss=b&srt=sco&sp=rlx&se=2023-08-11T10:15:25Z&st=2023-08-11T02:15:25Z&spr=https&sig=nzwVeqZUBPDHuQETSfeN0nIBy7mOAGYhXzWzXqVKAZ0%3D'


CREATE EXTERNAL DATA SOURCE log_data_parquet -- 创建外部数据格式
WITH(LOCATION='https://datalake01lg.blob.core.windows.net/parquet',  --读取外部数据源的位置,文件夹parquet
    CREDENTIAL = SasToken ) -- 获取SasToken


--创建外部文件源parquet
CREATE EXTERNAL FILE FORMAT ParquetFileFormat WITH(
    FORMAT_TYPE = PARQUET,
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
    )


--创建外部表 
CREATE EXTERNAL TABLE [logdata_parquet]
(
    [Correlation id] [varchar](200) NULL, -- 必须和csv文件里的表头一致
    [Operation name] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Event category] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [varchar](100) NULL,
    [Subscription] [varchar](200) NULL,
    [Event initiated by] [varchar](1000) NULL,
    [Resource type] [varchar](1000) NULL,
    [Resource group] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL
)
WITH(
    LOCATION ='/Log.parquet',  --上面地址的根目录下的文件
    DATA_SOURCE = log_data_parquet, --上面的创建的data source名
    FILE_FORMAT = ParquetFileFormat --上面指定的文件格式
)

SELECT * FROM [logdata_parquet]

image.png
  1. csv表中的Time是时间类型,而parquet里面都是字符串
  2. 其次在parquet文件中,列名是不能有空格的,所以这个结果下所有有空格的列的值都为空,所以删除列明中的空格就可以成功的显示数据

创建一个SQL pool(016)

使用sql pool读取外部的csv表格(017)

CREATE DATABASE SCOPED CREDENTIAL AzureStorageCredential
WITH
  IDENTITY = 'datalake01lg',
  SECRET = 'ZuTNLYAYPx1gmjZHE2+pIizxi4zwOHBYX395HDVQ/wM47yfhHBEBppzMKVegfROIi99SCwCWD04F+AStQjvUxw==';


CREATE EXTERNAL DATA SOURCE log_data
WITH (    LOCATION   = 'abfss://csv@datalake01lg.dfs.core.windows.net/parquet',
          CREDENTIAL = AzureStorageCredential,
          TYPE = HADOOP
)

在sql pool 里加载数据 019

复制csv到sql pool里
COPY INTO [pool_logdata] FROM 'https://datalake010lg.blob.core.windows.net/csv/Log.csv'
WITH(
    FILE_TYPE='CSV',
    CREDENTIAL=(IDENTITY='Storage Account Key',SECRET='t7Wc/rXNJ5fXU45W6NVxMYUG3XG5ge8dUVlmvSbpMMGonY0puqMDuA4DIcGT3IwA6sYIqQSHcu9U+AStvZLNcw=='),
    FIRSTROW=2)

SELECT * FROM [pool_logdata]
复制parquet到sql pool里
CREATE TABLE [pool_logdata_parquet]
(
    [Correlationid] [varchar](200) NULL,
    [Operationname] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Eventcategory] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [varchar](500) NULL,
    [Subscription] [varchar](200) NULL,
    [Eventinitiatedby] [varchar](1000) NULL,
    [Resourcetype] [varchar](1000) NULL,
    [Resourcegroup] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL)
COPY INTO [pool_logdata_parquet] FROM 'https://datalake244434.blob.core.windows.net/parquet/log.parquet'
WITH(
    FILE_TYPE='PARQUET',
    CREDENTIAL=(IDENTITY='Storage Account Key',SECRET='vDV2bSKSR44lbE6x05HtFz57DvlK3O2WNkb11te+H+GrBjeXCojnHjiTw3KdYBWXJRSAnOAZNdgB+AStAasz8w==') )

SELECT * FROM [pool_logdata_parquet]

SELECT * FROM [logdata_parquet]

使用Bulk load 加载数据到表中022

使用PolyBase复制external数据到sql pool的表里023

一种更加有效的方法来加载外部的数据到sql pool里
之前的方法是1. 创建数据库凭证 2.创建外部数据源链接 3.创建外部表结构,这样就建立了外部数据的链接,
如果我们想复用这些的话,可以使用下面代码,读取系统里的外部表,这样我们就可以很轻松的创建各种表,只用创建一次1,2,3

SELECT * FROM sys.database_scoped_credentials  --读取外部数据凭证

SELECT * FROM sys.external_data_sources  --查询外部数据源

SELECT * FROM sys.external_file_formats --查询外部文件

使用synapse的pipeline复制外部数据到pool里

使用pipeline复制database的数据到sql pool里

设计data warehouse

创建一个Fact table

创建一个Dimension表

和上面的表一样,我们使用View试图创建表

-- Lab - Building a dimension table
-- Lets build a view for the customers

CREATE VIEW Customer_view 
AS
  SELECT ct.[CustomerID],ct.[CompanyName],ct.[SalesPerson]
  FROM [SalesLT].[Customer] as ct  

-- Lets create a customer dimension table

SELECT [CustomerID],[CompanyName],[SalesPerson]
INTO DimCustomer
FROM Customer_view 


-- Let's build a view of the products

CREATE VIEW Product_view 
AS
SELECT prod.[ProductID],prod.[Name] as ProductName,model.[ProductModelID],model.[Name] as ProductModelName,category.[ProductcategoryID],category.[Name] AS ProductCategoryName
FROM [SalesLT].[Product] prod
LEFT JOIN [SalesLT].[ProductModel] model ON prod.[ProductModelID] = model.[ProductModelID]
LEFT JOIN [SalesLT].[ProductCategory] category ON prod.[ProductcategoryID]=category.[ProductcategoryID]

-- Lets create a product dimension table

SELECT [ProductID],[ProductModelID],[ProductcategoryID],[ProductName],[ProductModelName],[ProductCategoryName]
INTO DimProduct
FROM Product_view 

-- If you want to drop the views and the tables

DROP VIEW Customer_view 

DROP TABLE DimCustomer

DROP VIEW Product_view 

DROP TABLE DimProduct

从sqldatabase里迁移数据到sql pool里面

设计表

创建一个哈希表(Hash Table)

Fact table 使用哈希表, dimension table使用复制表

CREATE TABLE [dbo].[FactSales](
    [ProductID] [int] NOT NULL,
    [SalesOrderID] [int] NOT NULL,
    [CustomerID] [int] NOT NULL,
    [OrderQty] [smallint] NOT NULL,
    [UnitPrice] [money] NOT NULL,
    [OrderDate] [datetime] NULL,
    [TaxAmt] [money] NULL
)
WITH  
(   
    DISTRIBUTION = HASH (CustomerID)
)

创建一个复制表(replicated Tables)

-- Replicated Tables

CREATE TABLE [dbo].[FactSales](
    [ProductID] [int] NOT NULL,
    [SalesOrderID] [int] NOT NULL,
    [CustomerID] [int] NOT NULL,
    [OrderQty] [smallint] NOT NULL,
    [UnitPrice] [money] NOT NULL,
    [OrderDate] [datetime] NULL,
    [TaxAmt] [money] NULL
)
WITH  
(   
    DISTRIBUTION = REPLICATE
)

使用surrrogate keys for dimension tables


CREATE TABLE [dbo].[DimProduct](
    [ProductSK] [int] IDENTITY(1,1) NOT NULL,  -- surrogate key
    [ProductID] [int] NOT NULL,
    [ProductModelID] [int] NOT NULL,
    [ProductcategoryID] [int] NOT NULL,
    [ProductName] varchar(50) NOT NULL, 
    [ProductModelName] varchar(50) NULL,
    [ProductCategoryName] varchar(50) NULL
)

Slowly Changing the dimension table

Indexs

创建聚集性索引(clustered index) 和非聚集性索引(none-clustered index)

CREATE TABLE [pool_logdata]
(
    [Correlation id] [varchar](200) NULL,
    [Operation name] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Event category] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [datetime] NULL,
    [Subscription] [varchar](200) NULL,
    [Event initiated by] [varchar](1000) NULL,
    [Resource type] [varchar](1000) NULL,
    [Resource group] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL
)
WITH  
(   
    DISTRIBUTION = HASH ([Operation name])
)

  DBCC PDW_SHOWSPACEUSED('[dbo].[pool_logdata]')
image.png
-- Note on creating Indexes

CREATE TABLE [logdata]
(
    [Correlation id] [varchar](200) NULL,
    [Operation name] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Event category] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [datetime] NULL,
    [Subscription] [varchar](200) NULL,
    [Event initiated by] [varchar](1000) NULL,
    [Resource type] [varchar](1000) NULL,
    [Resource group] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL
)
WITH  
(   
    DISTRIBUTION = HASH ([Operation name]),
    CLUSTERED INDEX ([Resource type]) -- 聚集性索引
)


CREATE INDEX EventCategoryIndex ON [logdata] ([Event category]) --非聚集性索引

创建一个heap table 临时表

少于6000w数据的临时表,使用heap table,大于这个数使用clustered columnstore table.

-- Heap tables

CREATE TABLE [logdata]
(
    [Correlation id] [varchar](200) NULL,
    [Operation name] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Event category] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [datetime] NULL,
    [Subscription] [varchar](200) NULL,
    [Event initiated by] [varchar](1000) NULL,
    [Resource type] [varchar](1000) NULL,
    [Resource group] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL
)
WITH  
(   
    HEAP,
    DISTRIBUTION = ROUND_ROBIN
)

CREATE INDEX ResourceTypeIndex ON [logdata] ([Resource type])

遍历分区表

创建一个Partition table

CREATE TABLE [logdata]
(
    [Correlation id] [varchar](200) NULL,
    [Operation name] [varchar](200) NULL,
    [Status] [varchar](100) NULL,
    [Event category] [varchar](100) NULL,
    [Level] [varchar](100) NULL,
    [Time] [datetime] NULL,
    [Subscription] [varchar](200) NULL,
    [Event initiated by] [varchar](1000) NULL,
    [Resource type] [varchar](1000) NULL,
    [Resource group] [varchar](1000) NULL,
    [Resource] [varchar](2000) NULL
)
WITH  
(   
    DISTRIBUTION = HASH ([Operation name]),
    PARTITION ( [Time] RANGE RIGHT FOR VALUES
            ('2023-01-01','2023-02-01','2023-03-01','2023-04-01'))
)

读取json数据

上一篇 下一篇

猜你喜欢

热点阅读