Share and publish your Snowflake data to AWS Data Exchange using Amazon Redshift data sharing

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. Today, tens of thousands of AWS customers, from Fortune 500 companies to startups and everything in between, use Amazon Redshift to run mission-critical business intelligence (BI) dashboards, analyze real-time streaming data, and run predictive analytics. With the constant increase in generated data, Amazon Redshift customers continue to achieve successes in delivering better service to their end-users, improving their products, and running an efficient and effective business.

In this post, we discuss a customer who is currently using Snowflake to store analytics data. The customer needs to offer this data to clients who are using Amazon Redshift via AWS Data Exchange, the world’s most comprehensive service for third-party datasets. We explain in detail how to implement a fully integrated process that automatically ingests data from Snowflake into Amazon Redshift and offers it to clients via AWS Data Exchange.

Overview of the solution

The solution consists of four high-level steps:

  1. Configure Snowflake to push the changed data for identified tables into an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use a custom-built Redshift Auto Loader to load this Amazon S3 landed data to Amazon Redshift.
  3. Merge the data from the change data capture (CDC) S3 staging tables to Amazon Redshift tables.
  4. Use Amazon Redshift data sharing to license the data to customers via AWS Data Exchange as a public or private offering.

The following diagram illustrates this workflow.

Solution Architecture Diagram

Prerequisites

To get started, you need the following prerequisites:

Configure Snowflake to track the changed data and unload it to Amazon S3

In Snowflake, identify the tables that you need to replicate to Amazon Redshift. For the purpose of this demo, we use the data in the TPCH_SF1 schema’s Customer, LineItem, and Orders tables of the SNOWFLAKE_SAMPLE_DATA database, which comes out of the box with your Snowflake account.

  1. Make sure that the Snowflake external stage unload_to_s3 created in the prerequisites points to the S3 prefix s3-redshift-loader-source created in the previous step.
  2. Create a new schema BLOG_DEMO in the DEMO_DB database:
    CREATE SCHEMA demo_db.blog_demo;
  3. Duplicate the Customer, LineItem, and Orders tables in the TPCH_SF1 schema to the BLOG_DEMO schema:
    CREATE TABLE CUSTOMER AS 
    SELECT * FROM snowflake_sample_data.tpch_sf1.CUSTOMER;
    CREATE TABLE ORDERS AS
    SELECT * FROM snowflake_sample_data.tpch_sf1.ORDERS;
    CREATE TABLE LINEITEM AS 
    SELECT * FROM snowflake_sample_data.tpch_sf1.LINEITEM;

  4. Verify that the tables have been duplicated successfully:
    SELECT table_catalog, table_schema, table_name, row_count, bytes
    FROM INFORMATION_SCHEMA.TABLES
    WHERE TABLE_SCHEMA = 'BLOG_DEMO'
    ORDER BY ROW_COUNT;

  5. Create table streams to track data manipulation language (DML) changes made to the tables, including inserts, updates, and deletes:
    CREATE OR REPLACE STREAM CUSTOMER_CHECK ON TABLE CUSTOMER;
    CREATE OR REPLACE STREAM ORDERS_CHECK ON TABLE ORDERS;
    CREATE OR REPLACE STREAM LINEITEM_CHECK ON TABLE LINEITEM;

  6. Perform DML changes to the tables (for this post, we run UPDATE on all tables and MERGE on the customer table):
    UPDATE customer
    SET c_comment = 'Sample comment for blog demo'
    WHERE c_custkey BETWEEN 0 AND 10;
    UPDATE orders
    SET o_comment = 'Sample comment for blog demo'
    WHERE o_orderkey BETWEEN 1800001 AND 1800010;
    UPDATE lineitem
    SET l_comment = 'Sample comment for blog demo'
    WHERE l_orderkey BETWEEN 3600001 AND 3600010;
    MERGE INTO customer c
    USING
    (
    SELECT n_nationkey
    FROM snowflake_sample_data.tpch_sf1.nation s
    WHERE n_name = 'UNITED STATES') n
    ON n.n_nationkey = c.c_nationkey
    WHEN MATCHED THEN UPDATE SET c.c_comment = 'This is US based customer1';

  7. Validate that the stream tables have recorded all changes:
    SELECT * FROM CUSTOMER_CHECK;
    SELECT * FROM ORDERS_CHECK;
    SELECT * FROM LINEITEM_CHECK;

    For example, we can query the following customer key value to verify how the events were recorded for the MERGE statement on the customer table:

    SELECT * FROM CUSTOMER_CHECK WHERE c_custkey = 60027;

    We can see the METADATA$ISUPDATE column as TRUE, and we see DELETE followed by INSERT in the METADATA$ACTION column. In other words, the stream records an update as a pair of rows: a DELETE of the old row version and an INSERT of the new one.

  8. Run the COPY command to offload the CDC data from the stream tables to the S3 bucket using the external stage unload_to_s3. In the following code, we also copy the data to S3 folders ending with _stg so that when Redshift Auto Loader automatically creates these tables in Amazon Redshift, they are created and marked as staging tables:
    COPY INTO @unload_to_s3/customer_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    COPY INTO @unload_to_s3/orders_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.orders_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    COPY INTO @unload_to_s3/lineitem_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

  9. Verify the data in the S3 bucket. There will be three sub-folders created in the s3-redshift-loader-source folder of the S3 bucket, and each will contain .parquet data files.
    You can also automate the preceding COPY commands using tasks, which can be scheduled to run at a set frequency for automated copy of CDC data from Snowflake to Amazon S3.
  10. Use the ACCOUNTADMIN role to assign the EXECUTE TASK privilege. In this scenario, we assign the privileges to the SYSADMIN role:
    USE ROLE accountadmin;
    GRANT EXECUTE TASK, EXECUTE MANAGED TASK ON ACCOUNT TO ROLE sysadmin;

  11. Use the SYSADMIN role to create three separate tasks to run the three COPY commands every 5 minutes:
    USE ROLE sysadmin;
    /* Task to offload the Customer CDC table */
    CREATE TASK sf_rs_customer_cdc
    WAREHOUSE = SMALL
    SCHEDULE = 'USING CRON */5 * * * * UTC'
    AS
    COPY INTO @unload_to_s3/customer_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.customer_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    /* Task to offload the Orders CDC table */
    CREATE TASK sf_rs_orders_cdc
    WAREHOUSE = SMALL
    SCHEDULE = 'USING CRON */5 * * * * UTC'
    AS
    COPY INTO @unload_to_s3/orders_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.orders_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    /* Task to offload the Lineitem CDC table */
    CREATE TASK sf_rs_lineitem_cdc
    WAREHOUSE = SMALL
    SCHEDULE = 'USING CRON */5 * * * * UTC'
    AS
    COPY INTO @unload_to_s3/lineitem_stg/
    FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.lineitem_check)
    FILE_FORMAT = (TYPE = PARQUET)
    OVERWRITE = TRUE HEADER = TRUE;

    The cron minute field is */5 (every fifth minute); a plain 5 would run each task only once an hour, at minute 5. When the tasks are first created, they’re in a SUSPENDED state.

  12. Alter the three tasks and resume them:
    ALTER TASK sf_rs_customer_cdc RESUME;
    ALTER TASK sf_rs_orders_cdc RESUME;
    ALTER TASK sf_rs_lineitem_cdc RESUME;

  13. Validate that all three tasks have been resumed successfully:
    SHOW TASKS;

    Now the tasks run every 5 minutes and look for new data in the stream tables to offload to Amazon S3. As soon as data is migrated from Snowflake to Amazon S3, Redshift Auto Loader automatically infers the schema and creates the corresponding tables in Amazon Redshift. Then, by default, it starts loading data from Amazon S3 to Amazon Redshift every 5 minutes. You can also change the default setting of 5 minutes.
  14. On the Amazon Redshift console, launch the query editor v2 and connect to your Amazon Redshift cluster.
  15. Browse to the dev database, public schema, and expand Tables.
    You can see three staging tables created with the same names as the corresponding folders in Amazon S3.
  16. Validate the data in one of the tables by running the following query:
    SELECT * FROM "dev"."public"."customer_stg";
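
The three task definitions in this section differ only in the table name, so they can be generated from a single template. The following Python sketch shows one way to do that. The helper name build_task_sql and the TABLES list are illustrative and not part of the original walkthrough, and the cron expression */5 * * * * is an assumption chosen to match the every-5-minutes schedule described in the text. The script only builds SQL text; running the statements against Snowflake is left to whichever client you already use.

```python
# Sketch: generate the three Snowflake CREATE TASK statements from one template.
# build_task_sql and TABLES are illustrative names, not part of the original post.

TABLES = ["customer", "orders", "lineitem"]

# The cron expression below is an assumption that matches "every 5 minutes".
TASK_TEMPLATE = """CREATE TASK sf_rs_{table}_cdc
WAREHOUSE = SMALL
SCHEDULE = 'USING CRON */5 * * * * UTC'
AS
COPY INTO @unload_to_s3/{table}_stg/
FROM (select *, sysdate() as LAST_UPDATED_TS from demo_db.blog_demo.{table}_check)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE HEADER = TRUE;"""


def build_task_sql(table: str) -> str:
    """Return the CREATE TASK statement that unloads one stream to its _stg folder."""
    # Reject anything that is not a plain identifier before it reaches the SQL text.
    if not table.isidentifier():
        raise ValueError(f"unexpected table name: {table!r}")
    return TASK_TEMPLATE.format(table=table.lower())


if __name__ == "__main__":
    for name in TABLES:
        print(build_task_sql(name))
        print()
```

Keeping the statement in one template means a change to the schedule or file format is made once rather than in three places.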

Configure the Redshift Auto Loader utility

The Redshift Auto Loader makes data ingestion to Amazon Redshift significantly easier because it automatically loads data files from Amazon S3 to Amazon Redshift. The files are mapped to the respective tables by simply dropping files into preconfigured locations on Amazon S3. For more details about the architecture and internal workflow, refer to the GitHub repo.

We use an AWS CloudFormation template to set up Redshift Auto Loader. Complete the following steps:

  1. Launch the CloudFormation template.
  2. Choose Next.
  3. For Stack name, enter a name.
  4. Provide the parameters listed in the following table.
    CloudFormation template parameters (allowed values and description for each):

    RedshiftClusterIdentifier
      Allowed values: Amazon Redshift cluster identifier
      Description: Enter the Amazon Redshift cluster identifier.

    DatabaseUserName
      Allowed values: Database user name in the Amazon Redshift cluster
      Description: The Amazon Redshift database user name that has access to run the SQL script.

    DatabaseName
      Allowed values: Database name in Amazon Redshift
      Description: The name of the Amazon Redshift primary database where the SQL script is run.

    DatabaseSchemaName
      Allowed values: Schema name in Amazon Redshift
      Description: The Amazon Redshift schema name where the tables are created.

    RedshiftIAMRoleARN
      Allowed values: Default or the valid IAM role ARN attached to the Amazon Redshift cluster
      Description: The IAM role ARN associated with the Amazon Redshift cluster. If your default IAM role is set for the cluster and has access to your S3 bucket, leave it at the default.

    CopyCommandOptions
      Allowed values: Copy option; default is delimiter '|' gzip
      Description: Provide the additional COPY command data format parameters. If InitiateSchemaDetection = Yes, then the process attempts to detect the schema and automatically set the suitable COPY command options. In the event of failure on schema detection or when InitiateSchemaDetection = No, this value is used as the default COPY command options to load data.

    SourceS3Bucket
      Allowed values: S3 bucket name
      Description: The S3 bucket where the data is stored. Make sure the IAM role that’s associated with the Amazon Redshift cluster has access to this bucket.

    InitiateSchemaDetection
      Allowed values: Yes/No
      Description: Set to Yes to dynamically detect the schema prior to file load and create a table in Amazon Redshift if it doesn’t exist already. If a table already exists, then it won’t drop or recreate the table in Amazon Redshift. If schema detection fails, the process uses the default COPY options as specified in CopyCommandOptions.

    The Redshift Auto Loader uses the COPY command to load data into Amazon Redshift. For this post, set CopyCommandOptions as follows, and configure any supported COPY command options:

    delimiter '|' dateformat 'auto' TIMEFORMAT 'auto'

  5. Choose Next.
  6. Accept the default values on the next page and choose Next.
  7. Select the acknowledgement check box and choose Create stack.
  8. Monitor the progress of the stack creation and wait until it’s complete.
  9. To verify the Redshift Auto Loader configuration, sign in to the Amazon S3 console and navigate to the S3 bucket you provided.
    You should see that a new directory s3-redshift-loader-source has been created.

Copy all the data files exported from Snowflake under s3-redshift-loader-source.
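
To make the folder-to-table convention concrete, the following Python sketch derives a staging table name from an S3 object key. It is not the utility’s actual implementation (refer to the GitHub repo for that); it only illustrates the layout this post relies on, where each sub-folder of s3-redshift-loader-source becomes a table of the same name. The function name table_for_key and the sample file names are hypothetical.

```python
# Sketch: derive the target staging table name from an S3 object key.
# Assumes the layout described in this post: <prefix>/<table folder>/<file>.
# This is an illustration only, not the Redshift Auto Loader's own code.
from pathlib import PurePosixPath
from typing import Optional

SOURCE_PREFIX = "s3-redshift-loader-source"


def table_for_key(key: str) -> Optional[str]:
    """Return the folder directly under SOURCE_PREFIX, or None if the key is elsewhere."""
    parts = PurePosixPath(key).parts
    # Expect at least three components: prefix, table folder, file name.
    if len(parts) < 3 or parts[0] != SOURCE_PREFIX:
        return None
    return parts[1]


if __name__ == "__main__":
    # Hypothetical object keys, used only to exercise the function.
    for key in (
        "s3-redshift-loader-source/customer_stg/part-0.parquet",
        "s3-redshift-loader-source/orders_stg/part-0.parquet",
        "unrelated/readme.txt",
    ):
        print(key, "->", table_for_key(key))
```

Under this convention, unloading to a folder named customer_stg is what causes a table named customer_stg to appear in Amazon Redshift.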

Merge the data from the CDC S3 staging tables to Amazon Redshift tables

To merge your data from Amazon S3 to Amazon Redshift, complete the following steps:

  1. Create a temporary staging table merge_stg and insert all the rows from the S3 staging table that have metadata$action as INSERT, using the following code. This includes all the new inserts as well as the updates.
    CREATE TEMP TABLE merge_stg
    AS
    SELECT * FROM
    (
    SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC
    ) AS rnk
    FROM customer_stg
    )
    WHERE rnk = 1 AND metadata$action = 'INSERT';

    The preceding code uses the window function DENSE_RANK() to select the latest entries for a given c_custkey: it ranks the rows for each c_custkey in descending order of last_updated_ts. The rank is computed in a subquery because a window function result can’t be filtered in the same query level. We then select the rows with rnk = 1 and metadata$action = 'INSERT' to capture all the inserts.

  2. Use the S3 staging table customer_stg to delete the records from the base table customer that are marked as deletes or updates:
    DELETE FROM customer
    USING customer_stg
    WHERE customer.c_custkey = customer_stg.c_custkey;

    This deletes all the rows that are present in the CDC S3 staging table, which takes care of rows marked for deletion and updates.

  3. Use the temporary staging table merge_stg to insert the records marked for updates or inserts:
    INSERT INTO customer
    SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
    FROM merge_stg;

  4. Truncate the staging table, because we have already updated the target table:
    TRUNCATE customer_stg;
  5. You can also run the preceding steps as a stored procedure:
    CREATE OR REPLACE PROCEDURE merge_customer()
    AS $$
    BEGIN
    /* CREATE A TEMP TABLE WITH THE LATEST RECORDS FOR UPDATES/NEW INSERTS */
    CREATE TEMP TABLE merge_stg AS
    SELECT * FROM
    (
    SELECT *, DENSE_RANK() OVER (PARTITION BY c_custkey ORDER BY last_updated_ts DESC ) AS rnk
    FROM customer_stg
    )
    WHERE rnk = 1 AND metadata$action = 'INSERT';
    /* DELETE FROM THE BASE TABLE ALL RECORDS PRESENT IN THE CDC STAGING TABLE (DELETES AND UPDATES) */
    DELETE FROM customer
    USING customer_stg
    WHERE customer.c_custkey = customer_stg.c_custkey;
    /* INSERT NEW/UPDATED RECORDS INTO THE BASE TABLE */
    INSERT INTO customer
    SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment
    FROM merge_stg;
    /* EMPTY THE STAGING TABLE NOW THAT THE TARGET IS UP TO DATE */
    TRUNCATE customer_stg;
    END;
    $$ LANGUAGE plpgsql;

    For example, let’s look at the before and after states of the customer table when there’s been a change in data for a particular customer.

    The following screenshot shows the new changes recorded in the customer_stg table for c_custkey = 74360.
    merge-process-new-changes
    We can see two records for the customer with c_custkey = 74360, one with metadata$action as DELETE and one with metadata$action as INSERT. That means the record with this c_custkey was updated at the source, and these changes need to be applied to the target customer table in Amazon Redshift.

    The following screenshot shows the current state of the customer table before these changes have been merged using the preceding stored procedure:
    merge-process-current-state

  6. Now, to update the target table, we can run the stored procedure as follows:
    CALL merge_customer();
    The following screenshot shows the final state of the target table after the stored procedure is complete.
    merge-process-after-sp
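
The delete-then-insert logic can be tried out locally before running it in Amazon Redshift. The following Python sketch replays the four merge steps on an in-memory SQLite database. SQLite only stands in for Amazon Redshift here, so some details are adapted and should be read as assumptions: DELETE ... USING is written as DELETE ... WHERE ... IN, TRUNCATE is written as DELETE, the action column is named metadata_action, the table is reduced to two columns, and all sample rows are invented for the demo. Window functions require SQLite 3.25 or later.

```python
# Sketch: replay the delete-then-insert merge on an in-memory SQLite database.
# SQLite stands in for Amazon Redshift, so DELETE ... USING becomes
# DELETE ... WHERE ... IN, and TRUNCATE becomes DELETE. All rows are made up.
import sqlite3


def run_merge_demo():
    """Apply the merge steps to sample CDC rows and return the final customer rows."""
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.executescript(
        """
        CREATE TABLE customer (c_custkey INTEGER, c_comment TEXT);
        CREATE TABLE customer_stg (
            c_custkey INTEGER,
            c_comment TEXT,
            metadata_action TEXT,
            last_updated_ts TEXT
        );
        -- Base table before the merge.
        INSERT INTO customer VALUES (1, 'old comment'), (2, 'to be deleted');
        -- CDC rows: key 1 is an update (DELETE + INSERT), key 2 is a delete,
        -- key 3 is a brand-new row.
        INSERT INTO customer_stg VALUES
            (1, 'old comment',   'DELETE', '2023-01-01 00:05:00'),
            (1, 'new comment',   'INSERT', '2023-01-01 00:05:00'),
            (2, 'to be deleted', 'DELETE', '2023-01-01 00:05:00'),
            (3, 'brand new',     'INSERT', '2023-01-01 00:05:00');
        """
    )
    # Step 1: keep the latest INSERT row per key in a temporary table.
    cur.execute(
        """
        CREATE TEMP TABLE merge_stg AS
        SELECT c_custkey, c_comment FROM (
            SELECT *, DENSE_RANK() OVER (
                PARTITION BY c_custkey ORDER BY last_updated_ts DESC
            ) AS rnk
            FROM customer_stg
        )
        WHERE rnk = 1 AND metadata_action = 'INSERT'
        """
    )
    # Step 2: remove every base row whose key appears in the CDC staging table.
    cur.execute(
        "DELETE FROM customer WHERE c_custkey IN (SELECT c_custkey FROM customer_stg)"
    )
    # Step 3: insert the new and updated rows.
    cur.execute("INSERT INTO customer SELECT c_custkey, c_comment FROM merge_stg")
    # Step 4: empty the staging table.
    cur.execute("DELETE FROM customer_stg")
    conn.commit()
    rows = cur.execute(
        "SELECT c_custkey, c_comment FROM customer ORDER BY c_custkey"
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(run_merge_demo())
```

With these sample rows, run_merge_demo() returns [(1, 'new comment'), (3, 'brand new')]: the updated row carries its new value, the deleted row is gone, and the new row has been added.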

Run the stored procedure on a schedule

You can also run the stored procedure on a schedule via Amazon EventBridge. The scheduling steps are as follows:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter a meaningful name, for example, Trigger-Snowflake-Redshift-CDC-Merge.
  3. For Event bus, choose default.
  4. For Rule type, select Schedule.
  5. Choose Next.
  6. For Schedule pattern, select A schedule that runs at a regular rate, such as every 10 minutes.
  7. For Rate expression, enter Value as 5 and choose Unit as Minutes.
  8. Choose Next.
  9. For Target types, choose AWS service.
  10. For Select a target, choose Redshift cluster.
  11. For Cluster, choose the Amazon Redshift cluster identifier.
  12. For Database name, choose dev.
  13. For Database user, enter a user name with access to run the stored procedure. It uses temporary credentials to authenticate.
  14. Optionally, you can also use AWS Secrets Manager for authentication.
  15. For SQL statement, enter CALL merge_customer().
  16. For Execution role, select Create a new role for this specific resource.
  17. Choose Next.
  18. Review the rule parameters and choose Create rule.

After the rule has been created, it automatically triggers the stored procedure in Amazon Redshift every 5 minutes to merge the CDC data into the target table.
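
If you prefer to script the rule instead of using the console, the same settings can be expressed as request parameters for the EventBridge PutRule and PutTargets operations. The following Python sketch only builds those parameter dictionaries and makes no AWS calls. The function names, the target Id, and every ARN and user name passed in are placeholders for illustration, not values from this post; the rule name, database name, and SQL statement come from the steps above.

```python
# Sketch: the console steps above expressed as request parameters.
# No AWS call is made here. Function names, the target Id, and any ARNs or
# user names you pass in are placeholders, not values from the post.

RULE_NAME = "Trigger-Snowflake-Redshift-CDC-Merge"


def build_rule_params(minutes: int = 5) -> dict:
    """Parameters for an EventBridge PutRule call with a fixed-rate schedule."""
    if minutes < 1:
        raise ValueError("rate must be at least 1 minute")
    # Rate expressions use the singular unit for a value of 1.
    unit = "minute" if minutes == 1 else "minutes"
    return {
        "Name": RULE_NAME,
        "ScheduleExpression": f"rate({minutes} {unit})",
        "State": "ENABLED",
    }


def build_target_params(cluster_arn: str, role_arn: str, db_user: str) -> dict:
    """Parameters for a PutTargets call that runs the stored procedure."""
    return {
        "Rule": RULE_NAME,
        "Targets": [
            {
                "Id": "merge-customer",  # arbitrary target identifier
                "Arn": cluster_arn,
                "RoleArn": role_arn,
                "RedshiftDataParameters": {
                    "Database": "dev",
                    "DbUser": db_user,
                    "Sql": "CALL merge_customer();",
                },
            }
        ],
    }


if __name__ == "__main__":
    print(build_rule_params())
```

Assuming boto3 is installed and credentials are configured, these dictionaries could be passed as keyword arguments to the put_rule and put_targets methods of an EventBridge ("events") client. The execution role must already exist in that case, because the console’s Create a new role option has no equivalent in these two calls.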

Configure Amazon Redshift to share the identified data with AWS Data Exchange

Now that you have the data stored inside Amazon Redshift, you can publish it to customers using AWS Data Exchange.

  1. In Amazon Redshift, using any query editor, create the data share and add the tables to be shared:
    CREATE DATASHARE salesshare MANAGEDBY ADX;
    ALTER DATASHARE salesshare ADD SCHEMA tpch_sf1;
    ALTER DATASHARE salesshare ADD TABLE tpch_sf1.customer;

  2. On the AWS Data Exchange console, create your dataset.
  3. Select Amazon Redshift datashare.
  4. Create a revision in the dataset.
  5. Add assets to the revision (in this case, the Amazon Redshift data share).
  6. Finalize the revision.

After you create the dataset, you can publish it to the public catalog or directly to customers as a private product. For instructions on how to create and publish products, refer to NEW – AWS Data Exchange for Amazon Redshift.

Clean up

To avoid incurring future charges, complete the following steps:

  1. Delete the CloudFormation stack used to create the Redshift Auto Loader.
  2. Delete the Amazon Redshift cluster created for this demonstration.
  3. If you were using an existing cluster, drop the created external table and external schema.
  4. Delete the S3 bucket you created.
  5. Delete the Snowflake objects you created.

Conclusion

In this post, we demonstrated how you can set up a fully integrated process that continuously replicates data from Snowflake to Amazon Redshift and then uses Amazon Redshift to offer data to downstream clients over AWS Data Exchange. You can use the same architecture for other purposes, such as sharing data with other Amazon Redshift clusters within the same account, across accounts, or even across Regions if needed.


About the Authors

Raks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.

Ekta Ahuja is a Senior Analytics Specialist Solutions Architect at AWS. She is passionate about helping customers build scalable and robust data and analytics solutions. Before AWS, she worked in several different data engineering and analytics roles. Outside of work, she enjoys baking, traveling, and board games.

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked on building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Ahmed Shehata is a Senior Analytics Specialist Solutions Architect at AWS based in Toronto. He has more than two decades of experience helping customers modernize their data platforms. Ahmed is passionate about helping customers build efficient, performant, and scalable analytic solutions.
