Destination Snowflake

Load data from the Data Platform destination into Snowflake

From Destination AWS S3

Prerequisites
  • Have Dreamdata's Data Platform AWS S3 destination enabled (see this blog post to enable it if you haven't)
From now on, <S3_BUCKET> refers to the bucket name used in Dreamdata's Data Platform AWS S3 destination.

Configuring the Snowflake storage integration

This section is based on Snowflake's own documentation, which covers the process in more detail. A quick way to verify the setup is sketched after the steps below.

  1. Create an S3 storage integration (Snowflake)
    CREATE STORAGE INTEGRATION <S3_INT>
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = 'S3'
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = '<AWS_ROLE_ARN>'
    STORAGE_ALLOWED_LOCATIONS = ('s3://<S3_BUCKET>');
    From now on, <S3_INT> refers to the storage integration created above. <AWS_ROLE_ARN> is the ARN of the AWS role created in step (3) below; you can either create the role first or update the integration once the role exists.
  2. Retrieve the AWS IAM user ARN and external ID that Snowflake generated for the storage integration (Snowflake)
    DESC INTEGRATION <S3_INT>;

    -- Find AWS IAM User ARN under STORAGE_AWS_IAM_USER_ARN
    -- Find AWS External ID under STORAGE_AWS_EXTERNAL_ID
    From now on, <STORAGE_AWS_IAM_USER_ARN> refers to the AWS IAM User ARN returned above
    From now on, <STORAGE_AWS_EXTERNAL_ID> refers to the AWS External ID returned above
  3. Create an AWS Role (AWS)
    From now on, <AWS_ROLE_ARN> refers to the ARN of the newly created role
  4. Create an AWS S3 policy that allows access to the S3 bucket (AWS)
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:GetObject",
            "s3:GetObjectVersion"
          ],
          "Resource": "arn:aws:s3:::<S3_BUCKET>/*"
        },
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListBucket",
            "s3:GetBucketLocation"
          ],
          "Resource": "arn:aws:s3:::<S3_BUCKET>"
        }
      ]
    }
  5. Attach the policy created in (4) to the role created in (3) (AWS)
  6. Only allow <STORAGE_AWS_IAM_USER_ARN> to assume the role created in (3). To do so, change the role's trust policy to the following (AWS)
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
          },
          "Action": "sts:AssumeRole",
          "Condition": {
            "StringEquals": {
              "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
            }
          }
        }
      ]
    }
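
With the trust policy in place, Snowflake should be able to read from the bucket. The sketch below is one way to verify the setup before loading anything; it assumes the <S3_INT> and <S3_BUCKET> values used above, and the stage name dreamdata_check is illustrative.

    -- Create a throwaway stage on the bucket root using the new integration
    CREATE OR REPLACE STAGE dreamdata_check
    STORAGE_INTEGRATION = <S3_INT>
    URL = 's3://<S3_BUCKET>';

    -- Should list the files Dreamdata has delivered, including receipt.json
    LIST @dreamdata_check;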

Load the data
  1. Define the variables that we will need
    -- vars
    SET BUCKET = 's3://<S3_BUCKET>';    -- full S3 URL, including the s3:// prefix
    SET STORAGE_INT = '<S3_INT>';
  2. Parse the receipt file
    SET RECEIPT_FILE = 'receipt.json';
    SET RECEIPT_URL = concat($BUCKET, '/', $RECEIPT_FILE);

    CREATE OR REPLACE STAGE receipt
    STORAGE_INTEGRATION = $STORAGE_INT
    URL = $RECEIPT_URL
    FILE_FORMAT = (TYPE = JSON);

    CREATE OR REPLACE TEMPORARY TABLE receipt AS
    SELECT
      key AS table_name,
      $BUCKET || '/' || value:folder::string AS folder
    FROM
      @receipt,
      LATERAL FLATTEN(PARSE_JSON($1):tables);
  3. Create a stage per table found in the receipt
    CREATE OR REPLACE PROCEDURE STAGES()
    RETURNS string
    LANGUAGE JAVASCRIPT
    EXECUTE AS CALLER
    AS
    $$
    try {
      // Read the table-name / folder pairs parsed from the receipt
      var stmt = snowflake.createStatement({sqlText: "SELECT TABLE_NAME, FOLDER FROM receipt"});
      var rs = stmt.execute();

      // Create one stage per table, pointing at that table's folder in the bucket.
      // Note: replace s3_int with the name of your storage integration (<S3_INT>).
      while (rs.next()) {
        var stage = rs.getColumnValue("TABLE_NAME");
        var url = rs.getColumnValue("FOLDER");

        var createStageStatement = "CREATE OR REPLACE STAGE " + stage + " " +
          "STORAGE_INTEGRATION = s3_int " +
          "URL = '" + url + "' " +
          "FILE_FORMAT = (TYPE = PARQUET)";

        snowflake.createStatement({sqlText: createStageStatement}).execute();
      }

      // Return only after every stage has been created
      return "stages created successfully";
    } catch (err) {
      // Surface any failure to the caller
      throw "Error creating stages: " + err.message;
    }
    $$;

    CALL STAGES();
    SHOW STAGES;
  4. See the data (a sketch for persisting these datasets as tables follows after this list)
    SELECT $1 FROM @companies;
    SELECT $1 FROM @contacts;
    SELECT $1 FROM @events;
    SELECT $1 FROM @paid_ads;
    SELECT $1 FROM @revenue;
    SELECT $1 FROM @revenue_attribution;
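
The queries above read straight from the stages. Below is a minimal sketch for persisting one of the datasets as a regular table; it assumes the stages created in step (3), and the table name companies_raw is illustrative.

    -- Materialise the staged Parquet rows as a table with one VARIANT column per row.
    -- The same pattern applies to the other stages (@contacts, @events, ...).
    CREATE OR REPLACE TABLE companies_raw AS
    SELECT $1 AS data
    FROM @companies;

    -- Fields can then be pulled out of the VARIANT column with the ':' operator,
    -- e.g. SELECT data:<field> FROM companies_raw (field names depend on the dataset).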

