Snowflake(1)

2024-02-16  本文已影响0人  山猪打不过家猪

1. 从s3加载数据

1. aws s3设置以及挂载bucket到snowflake里

  1. 挂载外部STAGE S3
-- Storage integration that lets Snowflake read the S3 bucket via an IAM role.
-- NOTE: a valid role ARN has the form 'arn:aws:iam::<account_id>:role/<role_name>'
-- (the account id between the double colons is required; the original omitted it).
CREATE STORAGE INTEGRATION s3_integration
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/snowflake-role'  -- role ARN from IAM (replace account id)
  STORAGE_ALLOWED_LOCATIONS = ('s3://lgbucket101/snowflake/'); -- bucket folder Snowflake may access
image.png

3.查看storage的属性

DESC INTEGRATION s3_integration
image.png

4.建立snowflake与s3的联系
在s3里添加③的参数


image.png

2. snowflake导入s3里的csv表

1.创建一个HEALTHCARE的表

  1. 创建csv的格式规则
-- Reusable CSV file format: pipe-delimited, header row skipped,
-- literal 'NULL'/'null' strings and empty fields loaded as SQL NULL.
CREATE OR REPLACE FILE FORMAT demo_db.public.csv_format
    TYPE = csv
    FIELD_DELIMITER = '|'
    SKIP_HEADER = 1
    NULL_IF = ('NULL', 'null')
    EMPTY_FIELD_AS_NULL = TRUE;
  1. 给外部表指定格式规则
-- External stage pointing at a single CSV file in S3, tied to the
-- s3_integration and the CSV file format.
CREATE OR REPLACE STAGE demo_db.public.ext_csv_stage
    URL = 's3://lgbucket101/snowflake/csv/health_pipe.csv'
    STORAGE_INTEGRATION = s3_integration
    FILE_FORMAT = demo_db.public.csv_format;

4.将s3的csv拷贝到snowflake的表里

-- Bulk-load the staged CSV into HEALTHCARE; rows that fail to parse are
-- skipped (ON_ERROR = CONTINUE) instead of aborting the whole load.
COPY INTO healthcare
    FROM @demo_db.public.ext_csv_stage
    ON_ERROR = CONTINUE;

3. snowflake导入s3里的parquet表

  1. 创建parquet的表
-- Landing table for the parquet healthcare data set.
-- Business columns are kept as VARCHAR (schema-on-read style: cast later);
-- the last three columns record load provenance.
CREATE or replace TABLE HEALTHCARE_PARQUET(
    AVERAGE_COVERED_CHARGES    VARCHAR(150)  
   ,AVERAGE_TOTAL_PAYMENTS    VARCHAR(150)  
   ,TOTAL_DISCHARGES    VARCHAR(150)   
   ,BACHELORORHIGHER    VARCHAR(150)   
   ,HSGRADORHIGHER    VARCHAR(150)   
   ,TOTALPAYMENTS    VARCHAR(128)  
   ,REIMBURSEMENT    VARCHAR(128)  
   ,TOTAL_COVERED_CHARGES    VARCHAR(128) 
   ,REFERRALREGION_PROVIDER_NAME    VARCHAR(256)  
   ,REIMBURSEMENTPERCENTAGE    VARCHAR(150)   
   ,DRG_DEFINITION    VARCHAR(256)  
   ,REFERRAL_REGION    VARCHAR(26)  
   ,INCOME_PER_CAPITA    VARCHAR(150)   
   ,MEDIAN_EARNINGSBACHELORS    VARCHAR(150)   
   ,MEDIAN_EARNINGS_GRADUATE    VARCHAR(150) 
   ,MEDIAN_EARNINGS_HS_GRAD    VARCHAR(150)   
   ,MEDIAN_EARNINGSLESS_THAN_HS    VARCHAR(150) 
   ,MEDIAN_FAMILY_INCOME    VARCHAR(150)   
   ,NUMBER_OF_RECORDS    VARCHAR(150)  
   ,POP_25_OVER    VARCHAR(150)  
   ,PROVIDER_CITY    VARCHAR(128)  
   ,PROVIDER_ID    VARCHAR(150)   
   ,PROVIDER_NAME    VARCHAR(256)  
   ,PROVIDER_STATE    VARCHAR(128)  
   ,PROVIDER_STREET_ADDRESS    VARCHAR(256)  
   ,PROVIDER_ZIP_CODE    VARCHAR(150) 
   ,filename    VARCHAR                 -- source file (populated from METADATA$FILENAME)
   ,file_row_number VARCHAR             -- row within the source file (METADATA$FILE_ROW_NUMBER)
   ,load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)  -- load time (NTZ)
);
  1. 创建名为ext_parquet_stage的external stage
