Streaming Twitter data into Snowflake

Data Pipeline


This streaming data pipeline consists of five stages. Tweets are fetched from the Twitter streaming API, the stream is consumed by AWS Kinesis Data Firehose, and the records are written to an AWS S3 bucket. The S3 bucket serves as a staging layer for Snowflake, and Snowpipe continuously loads the data into a Snowflake table.

Step 1: Create a Kinesis Data Firehose delivery stream

1: Name and source → Give the stream a name and select “Direct PUT” as the source.

2: Process records → Leave this setting disabled, since we are not applying any transformations.

3: Choose a destination → Select S3. Create a new bucket or choose an existing one.

4: Configure settings → Set the buffer size to 2 MB and the buffer interval to 5 minutes (300 seconds). Buffered records are flushed to S3 whenever the buffer reaches 2 MB or 5 minutes have elapsed since the previous write, whichever comes first.

5: Review → Finalize and create.

Step 2: Stream tweets into Firehose with Tweepy

Tweepy allows us to interact with the Twitter API without dealing with the low-level details.

To install Tweepy:

pip3 install tweepy

Tweepy can be used to retrieve tweets and write them into the Kinesis Data Firehose stream we created in Step 1.

Get the required keys/credentials from developer.twitter.com and the AWS console.

Step 3: Configure Snowflake and Snowpipe

Creating an S3 stage:

CREATE STAGE stage_name 
URL = 's3://bucket-name'
CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'yyy');
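Once the stage exists, you can confirm it points at the right bucket by listing its contents:

```sql
LIST @stage_name;
```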

Creating a table:

CREATE TABLE table_name (v variant);

To store the raw Twitter JSON payload, use the VARIANT data type.
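After the load, individual fields can be pulled out of the VARIANT column with Snowflake's path syntax. The field names below assume the classic tweet payload shape; adjust them to match your actual JSON:

```sql
SELECT
    v:id_str::STRING           AS tweet_id,
    v:text::STRING             AS tweet_text,
    v:user.screen_name::STRING AS author
FROM table_name;
```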

Creating a Snowpipe:

CREATE PIPE pipename 
COMMENT = 'pipe_description'
AUTO_INGEST = true
AS
COPY INTO table_name FROM @stage_name
FILE_FORMAT = (TYPE = 'JSON');

Once the pipe is created, copy the ARN shown in its notification_channel.
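The ARN appears in the notification_channel column, which either of the following displays:

```sql
SHOW PIPES;
DESC PIPE pipename;
```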

In the S3 bucket's event notification settings, add the copied ARN as the destination SQS queue, with object-create PUT events (s3:ObjectCreated:Put) as the trigger.

As Firehose writes files to S3, notifications are sent to that queue, triggering Snowpipe to load the new data.
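To verify that Snowpipe is picking up the files, check its status and recent load history (the one-hour window below is an arbitrary choice):

```sql
SELECT SYSTEM$PIPE_STATUS('pipename');

SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'table_name',
    START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())
));
```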
