Real Time Streaming Using Apache Spark, Nifi, and Kafka

Reading Time: 6 Minutes

July 19, 2018

Apache Spark Overview

Apache Spark is a fast, in-memory data processing engine with expressive development APIs that allow data workers to execute streaming workloads conveniently. With Spark running on Apache Hadoop YARN, developers everywhere can create applications that exploit Spark's power, derive insights, and enrich their data science workloads within a single, shared dataset in Hadoop. In this post we will consume and transform complex data streams from Apache Kafka. Using the Structured Streaming API we can express complex transformations, such as exactly-once event-time aggregation, and output the results to a variety of systems.

What is Streaming

Streaming data is data that is generated continuously by thousands of data sources, often unstructured. It includes a wide variety of data such as log files generated by customers using your mobile or web applications, in-game player activity, information from social networks, financial trading, and telemetry from connected devices or instrumentation in data centers.

What is Structured Streaming

Structured Streaming restricts what a query can express, and those restrictions are what enable the engine to optimize it, while still allowing complex data computations that are hard to express in traditional stream processing.


Why we use Structured Streaming

In Structured Streaming, any data stream is treated as an unbounded table: new records added to the stream are like rows being appended to the table. This allows us to treat both batch and streaming data as tables, so the same DataFrame/Dataset queries can be applied to both batch and streaming data.

Users describe the query they want to run, the input and output locations, and optionally a few more details. The system then runs their query incrementally, maintaining enough state to recover from failure and keeping the results consistent in external storage.
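As a quick sketch of this model (assuming a hypothetical directory /tmp/events containing JSON files with a country field, like the sample message later in this post), the very same groupBy query can run as a batch job over the files already present and as a streaming job over files that keep arriving, as if rows were being appended to one unbounded table:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("unbounded-table-sketch").master("local[*]").getOrCreate()

// Batch: a static DataFrame over the JSON files already in the (hypothetical) directory
val staticDf = spark.read.json("/tmp/events")
val staticCounts = staticDf.groupBy("country").count()
staticCounts.show()

// Streaming: the same query over files that keep arriving in the same directory.
// Each new file behaves like rows appended to an unbounded table.
val streamDf = spark.readStream.schema(staticDf.schema).json("/tmp/events")
val streamCounts = streamDf.groupBy("country").count()

streamCounts.writeStream
  .outputMode("complete")   // emit the full, continuously updated result table
  .format("console")
  .start()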


Real Time Streaming with Apache Spark, Apache NiFi and Apache Kafka


Data Collection Layer

The data collection layer is the very first layer, where we ingest data from different locations. For this, we define our data flow pipelines over different data sources in Apache NiFi or Apache MiNiFi, for example:

  • File system
  • Cloud buckets (Google Cloud Storage, Amazon S3)
  • Databases (MySQL, PostgreSQL, MongoDB, Cassandra)
  • Real-time streams from IoT devices

Apache Nifi

NiFi is a data flow automation tool that allows a user to send, receive, route, transform, and sort data as needed in an automated and configurable way. NiFi has a user-friendly drag-and-drop graphical user interface that allows users to create data flows as per their requirements. NiFi ships with predefined processors for fetching data from, and pushing data to, almost any data source, and we can also perform some data transformations in NiFi using these predefined processors.


Data Routing Layer

This is the layer in which we send data to multiple different destinations, e.g. a cloud bucket, a database, or the local file system. We define this in our Apache NiFi flow; Apache NiFi enables routing of the data to multiple destinations in parallel.


Apache Kafka

Apache Kafka is a distributed publish-subscribe messaging system that is used for ingestion of real-time data streams and makes them available to consumers in a parallel and fault-tolerant manner. Kafka is suitable for building real-time streaming data pipelines that reliably move data between different processing systems.

Kafka consists of topics, consumers, producers, brokers, partitions, and clusters. Kafka topics are divided into partitions. Partitions allow you to parallelize a topic by splitting its data across multiple brokers. Each partition can be placed on a separate machine, which allows multiple consumers to read from a topic in parallel.
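As a minimal sketch of the producer side (using the plain Kafka client API; the broker addresses and topic name are placeholders), records carrying the same key always land in the same partition, which keeps related events ordered while still letting consumers read partitions in parallel:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "host1:port1,host2:port2")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Records with the same key go to the same partition, so related events stay ordered
producer.send(new ProducerRecord[String, String]("topic1", "device-42", """{"country":"India"}"""))
producer.send(new ProducerRecord[String, String]("topic1", "device-43", """{"country":"France"}"""))

producer.flush()
producer.close()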

Apache Kafka use cases

  • Stream Processing
  • Metrics Collection and Monitoring
  • Website activity tracking

Data Transformation Layer

Apache Spark Structured Streaming

Structured Streaming is the new streaming model of the Apache Spark framework, built on the Spark SQL engine. It was introduced in Apache Spark 2.0 to provide fast, scalable, fault-tolerant, and low-latency processing. The main idea is that you should not have to reason about streaming as such, but instead use a single API for both streaming and batch operations.

Thus it allows you to write batch-style queries on your streaming data. Structured Streaming provides the Dataset/DataFrame API in Scala, Java, Python, and R to express streaming aggregations, event-time windows, and stream-to-batch joins.

Dataframe

A DataFrame is a distributed collection of data organized into named columns and rows. It is similar to a table in a relational database, with proper optimization. DataFrames came into existence to deal with both structured and semi-structured data formats, for example Avro, CSV, Elasticsearch, and Cassandra.
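A minimal sketch (assuming an existing SparkSession named spark, like the one created later in this post):

import spark.implicits._

// A DataFrame: rows organised into the named columns "country" and "city"
val locations = Seq(
  ("India", "Mumbai"),
  ("India", "Delhi"),
  ("France", "Paris")
).toDF("country", "city")

locations.printSchema()
locations.groupBy("country").count().show()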

Dataset

A Dataset is a data structure in Spark SQL which is strongly typed and maps to a relational schema. It is an extension of the DataFrame API that represents structured queries with encoders. A Spark Dataset provides both type safety and an object-oriented programming interface.
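A minimal sketch of the difference (again assuming a SparkSession named spark): a case class supplies the relational schema, and operations on the resulting Dataset are checked at compile time:

case class Location(country: String, city: String)

import spark.implicits._

val ds = Seq(Location("India", "Mumbai"), Location("France", "Paris")).toDS()

// Typed, object-oriented access to fields; a typo in `country` here would fail at compile time
ds.filter(_.country == "India").show()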


Reading streaming data from Kafka topics through Apache Spark

Structured Streaming provides a unified batch and streaming API that enables us to view data published to Kafka as a DataFrame. The first step is to specify the location of our Kafka cluster and which topic we are interested in reading. Spark allows you to read an individual topic, a specific set of topics, a regex pattern of topics, or even a specific set of partitions belonging to a set of topics.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Create a SparkSession
val spark = SparkSession
  .builder
  .appName("Spark-Kafka-Integration")
  .master("local")
  .getOrCreate()

In general, a session means an interaction between two or more entities. In Apache Spark, a SparkSession is the single point of entry to interact with underlying Spark functionality and allows programming Spark with the DataFrame and Dataset APIs.

Connecting to a Kafka topic:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

df.printSchema() reveals the schema of our DataFrame:

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
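For reference, the other subscription styles mentioned above use the subscribePattern and assign options of the Kafka source (topic names and partition numbers here are placeholders):

// Subscribe to a fixed list of topics
val multiTopicDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()

// Subscribe to every topic matching a regex pattern
val patternDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()

// Assign specific partitions of specific topics (JSON specification)
val assignedDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("assign", """{"topic1":[0,1]}""")
  .load()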

The returned DataFrame (df) contains all the familiar fields of a Kafka record and its associated metadata. We can now use all of the familiar DataFrame or Dataset operations to transform the result.
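Since key and value arrive as binary columns, a common first transformation (a sketch, using the hypothetical name messages) is to cast them to strings before any parsing:

// key and value are binary in the Kafka source, so cast them to strings first
val messages = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "timestamp")

messages.printSchema()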

Streaming ETL

Now that the stream is set up, we can start doing the required ETL on it to extract meaningful information. Let's say the real-time streaming data pushed by NiFi to Kafka looks as below:


{
  "city": "",
  "country": "India",
  "countryCode": "+91",
  "lat": 0.00,
  "regionName": "Mumbai",
  "status": "success",
  "zip": ""
}

It is now possible to do analysis quickly, such as counting how many users are coming from India:


import spark.implicits._  // for the $"value" column syntax

val result = df
  .select(get_json_object($"value".cast("string"), "$.country").alias("country"))
  .groupBy("country")
  .count()

result.selectExpr("CAST(country AS STRING) AS key", "CAST(`count` AS STRING) AS value")
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/checkpoint")  // required by the Kafka sink
  .outputMode("update").start()

Now we can parse the country out of incoming JSON messages, group them, and do a count, all in real time as we read data from the Kafka topic. The Spark streaming job runs in the background and continuously updates the counts as new messages arrive.
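If the schema of the incoming messages is known up front, an alternative to get_json_object is to parse the whole record with from_json and an explicit schema (a sketch based on the sample message above):

import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

// Schema matching the sample NiFi message shown above
val locationSchema = new StructType()
  .add("city", StringType)
  .add("country", StringType)
  .add("countryCode", StringType)
  .add("lat", DoubleType)
  .add("regionName", StringType)
  .add("status", StringType)
  .add("zip", StringType)

val parsed = df
  .select(from_json(col("value").cast("string"), locationSchema).alias("data"))
  .select("data.*")

val countsByCountry = parsed.groupBy("country").count()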

Low Latency Continuous Processing Mode in Structured Streaming

Continuous Processing is a new execution engine introduced in Spark 2.3 that allows very low end-to-end latency with an at-least-once fault-tolerance guarantee. Compare this with the default micro-batch processing engine, which can achieve exactly-once guarantees but with latencies of about a second.

In Spark Structured Streaming's micro-batch mode, Spark waits for a short interval (say one second) and batches together all the events received during that interval into a micro-batch. This micro-batch is then scheduled by the driver to be executed as tasks on the executors.

After a micro-batch execution is complete, the next batch is collected and scheduled again. This frequent scheduling gives the impression of streaming execution. However, lower latency does not come without cost: in continuous mode the delivery guarantee drops from exactly-once to at-least-once. So continuous execution is advised for systems where processing latency is more important than the delivery guarantee.


import org.apache.spark.sql.streaming.Trigger

val query = df.selectExpr("CAST(value AS STRING)")  // continuous mode supports map-like operations only
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/checkpoint-continuous")
  .trigger(Trigger.Continuous("1 second"))  // only this line is added for continuous processing
  .start()

You can switch an existing query to continuous mode without modifying the application logic. To run a query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as the parameter; no change to the logic of the code is needed.
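For comparison, the same non-aggregated query in the default micro-batch mode differs only in the trigger (Trigger.ProcessingTime instead of Trigger.Continuous); everything else stays the same:

import org.apache.spark.sql.streaming.Trigger

df.selectExpr("CAST(value AS STRING)")
  .writeStream.format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/checkpoint-microbatch")
  .trigger(Trigger.ProcessingTime("1 second"))  // micro-batch every second, exactly-once
  .start()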