-- The stage below references demo_db.public.parquet_format, which is never
-- created elsewhere in this post — create it first so the snippet is runnable.
create or replace file format demo_db.public.parquet_format
  type = parquet;

-- External stage pointing at a single parquet file in S3.
create or replace stage demo_db.public.ext_parquet_stage
  URL = 's3://lgbucket101/snowflake/parquet/health.parquet'
  STORAGE_INTEGRATION = s3_integration
  file_format = demo_db.public.parquet_format;

3.复制s3的数据到snowflake的表里

-- Load the staged parquet file into HEALTHCARE_PARQUET.
-- Parquet rows arrive as a single VARIANT column ($1); each field is pulled
-- out by key and cast to varchar. METADATA$FILENAME and
-- METADATA$FILE_ROW_NUMBER record provenance for every loaded row.
copy into demo_db.public.healthcare_parquet
from (select 
$1:average_covered_charges::varchar,
$1:average_total_payments::varchar,
$1:total_discharges::varchar,
$1:"PERCENT_Bachelor's_or_Higher"::varchar,
$1:percent_hs_grad_or_higher::varchar,
$1:total_payments::varchar,
$1:percent_reimbursement::varchar,
$1:total_covered_charges::varchar,
$1:referral_region_provider_name::varchar,
$1:reimbursement_percentage::varchar,
$1:drg_definition::varchar,
$1:referral_region::varchar,
$1:income_per_capita::varchar,
$1:median_earnings_bachelors::varchar,
$1:median_earnings_graduate::varchar,
$1:median_earnings_hs_grad::varchar,
$1:median_earnings_less_than_hs::varchar,
$1:median_family_income::varchar,
$1:number_of_records::varchar,
$1:pop_25_over::varchar,
$1:provider_city::varchar,
$1:provider_id::varchar,
$1:provider_name::varchar,
$1:provider_state::varchar,
$1:provider_street_address::varchar,
$1:provider_zip_code::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_parquet_stage);
  1. 查看我们已经创建的file format和external stage


    image.png

4. 加载JSON表格

  1. 创建json_data的schema,类似于命名空间
create schema json_data
  1. 创建储存s3数据的表healthcare_json

