This streaming data pipeline consists of five stages. Tweets are fetched using the Twitter streaming API. The stream is consumed by AWS Kinesis Data Firehose and written to an AWS S3 bucket. The S3 bucket serves as a staging layer for Snowflake. Snowpipe then loads the data continuously into the target table.
Create the delivery stream using Kinesis Data Firehose.
1: Name and source → Give the stream a name and select "Direct PUT" as the source.
2: Process records → Leave this setting disabled, as we are not doing any transformations.
3: Choose a destination → Select S3. Create a new bucket or select an existing one.
4: Configure settings → Set the buffer size to 2 MB and the buffer interval to 5 minutes. The stream contents are written to S3 whenever the buffered data exceeds 2 MB or 5 minutes have passed since the previous write.
5: Review → Finalize and create.
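The console steps above can also be scripted. A minimal sketch using boto3's `create_delivery_stream` call, assuming you have AWS credentials configured; the stream, bucket, and role names are placeholders:

```python
def delivery_stream_params(name: str, bucket_arn: str, role_arn: str) -> dict:
    """Build CreateDeliveryStream parameters mirroring the console steps:
    Direct PUT source, S3 destination, 2 MB / 5 min buffering."""
    return {
        "DeliveryStreamName": name,
        "DeliveryStreamType": "DirectPut",
        "ExtendedS3DestinationConfiguration": {
            "BucketARN": bucket_arn,
            "RoleARN": role_arn,
            # Firehose flushes to S3 when either threshold is hit first
            "BufferingHints": {"SizeInMBs": 2, "IntervalInSeconds": 300},
        },
    }

def create_stream(name: str, bucket_arn: str, role_arn: str) -> None:
    # Requires boto3 and AWS credentials; all names here are placeholders.
    import boto3
    firehose = boto3.client("firehose")
    firehose.create_delivery_stream(**delivery_stream_params(name, bucket_arn, role_arn))
```

The IAM role must allow Firehose to write to the bucket, just as the console's auto-created role does.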
Getting data from Twitter
Tweepy allows us to interact with Twitter API without dealing with the low-level details.
To install tweepy:
pip3 install tweepy
Tweepy can be used to retrieve the tweets and write them into the Kinesis Data Firehose stream we created in step 1.
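A minimal sketch of such a consumer, assuming tweepy 4.x (`StreamingClient`) and boto3; the bearer token and stream name are placeholders you supply from your own Twitter and AWS setup:

```python
def to_firehose_record(raw_tweet: bytes) -> dict:
    # Newline-delimit each tweet so the JSON objects in the S3 files
    # stay parseable line by line (and by Snowflake's JSON file format).
    return {"Data": raw_tweet.rstrip(b"\n") + b"\n"}

def stream_to_firehose(bearer_token: str, stream_name: str) -> None:
    # Requires tweepy>=4 and boto3, plus AWS credentials; arguments are placeholders.
    import tweepy
    import boto3

    firehose = boto3.client("firehose")

    class FirehoseStream(tweepy.StreamingClient):
        def on_data(self, raw_data):
            data = raw_data if isinstance(raw_data, bytes) else raw_data.encode("utf-8")
            # Each tweet becomes one Firehose record; Firehose batches them into S3 files.
            firehose.put_record(
                DeliveryStreamName=stream_name,
                Record=to_firehose_record(data),
            )

    # sample() pulls a fraction of public tweets; add_rules() + filter()
    # is the alternative for keyword-based streams.
    FirehoseStream(bearer_token).sample()
```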
Create a Snowflake account and spin up a warehouse.
Creating a S3 stage:
CREATE STAGE stage_name
URL = 's3://bucket-name'
CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'yyy');
Create a Table:
CREATE TABLE table_name (v variant);
To store the Twitter JSON payload, use the VARIANT data type.
Creating a Snowpipe:
CREATE PIPE pipe_name
AUTO_INGEST = TRUE
COMMENT = 'pipe_description'
AS
COPY INTO table_name FROM @stage_name
FILE_FORMAT = (TYPE = 'JSON');
Once the Snowpipe is created, copy the ARN of its notification channel (shown in the notification_channel column of SHOW PIPES).
Add the copied ARN to the S3 bucket's event notifications, with object-created (PUT) events as the trigger.
As Firehose writes files to S3, notifications are sent to that channel, triggering Snowpipe to load the new files.
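The event-notification wiring can also be done with boto3's `put_bucket_notification_configuration`. A sketch, assuming the queue ARN is the notification channel copied from the pipe; the bucket name is a placeholder:

```python
def bucket_notification_config(queue_arn: str) -> dict:
    """Notification configuration sending object-created events to the
    Snowpipe SQS queue (the ARN copied from the pipe's notification channel)."""
    return {
        "QueueConfigurations": [
            {
                "QueueArn": queue_arn,
                # Fires on all object-created events, including Firehose PUTs
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }

def attach_notification(bucket: str, queue_arn: str) -> None:
    # Requires boto3 and AWS credentials; the bucket name is a placeholder.
    import boto3
    boto3.client("s3").put_bucket_notification_configuration(
        Bucket=bucket,
        NotificationConfiguration=bucket_notification_config(queue_arn),
    )
```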