But How to Ingest Data in Snowflake from AWS

Here is a comprehensive guide on how to ingest data into Snowflake using Snowpipe, the continuous data ingestion service offered by Snowflake. While reading the article helps, it is better to learn by doing, so if you plan to follow along, make sure you fulfill the following prerequisites.

Prerequisites:

  • You should have an active AWS account.

  • You should have an active Snowflake account.

If you have both of the above, let's jump in.

  • In your AWS console, search for S3 and click on it.

  • Click on Create bucket (or you can use an existing one).

  • Give the bucket a name. In the real world you'll probably change some settings, but for now leave the defaults and click Create bucket at the bottom.

  • You will see that a bucket with the name you provided has been created. Click on it.

  • Let's create a folder to group objects within the bucket. Click Create folder.

  • Give it a name and then click Create folder. We will upload data to this folder later in the blog.

  • Now go back to the console and search for IAM. Then click on Policies in the left panel.

  • Click on Create policy. Here we will define permissions: who has access to which resources and what actions they can perform on them.

  • Click on the JSON tab and paste the JSON body below into the editor.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:GetObjectVersion",
                "s3:DeleteObject",
                "s3:DeleteObjectVersion"
            ],
            "Resource": "arn:aws:s3:::<bucket_name>/*"
        },
        {
            "Effect": "Allow",
            "Action": "s3:ListBucket",
            "Resource": "arn:aws:s3:::<bucket_name>"
        }
    ]
}

Replace <bucket_name> with the name of the bucket you created earlier. This policy grants the permissions (like adding and deleting files) necessary to access the bucket we created and its contents. Now click on the Next: Tags button.

  • These are similar to the tags we use on social media; here we add tags to help identify, organize, or search for resources. For now, leave them empty and click on the Next: Review button.

  • Give the policy a meaningful name and click Create policy.

  • The policy should be attached to a role. Let's create a role. Click on Roles.

  • Click on the Create role button.

  • Select AWS account from the available options, then click Next.

  • You will get a list of policies. From here, select the policy you created and click Next.

  • Give the role a name, then click Create role.

  • Here you will get a list of roles. Click on the role you created.

  • It will show you the summary page. Copy the role ARN; we will use it later.

  • Now let's open your Snowflake account.

  • We have to create a cloud storage integration in Snowflake and map the S3 role to it. For that, use the query below.

CREATE OR REPLACE STORAGE INTEGRATION s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<role_arn>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket_name>/<folder>/');

Replace <role_arn> with the role ARN you copied earlier, and <bucket_name> and <folder> with your S3 bucket name and folder. Here s3_int is the name of the integration.
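
Creating a storage integration generally requires elevated privileges. Here is a minimal sketch, assuming your user has access to the ACCOUNTADMIN role (SYSADMIN below is only an example of a role you might delegate to):

      -- Switch to a role that is allowed to create integrations
      USE ROLE ACCOUNTADMIN;

      -- Optionally allow another role to use the integration afterwards
      GRANT USAGE ON INTEGRATION s3_int TO ROLE SYSADMIN;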

  • Let's see what information we have in this integration. Execute the command below:
DESC INTEGRATION s3_int;

We will get information like the storage provider (S3), the storage bucket name, and other details. From the list of details, copy the STORAGE_AWS_IAM_USER_ARN.

  • Now inside AWS go to IAM > Roles. Select the role you created, then go to the Trust relationships tab.

  • You will see a JSON structure. Click on the Edit trust policy button and replace the value written in front of "AWS" with the STORAGE_AWS_IAM_USER_ARN value you copied (if your trust policy also contains an sts:ExternalId condition, set it to the STORAGE_AWS_EXTERNAL_ID from the same DESC output). Then click Update policy.

  • Now go back to Snowflake and create a file format for our CSV file. To do that, copy the command below and run it.

      CREATE FILE FORMAT "<your_db>"."PUBLIC".CSV_FORMAT
          TYPE = 'CSV'
          COMPRESSION = 'AUTO'
          FIELD_DELIMITER = ','
          RECORD_DELIMITER = '\n'
          SKIP_HEADER = 1
          FIELD_OPTIONALLY_ENCLOSED_BY = 'NONE'
          TRIM_SPACE = FALSE
          ERROR_ON_COLUMN_COUNT_MISMATCH = TRUE
          ESCAPE = 'NONE'
          ESCAPE_UNENCLOSED_FIELD = '\134'
          DATE_FORMAT = 'AUTO'
          TIMESTAMP_FORMAT = 'AUTO'
          NULL_IF = ('\\N');
    

Replace <your_db> with the name of your database. Click on Databases in the top-left corner to see your existing databases; you can use one of those.
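
Optionally, you can confirm the file format exists before moving on:

      SHOW FILE FORMATS IN DATABASE "<your_db>";
      DESC FILE FORMAT "<your_db>"."PUBLIC".CSV_FORMAT;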

  • Create a stage in Snowflake pointing to your S3 bucket:
CREATE OR REPLACE STAGE STUDENT_SP
URL = 's3://<bucket_name>/<folder>'
file_format = CSV_FORMAT
storage_integration = s3_int;

Replace <bucket_name> and <folder> with your bucket name and folder respectively. Here STUDENT_SP is the name of the stage, and the file format must match the one we created above (CSV_FORMAT).

  • Now run the command below to check whether the stage points to S3 successfully.

      SHOW STAGES;
    
  • On successful execution of the above query, you will see details like the stage name, the creation date, and so on.
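
    You can also ask Snowflake to list the files in the stage. This is a good end-to-end check that the integration, the IAM role, and the bucket policy all line up (the folder may still be empty at this point, in which case the result is simply empty):

      LIST @STUDENT_SP;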

  • Now for the demonstration, I'm going to create a table Student.

      CREATE OR REPLACE TABLE Student(
          student_id INT PRIMARY KEY,
          Student_name varchar(20)
      );
    

Remember, this is Snowflake's syntax.

  • Now comes the interesting part. We will create a Snowpipe that recognizes CSV files ingested into the external stage and copies their data into the Student table.

      CREATE OR REPLACE PIPE STUDENT_SNOWPIPE AUTO_INGEST = TRUE AS
      COPY INTO "<your_db>"."<schema>"."<table_name>"
      FROM @STUDENT_SP
      FILE_FORMAT = CSV_FORMAT;
    

Replace <your_db>, <schema>, and <table_name> with appropriate names. Here STUDENT_SNOWPIPE is the name of the pipe. Note that for AUTO_INGEST = TRUE to trigger loads automatically, your S3 bucket needs an event notification pointing at the pipe's SQS queue, as sketched below.
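
To find the SQS queue that S3 should notify, look at the pipe's notification channel. A minimal sketch of the commands involved (using STUDENT_SNOWPIPE as created above):

      -- The notification_channel column contains the SQS queue ARN for the pipe
      SHOW PIPES;

      -- You can also check the pipe's current state at any time
      SELECT SYSTEM$PIPE_STATUS('STUDENT_SNOWPIPE');

Copy the ARN from notification_channel and add it as the destination of an event notification for object-create events on your S3 bucket (bucket > Properties > Event notifications).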

  • Now, to test our pipeline, we have to put data into the S3 bucket. Go to your bucket and then to the folder you created.

    Click on Upload.

  • Create a CSV file or download one online. Mine matches the columns of the Student table; a minimal example is shown below.
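
    For illustration, a small CSV with made-up values that fits the Student table and the CSV_FORMAT file format (note the header row, which SKIP_HEADER = 1 tells Snowflake to skip) might look like this:

      student_id,student_name
      1,Alice
      2,Bob
      3,Charlie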

  • Once you click on Upload, you will see the upload page.

  • Click on Add files and select the file you created. Then click on the Upload button at the bottom.

  • Now go to Snowflake to see whether the data has been loaded into your table or not. Run the command below:

      SELECT * FROM "STUDENT";
    

    Replace STUDENT with your table name. Sometimes it takes a while; for me it took about 80 seconds for 3 records to appear. My output of the above query looked like this
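
    If the data does not show up, you can check what Snowpipe has loaded so far (including any errors) with the COPY_HISTORY table function; a minimal sketch, assuming the table is named STUDENT:

      SELECT *
      FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
          TABLE_NAME => 'STUDENT',
          START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())
      ));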

  • Since the supply of data should be continuous, we expect that if we drop another file into the folder, it should be reflected in the table. Let's add another file to our AWS folder using the same steps as before.

  • It took about 140 seconds to load the second file with the same 3 records. If loading seems stuck, you can run the command below; it queues any staged files that have not been loaded yet and lists them in its output, which is handy for debugging.

      ALTER PIPE STUDENT_SNOWPIPE REFRESH;
    

    Here is the sample output of the above command

  • Here is the table content after uploading the second file

    As you can see, the data from the second file is appended after the data from the first file.
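
    If you want a quick sanity check that rows are appended rather than replaced, count them; with two files of 3 records each you would expect 6 rows:

      SELECT COUNT(*) AS row_count FROM "STUDENT";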

  • At last, let's delete all the created objects.

      DROP PIPE "<your_db>"."<schema>"."STUDENT_SNOWPIPE";
      DROP STAGE "<your_db>"."<schema>"."STUDENT_SP";
      DROP FILE FORMAT "<your_db>"."PUBLIC".CSV_FORMAT;
      DROP TABLE "<your_db>"."<schema>"."STUDENT";
      DROP INTEGRATION s3_int;
    

Congratulations, you have learned how to ingest data into Snowflake from AWS using Snowpipe. Now that you have learned this, what are you going to build? Let me know in the comments.