-- Landing table for the JSON healthcare data set.
-- Fully qualified as demo_db.json_data.HEALTHCARE_JSON so it matches the
-- COPY INTO target used later (the original created it unqualified, so it
-- would land in whatever schema was current).
CREATE or replace TABLE demo_db.json_data.HEALTHCARE_JSON(
    id VARCHAR(50)
   ,AVERAGE_COVERED_CHARGES    VARCHAR(150)  
   ,AVERAGE_TOTAL_PAYMENTS    VARCHAR(150)  
   ,TOTAL_DISCHARGES    INTEGER            -- typed: cast from $1:" Total Discharges "
   ,BACHELORORHIGHER    FLOAT              -- typed: cast from $1:"% Bachelor's or Higher"
   ,HSGRADORHIGHER    VARCHAR(150)   
   ,TOTALPAYMENTS    VARCHAR(128)  
   ,REIMBURSEMENT    VARCHAR(128)  
   ,TOTAL_COVERED_CHARGES    VARCHAR(128) 
   ,REFERRALREGION_PROVIDER_NAME    VARCHAR(256)  
   ,REIMBURSEMENTPERCENTAGE    VARCHAR(150)   
   ,DRG_DEFINITION    VARCHAR(256)  
   ,REFERRAL_REGION    VARCHAR(26)  
   ,INCOME_PER_CAPITA    VARCHAR(150)   
   ,MEDIAN_EARNINGSBACHELORS    VARCHAR(150)   
   ,MEDIAN_EARNINGS_GRADUATE    VARCHAR(150) 
   ,MEDIAN_EARNINGS_HS_GRAD    VARCHAR(150)   
   ,MEDIAN_EARNINGSLESS_THAN_HS    VARCHAR(150) 
   ,MEDIAN_FAMILY_INCOME    VARCHAR(150)   
   ,NUMBER_OF_RECORDS    VARCHAR(150)  
   ,POP_25_OVER    VARCHAR(150)  
   ,PROVIDER_CITY    VARCHAR(128)  
   ,PROVIDER_ID    VARCHAR(150)   
   ,PROVIDER_NAME    VARCHAR(256)  
   ,PROVIDER_STATE    VARCHAR(128)  
   ,PROVIDER_STREET_ADDRESS    VARCHAR(256)  
   ,PROVIDER_ZIP_CODE    VARCHAR(150) 
   ,filename    VARCHAR                 -- source file (METADATA$FILENAME)
   ,file_row_number VARCHAR             -- row within the source file (METADATA$FILE_ROW_NUMBER)
   ,load_timestamp timestamp default TO_TIMESTAMP_NTZ(current_timestamp)  -- load time (NTZ)
);
  1. 创建json的format
-- File format for the JSON source files.
CREATE OR REPLACE FILE FORMAT demo_db.public.json_format
    TYPE = 'json';

  1. 创建与S3关联的external stage
-- External stage pointing at a single JSON file in S3, tied to the
-- s3_integration and the JSON file format.
CREATE OR REPLACE STAGE demo_db.public.ext_json_stage
    URL = 's3://lgbucket101/snowflake/json/healthcare_providers.json'
    STORAGE_INTEGRATION = s3_integration
    FILE_FORMAT = demo_db.public.json_format;

5.将数据写入到snowflake的表里

-- Load the staged JSON file into demo_db.json_data.healthcare_json.
-- Each document arrives as a VARIANT ($1); fields are extracted by their
-- exact (quoted) JSON keys — note the leading/trailing spaces and '%' signs
-- in several keys, which must match the source verbatim.
copy into demo_db.json_data.healthcare_json
from (select 
$1:"_id"::varchar,
$1:" Average Covered Charges "::varchar,
$1:" Average Total Payments "::varchar,
$1:" Total Discharges "::integer,
$1:"% Bachelor's or Higher"::float,
$1:"% HS Grad or Higher"::varchar,
$1:"Total payments"::varchar,
$1:"% Reimbursement"::varchar,
$1:"Total covered charges"::varchar,
$1:"Referral Region Provider Name"::varchar,
$1:"ReimbursementPercentage"::varchar,
$1:"DRG Definition"::varchar,
$1:"Referral Region"::varchar,
$1:"INCOME_PER_CAPITA"::varchar,
$1:"MEDIAN EARNINGS - BACHELORS"::varchar,
$1:"MEDIAN EARNINGS - GRADUATE"::varchar,
$1:"MEDIAN EARNINGS - HS GRAD"::varchar,
$1:"MEDIAN EARNINGS- LESS THAN HS"::varchar,
$1:"MEDIAN_FAMILY_INCOME"::varchar,
$1:"Number of Records"::varchar,
$1:"POP_25_OVER"::varchar,
$1:"Provider City"::varchar,
$1:"Provider Id"::varchar,
$1:"Provider Name"::varchar,
$1:"Provider State"::varchar,
$1:"Provider Street Address"::varchar,
$1:"Provider Zip Code"::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_json_stage);

