Introduction

As data accumulates in databases, the velocity at which it is generated keeps increasing day by day. While we were still struggling with the volume of data, a new obstacle emerged: streaming data. Dealing with streaming data was a headache for analysts, and Spark Streaming was the painkiller for this headache. Spark Streaming not only handles the velocity of data but also manages it efficiently. Let's explore Spark Streaming in detail and learn to use it effectively.

Learning Objectives

  1. What is Spark Streaming?
  2. Advantages of Spark Streaming
  3. Architecture of Spark Streaming
  4. Getting Started with Spark Streaming on EMR Cluster

What is Spark Streaming?

Spark Streaming is an extension of the core Apache Spark API, used to process unbounded streams of data at scale. It is available through two APIs: a low-level API known as DStreams and a high-level API built on DataFrames and SQL (Structured Streaming). Spark Streaming is scalable and fault-tolerant by design. Structured Streaming queries run in one of two execution modes: micro-batch processing (the default), which processes the data that arrived since the previous trigger as a small batch, and the experimental continuous processing mode, which offers lower latency.
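A toy pure-Python model of micro-batch execution may help make this concrete. It is a sketch, not Spark code: it assumes a fixed trigger interval and records that arrive as (arrival_time, payload) pairs, and the function name `micro_batches` is hypothetical. Each batch holds only the records that arrived during its own interval.

```python
from typing import Iterable, List, Tuple


def micro_batches(records: Iterable[Tuple[float, str]], interval: float) -> List[List[str]]:
    """Group (arrival_time, payload) records into consecutive trigger intervals.

    Batch k holds the payloads whose arrival_time falls in [k * interval, (k + 1) * interval).
    Intervals with no data are kept as empty lists so index k always means "k-th interval".
    """
    batches: List[List[str]] = []
    for arrival, payload in sorted(records):
        index = int(arrival // interval)
        # Open every interval up to and including the one this record belongs to.
        while len(batches) <= index:
            batches.append([])
        batches[index].append(payload)
    return batches
```

For example, `micro_batches([(0.2, "a"), (0.9, "b"), (2.5, "c"), (1.1, "d")], 1.0)` returns `[["a", "b"], ["d"], ["c"]]`: record "d" waits for the second trigger even though it arrived before "c" was sent.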

Advantages of Spark Streaming

  1. Suited to massive-scale data
  2. Fault-tolerant
  3. Two API levels: a high-level API for hands-off use and a low-level API for fine-grained control
  4. Late-data handling with watermarks
  5. Highly configurable, with large performance gains when configured properly
  6. Easy to connect to Kafka for both reading and writing
  7. Generally excellent documentation and maintenance
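To illustrate the late-data handling listed above, here is a simplified pure-Python model of watermarking for a tumbling-window count. The watermark is the maximum event time seen so far minus an allowed delay, and an event whose window ended at or before the watermark is dropped as too late. This is an assumption-laden sketch, not Spark's implementation: Spark advances the watermark once per micro-batch (configured with `withWatermark()`), whereas this toy updates it after every event, and its boundary handling is simplified. `count_with_watermark` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple


def count_with_watermark(
    events: Iterable[Tuple[int, str]],
    window_size: int,
    delay: int,
) -> Tuple[Dict[Tuple[int, str], int], List[Tuple[int, str]]]:
    """Count (event_time, key) events per tumbling window, dropping late events.

    Simplification (assumption): the watermark is recomputed after every event,
    while Spark advances it once per micro-batch.
    """
    counts: Dict[Tuple[int, str], int] = defaultdict(int)
    dropped: List[Tuple[int, str]] = []
    max_event_time: Optional[int] = None
    for event_time, key in events:
        window_start = event_time - event_time % window_size
        window_end = window_start + window_size
        watermark = None if max_event_time is None else max_event_time - delay
        if watermark is not None and window_end <= watermark:
            # The event's window closed before the watermark: treat it as too late.
            dropped.append((event_time, key))
            continue
        counts[(window_start, key)] += 1
        max_event_time = event_time if max_event_time is None else max(max_event_time, event_time)
    return dict(counts), dropped
```

With events `[(5, "a"), (12, "a"), (30, "b"), (25, "b"), (8, "a")]`, `window_size=10` and `delay=10`, the event `(25, "b")` is still counted, but `(8, "a")` is dropped: by then the watermark is 30 - 10 = 20 and its window [0, 10) has already ended.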

Architecture of Spark Streaming

Instead of processing streaming data one record at a time, Spark Streaming discretizes it into small micro-batches. First, Spark Streaming's receivers accept data and buffer it in the memory of Spark's worker nodes. Then the Spark engine runs tasks to process the batches and outputs the results.

Each batch of data is an RDD, the basic abstraction of a fault-tolerant dataset in Spark.
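Part of what makes an RDD fault-tolerant is lineage: it records how it was derived from its parent, so lost in-memory data can be recomputed rather than read back from a copy. The class below is a hypothetical toy model of that idea only; `ToyRDD` is not a Spark class, and real RDDs are partitioned and scheduled across a cluster.

```python
from typing import Callable, List, Optional


class ToyRDD:
    """Hypothetical, greatly simplified model of RDD lineage (not Spark's API).

    Each dataset remembers its parent and the function that derives it, so a
    lost cached result can be rebuilt by replaying the chain from the source.
    """

    def __init__(self, source: Optional[List[str]] = None,
                 parent: Optional["ToyRDD"] = None,
                 fn: Optional[Callable[[List[str]], List[str]]] = None) -> None:
        self.source = source
        self.parent = parent
        self.fn = fn
        self.cache: Optional[List[str]] = None

    def map(self, f: Callable[[str], str]) -> "ToyRDD":
        return ToyRDD(parent=self, fn=lambda rows: [f(r) for r in rows])

    def filter(self, p: Callable[[str], bool]) -> "ToyRDD":
        return ToyRDD(parent=self, fn=lambda rows: [r for r in rows if p(r)])

    def compute(self) -> List[str]:
        if self.cache is None:
            if self.parent is None:
                self.cache = list(self.source or [])
            else:
                # Rebuild from the parent, which may itself rebuild from its own parent.
                self.cache = self.fn(self.parent.compute())
        return self.cache

    def lose_cache(self) -> None:
        """Simulate losing this dataset's in-memory data, e.g. after a worker failure."""
        self.cache = None
```

After `result = ToyRDD(source=["a b", "c"]).map(str.upper).filter(lambda s: " " in s)`, calling `result.lose_cache()` and then `result.compute()` again rebuilds the same `["A B"]` from the recorded steps.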

Getting Started with Spark Streaming on EMR Cluster

Before getting started with Spark Streaming, let's create an EMR cluster on AWS.

Open JupyterHub on the EMR cluster, and let's work through Spark Streaming's practical concepts.

First of all, let’s import all the necessary libraries:

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

# Reuse the notebook's SparkSession if one already exists, otherwise create one
spark = SparkSession.builder.getOrCreate()

Create a DataFrame representing the stream of input lines from a connection to <instance private DNS>:9999.

Using such streams, we can handle real-time, high-velocity data.

lines = spark \
    .readStream \
    .format('socket') \
    .option('host', 'ip-172-31-2-226.ap-south-1.compute.internal') \
    .option('port', 9999) \
    .load()

Note: For host, use your cluster's private DNS name. You can check it by running hostname -f in a terminal.

Isn't it easy to combine streaming concepts with generic Spark concepts? Let's proceed further.
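The socket source reads UTF-8 text from a TCP connection and turns each newline-terminated line into one row with a single `value` column; the Spark documentation describes it as intended for testing only. The sketch below is a hypothetical pure-Python stand-in for that exchange: `send_lines` writes lines the way `nc` forwards what you type, and `read_rows` shows the row-per-line result. `serve_once` is an assumed substitute for `nc -lk` that serves a single client and then stops, whereas `nc -lk` keeps listening.

```python
import socket
from typing import Dict, Iterable, List


def send_lines(conn: socket.socket, lines: Iterable[str]) -> None:
    """Write each line followed by a newline, the framing the socket source expects."""
    for line in lines:
        conn.sendall((line + "\n").encode("utf-8"))


def read_rows(conn: socket.socket) -> List[Dict[str, str]]:
    """Read until the peer closes, returning one {'value': line} row per line."""
    chunks = []
    while True:
        data = conn.recv(4096)
        if not data:
            break
        chunks.append(data)
    text = b"".join(chunks).decode("utf-8")
    return [{"value": line} for line in text.splitlines()]


def serve_once(lines: Iterable[str], host: str = "127.0.0.1", port: int = 9999) -> None:
    """Hypothetical stand-in for `nc -lk`: accept one client, send the lines, then stop."""
    with socket.create_server((host, port)) as server:
        conn, _ = server.accept()
        with conn:
            send_lines(conn, lines)
```

Sending `"hello world"` and `"hello spark"` through a connected socket pair and reading the other end yields two rows, `{"value": "hello world"}` and `{"value": "hello spark"}`, which is the shape of the `lines` DataFrame above.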

Split each line into words using split() and explode():

words = lines.select( explode( split(lines.value, ' ') ).alias('word') )

Count the frequency of each word using groupBy() and count():

wordCounts = words.groupBy('word').count()
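For intuition, the same split, explode and groupBy/count steps can be written over a static list of lines in plain Python. This is only a single-batch analogy with hypothetical function names; the streaming query keeps these counts as state and updates them on every trigger.

```python
from collections import Counter
from typing import Iterable, List


def explode_split(lines: Iterable[str]) -> List[str]:
    """Analogue of explode(split(value, ' ')): one output item per space-separated token."""
    return [word for line in lines for word in line.split(" ")]


def word_counts(lines: Iterable[str]) -> Counter:
    """Analogue of words.groupBy('word').count() for one static batch of lines."""
    return Counter(explode_split(lines))
```

`word_counts(["hello world", "hello spark"])` gives `Counter({"hello": 2, "world": 1, "spark": 1})`. Note that two consecutive spaces produce an empty-string token, so `explode_split(["a  b"])` is `["a", "", "b"]`.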

In a new terminal, run nc -lk <hostname -f> 9999 (replace <hostname -f> with the output of the hostname -f command) and type a few lines into it. Then start the query that writes the running counts to an in-memory table (outputMode = complete, format = memory):

query = wordCounts \
    .writeStream \
    .queryName("mappedDF") \
    .outputMode('complete') \
    .format('memory') \
    .start()

import time

# Poll the in-memory table a few times while the stream is running
for i in range(3):
    spark.sql('SELECT * FROM mappedDF').show()
    time.sleep(1)

query.stop()
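The query above used the complete output mode, which rewrites the entire result table on every trigger. The toy model below (hypothetical, pure Python) contrasts it with the update mode, which emits only the rows that changed in that trigger. Append mode is left out because Spark does not support it for streaming aggregations without a watermark.

```python
from collections import Counter
from typing import Dict, Iterable, List


def emit_per_trigger(batches: Iterable[List[str]], mode: str) -> List[Dict[str, int]]:
    """Return what a running word count would emit at each trigger.

    'complete' emits the whole result table every time; 'update' emits only the
    rows whose counts changed in that trigger.
    """
    if mode not in ("complete", "update"):
        raise ValueError(f"unsupported mode in this sketch: {mode}")
    totals: Counter = Counter()
    emitted: List[Dict[str, int]] = []
    for batch in batches:
        changed = Counter(batch)
        totals.update(changed)
        if mode == "complete":
            emitted.append(dict(totals))
        else:
            emitted.append({word: totals[word] for word in changed})
    return emitted
```

For two micro-batches `[["a", "b"], ["a"]]`, complete mode emits `{"a": 1, "b": 1}` and then `{"a": 2, "b": 1}`, while update mode emits `{"a": 1, "b": 1}` and then only `{"a": 2}`.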

Good job! We have successfully implemented the basics of Spark Streaming.

Now let's move on to some more advanced Spark Streaming concepts. We use curl to pull data from the Mockaroo REST endpoint and pipe it into the socket.

Create a bash script that injects data in a while loop, using awk to add a delay between lines:

while [ 1 -eq 1 ]
do
  curl "https://my.api.mockaroo.com/logs.csv?count=10&key=c69dc360" | \
    awk '{print $0; system("sleep 0.5");}' | \
    nc -lk 9999
done
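If the Mockaroo endpoint is unavailable, a small Python generator can produce similar-looking lines. Everything here is an assumption: the sample names, cities and domains are made up, the columns simply follow the order of the name, ip_address, time, city, domain schema used in this post, and timestamps use an ISO 8601 form such as 2023-01-01T10:15:00, assumed to parse with Spark's default timestamp format. The file name `fake_logs.py` and its functions are hypothetical.

```python
import random
import time
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional

# Hypothetical sample values; the real Mockaroo schema behind the URL is not shown in the post.
NAMES = ["alice", "bob", "carol"]
CITIES = ["Mumbai", "Delhi", "Pune"]
DOMAINS = ["example.com", "example.org"]


def fake_log_lines(count: int, seed: Optional[int] = None) -> Iterator[str]:
    """Yield `count` CSV lines ordered as name, ip_address, time, city, domain."""
    rng = random.Random(seed)
    base = datetime(2023, 1, 1, tzinfo=timezone.utc)
    for _ in range(count):
        ip = ".".join(str(rng.randint(1, 254)) for _ in range(4))
        ts = base + timedelta(seconds=rng.randint(0, 86_399))
        yield ",".join([
            rng.choice(NAMES),
            ip,
            ts.strftime("%Y-%m-%dT%H:%M:%S"),
            rng.choice(CITIES),
            rng.choice(DOMAINS),
        ])


def emit(batch_size: int = 10, delay: float = 0.5, batches: Optional[int] = None) -> None:
    """Print batches of fake lines, sleeping `delay` seconds after each line.

    batches=None loops forever, like the bash while loop.
    """
    done = 0
    while batches is None or done < batches:
        for line in fake_log_lines(batch_size):
            # flush=True pushes each line into the pipe immediately instead of buffering it.
            print(line, flush=True)
            time.sleep(delay)
        done += 1
```

Usage, under the same assumptions: python3 -c "import fake_logs; fake_logs.emit()" | nc -lk 9999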

Importing necessary libraries:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_csv, window

# Reuse the notebook's SparkSession if one already exists, otherwise create one
spark = SparkSession.builder.getOrCreate()

Now create the schema as a DDL-formatted string:

schema = "name string, ip_address string, time timestamp, city string, domain string"

Create a DataFrame representing the stream of input lines from a connection to <instance private DNS>:9999, parsing each line as CSV:

lines = spark \
    .readStream \
    .format('socket') \
    .option('host', 'ip-172-31-9-166.ap-south-1.compute.internal') \
    .option('port', 9999) \
    .load() \
    .select(col("value").cast("STRING")) \
    .select(from_csv(col("value"), schema))

Let’s print the schema:

lines.printSchema()
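To see roughly what `from_csv()` does with each `value`, here is a pure-Python analogue. The helpers are hypothetical and support only the string and timestamp types used in this schema. They loosely mirror Spark's default permissive behaviour, where a value that cannot be parsed, or a missing trailing field, becomes null (None here).

```python
import csv
from datetime import datetime
from typing import Dict, List, Tuple


def parse_ddl(schema: str) -> List[Tuple[str, str]]:
    """Split a simple DDL string such as 'name string, time timestamp' into (name, type) pairs."""
    fields = []
    for part in schema.split(","):
        name, type_name = part.strip().split(None, 1)
        fields.append((name, type_name.strip().lower()))
    return fields


def from_csv_row(line: str, schema: str) -> Dict[str, object]:
    """Parse one CSV line against the schema; unparsable or missing values become None."""
    values = next(csv.reader([line]))
    row: Dict[str, object] = {}
    for i, (name, type_name) in enumerate(parse_ddl(schema)):
        if type_name not in ("string", "timestamp"):
            raise ValueError(f"unsupported type in this sketch: {type_name}")
        raw = values[i] if i < len(values) else None
        if raw is None:
            row[name] = None
        elif type_name == "timestamp":
            try:
                row[name] = datetime.fromisoformat(raw)
            except ValueError:
                row[name] = None  # malformed timestamp -> null
        else:
            row[name] = raw
    return row
```

With the post's schema, `from_csv_row("bob,10.0.0.2,not-a-time", schema)` keeps `name` and `ip_address` but sets `time`, `city` and `domain` to None.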

The fields of a complex object can be referenced with a "dot" notation as in:

`col("from_csv(value).name")`

A large number of these fields/columns can become unwieldy.

For that reason, it is common to extract the sub-fields and represent them as first-level columns, as seen below:

anonDF = (lines
    .select(col("from_csv(value).name").alias("name"),
            col("from_csv(value).ip_address").alias("ip_address"),
            col("from_csv(value).time").alias("time"),
            col("from_csv(value).city").alias("city"),
            col("from_csv(value).domain").alias("domain"))
)

Print the schema again:

anonDF.printSchema()
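Rather than typing each sub-field by hand, the (path, alias) pairs can be derived from the DDL schema string. The helper below is a hypothetical pure-Python sketch; it assumes simple `name type` entries, with no nested types or commas inside a type.

```python
from typing import List, Tuple


def flatten_paths(struct_col: str, ddl_schema: str) -> List[Tuple[str, str]]:
    """Build (dotted path, alias) pairs for every field named in a DDL schema string."""
    names = [part.strip().split()[0] for part in ddl_schema.split(",")]
    return [(f"{struct_col}.{name}", name) for name in names]
```

The pairs could then feed a select such as lines.select(*[col(path).alias(alias) for path, alias in flatten_paths("from_csv(value)", schema)]), which should produce the same columns as anonDF above.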

Windowing

If we were using a static DataFrame to produce an aggregate count, we could use `groupBy()` and `count()`. Instead, we accumulate counts within a sliding window, answering questions like "How many records are we getting every second?"

The Structured Streaming Programming Guide includes an illustration that helps explain how windowed aggregation works.
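The rule for which windows an event belongs to can be sketched in pure Python. This assumes integer times in seconds and windows aligned to time 0, matching the default of Spark's window() function, whose window starts are offset from the Unix epoch by startTime (0 by default). `assign_windows` is a hypothetical name. When slide equals duration, each event lands in exactly one tumbling window.

```python
from typing import List, Tuple


def assign_windows(t: int, duration: int, slide: int) -> List[Tuple[int, int]]:
    """Return every [start, end) window of length `duration`, with starts on
    multiples of `slide`, that contains time t."""
    if duration <= 0 or slide <= 0:
        raise ValueError("duration and slide must be positive")
    windows = []
    start = t - t % slide  # latest window start at or before t
    # A window [start, start + duration) contains t while start > t - duration.
    while start > t - duration:
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)
```

With a 10-second window sliding every 5 seconds, `assign_windows(12, 10, 5)` returns `[(5, 15), (10, 20)]`, so the event is counted in two overlapping windows; with `slide=10` it returns only `[(10, 20)]`.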

Count the records per city in 1-hour windows on the time column. Since no slide duration is given, these are tumbling (non-overlapping) windows:

countsDF = (anonDF
    .groupBy(col("city"), window(col("time"), "1 hour"))
    .count()
)
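In plain Python terms, this aggregation groups each record by its city and by the start of the 1-hour tumbling window containing its timestamp. The sketch below is a hypothetical single-batch analogue that assumes timezone-aware UTC datetimes; the streaming query additionally keeps these counts as state across triggers.

```python
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Iterable, Tuple

HOUR = timedelta(hours=1)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def hourly_city_counts(rows: Iterable[Tuple[str, datetime]]) -> Counter:
    """Count (city, window_start) pairs using 1-hour tumbling windows aligned to the epoch."""
    counts: Counter = Counter()
    for city, ts in rows:
        # Number of whole hours since the epoch, turned back into a window start time.
        start = EPOCH + ((ts - EPOCH) // HOUR) * HOUR
        counts[(city, start)] += 1
    return counts
```

Two Pune records at 10:05 and 10:55 UTC fall into the same window starting at 10:00, while a Delhi record at exactly 11:00 starts the next window.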

Start the query that writes the running counts to an in-memory table (outputMode = complete, format = memory):

query = countsDF \
    .writeStream \
    .queryName("city") \
    .outputMode('complete') \
    .format('memory') \
    .start()

Run a SQL query against the in-memory table inside a for loop, then stop the query:

import time

# Poll the in-memory "city" table a few times while the stream is running
for i in range(3):
    spark.sql('SELECT * FROM city').show()
    time.sleep(2)

query.stop()

Good job! We have successfully implemented windowing in stream processing.

Conclusion

Spark Streaming is used to process unbounded data at scale. It offers two APIs: the low-level DStreams API and the high-level DataFrame + SQL API. Both are scalable and fault-tolerant by design. In this blog, we covered the basic concepts of Spark Streaming and implemented windowed aggregation.