5. 使用Snowpipe service 自动加载s3的数据

自动加载csv

1.创建external stage

-- Stage over the whole csv/ folder (not a single file) so the pipe can
-- pick up any new file dropped there.
CREATE OR REPLACE STAGE ext_csv_stage_pipe
    URL = 's3://lgbucket101/snowflake/csv'
    STORAGE_INTEGRATION = s3_integration
    FILE_FORMAT = csv_format;

2.创建snowpipe

  --create pipe to automate data ingestion from s3 to snowflake
create or replace pipe demo_db.public.mypipe_csv auto_ingest=true as
copy into HEALTHCARE
from @demo_db.public.ext_csv_stage_pipe
on_error = CONTINUE;

3.查看pipe的状态并复制notification

show pipes
image.png

4.在S3的bucket里设置事件通知,提醒snowflake:s3有新文件时就传输,实现自动加载


image.png
image.png

5.设置成功后,就可以看到有事件了


image.png
6.我们给s3里上传一个新的health_care文件,此时,snowflake将自动加载该文件到database里
image.png
自动加载parquet
  1. 创建pipe stage
-- Stage over the parquet/ folder so the pipe can auto-ingest new parquet files.
CREATE OR REPLACE STAGE ext_parquet_stage_pipe
    URL = 's3://lgbucket101/snowflake/parquet'
    STORAGE_INTEGRATION = s3_integration
    FILE_FORMAT = parquet_format;
  1. 创建pipe
-- Snowpipe that auto-ingests new parquet files from the S3 stage.
-- (Pipe name fixed: the original spelled it "mypipe_parqeut".)
-- Parquet rows arrive as a VARIANT ($1); each field is extracted by key and
-- cast, with METADATA$ columns recording file-level provenance.
create or replace pipe demo_db.public.mypipe_parquet auto_ingest=true as
copy into demo_db.public.healthcare_parquet
from (select 
$1:average_covered_charges::varchar,
$1:average_total_payments::varchar,
$1:total_discharges::varchar,
$1:"PERCENT_Bachelor's_or_Higher"::varchar,
$1:percent_hs_grad_or_higher::varchar,
$1:total_payments::varchar,
$1:percent_reimbursement::varchar,
$1:total_covered_charges::varchar,
$1:referral_region_provider_name::varchar,
$1:reimbursement_percentage::varchar,
$1:drg_definition::varchar,
$1:referral_region::varchar,
$1:income_per_capita::varchar,
$1:median_earnings_bachelors::varchar,
$1:median_earnings_graduate::varchar,
$1:median_earnings_hs_grad::varchar,
$1:median_earnings_less_than_hs::varchar,
$1:median_family_income::varchar,
$1:number_of_records::varchar,
$1:pop_25_over::varchar,
$1:provider_city::varchar,
$1:provider_id::varchar,
$1:provider_name::varchar,
$1:provider_state::varchar,
$1:provider_street_address::varchar,
$1:provider_zip_code::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_parquet_stage_pipe);

3.此时我们show pipes会发现,同一个bucket的notification是相同的,所以我们不需要再去s3里设置了
4.上传新的parquet文件到S3里,就可以自动加载到snowflake了

自动加载json

1.json pipe的stage

-- Stage over the json/ folder so the pipe can auto-ingest new JSON files.
CREATE OR REPLACE STAGE ext_json_stage_pipe
    URL = 's3://lgbucket101/snowflake/json'
    STORAGE_INTEGRATION = s3_integration
    FILE_FORMAT = json_format;

  1. 创建pipe
-- Snowpipe that auto-ingests new JSON files from the S3 stage into
-- demo_db.json_data.healthcare_json. Fields are extracted from the VARIANT
-- column ($1) by their exact JSON keys (note embedded spaces / '%' signs).
create or replace pipe demo_db.public.mypipe_json auto_ingest=true as
copy into demo_db.json_data.healthcare_json from (select 
$1:"_id"::varchar,
$1:" Average Covered Charges "::varchar,
$1:" Average Total Payments "::varchar,
$1:" Total Discharges "::integer,
$1:"% Bachelor's or Higher"::float,
$1:"% HS Grad or Higher"::varchar,
$1:"Total payments"::varchar,
$1:"% Reimbursement"::varchar,
$1:"Total covered charges"::varchar,
$1:"Referral Region Provider Name"::varchar,
$1:"ReimbursementPercentage"::varchar,
$1:"DRG Definition"::varchar,
$1:"Referral Region"::varchar,
$1:"INCOME_PER_CAPITA"::varchar,
$1:"MEDIAN EARNINGS - BACHELORS"::varchar,
$1:"MEDIAN EARNINGS - GRADUATE"::varchar,
$1:"MEDIAN EARNINGS - HS GRAD"::varchar,
$1:"MEDIAN EARNINGS- LESS THAN HS"::varchar,
$1:"MEDIAN_FAMILY_INCOME"::varchar,
$1:"Number of Records"::varchar,
$1:"POP_25_OVER"::varchar,
$1:"Provider City"::varchar,
$1:"Provider Id"::varchar,
$1:"Provider Name"::varchar,
$1:"Provider State"::varchar,
$1:"Provider Street Address"::varchar,
$1:"Provider Zip Code"::varchar,
METADATA$FILENAME,
METADATA$FILE_ROW_NUMBER,
TO_TIMESTAMP_NTZ(current_timestamp)
from @demo_db.public.ext_json_stage_pipe);

3. Azure to Snowflake

1.创建azure的integration

方法1:没权限
-- Storage integration for Azure Blob storage (method 1: tenant-id based).
CREATE STORAGE INTEGRATION azure_integration
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = AZURE
    ENABLED = TRUE
    AZURE_TENANT_ID = 'c56908eb-1777-40d2-86f2-1d134e61a0fd'
    STORAGE_ALLOWED_LOCATIONS = ('azure://snowflake101lg.blob.core.windows.net/snowflake');
image.png

2.查看integration

-- Inspect the Azure integration's properties.
DESC INTEGRATION azure_integration;
方法2:使用blob的access key来创建

1.azure 生成blob的access key

image.png
2.直接使用sas复制表,注意?要在复制的时候删除,任何格式都可以使用该方法复制
-- Template: copy directly from Azure Blob using a SAS token.
-- (Backticks from the original are not valid SQL quoting — use single quotes.
-- Strip the leading '?' from the SAS token copied out of the Azure portal.)
COPY INTO <target_table>
  FROM 'azure://<account>.blob.core.windows.net/<container>/<path>'
  CREDENTIALS = (AZURE_SAS_TOKEN = '<sas_token_without_leading_?>')
  FILE_FORMAT = <your_file_format>;
-- Copy a CSV from Azure Blob into HEALTHCARE using a SAS token.
-- WARNING(security): a real SAS token is embedded in plain text here
-- (its 'se=' field shows it expired 2024-02-18). Never publish or commit
-- live credentials — rotate the token and prefer a storage integration.
COPY INTO HEALTHCARE
  FROM 'azure://snowflake101lg.blob.core.windows.net/snowflake/csv/snowpipe_sample.csv'
  CREDENTIALS=(AZURE_SAS_TOKEN='sv=2022-11-02&ss=bfqt&srt=sco&sp=rwdlacupyx&se=2024-02-18T11:15:28Z&st=2024-02-18T03:15:28Z&spr=https,http&sig=0QlPdlg5%2BCl1zXB0%2FkYtFmDOHKWcARuEmoZYAXRNLK4%3D')
  FILE_FORMAT= csv_format;

2.snowpipe azure 自己完成

pass

上一篇下一篇

猜你喜欢

热点阅读