Real Time Analytics with Azure Databricks

Build your Data Estate with Azure Databricks – Part 3 – Real Time Analytics

"It is not the strongest of the species that survives, nor the most intelligent that survives. It is the one that is most adaptable to change." ~ commonly attributed to Charles Darwin

With increasing processor speeds and device sophistication, we have entered the era of the Internet of Things and real-time feeds, which brings the high-velocity dimension of Big Data to the fore. The real-time path of the lambda architecture serves a wide variety of critical applications, such as predictive maintenance and disaster prediction, where timely action can save assets as well as lives.

In Azure, there are multiple ways to realize a real-time architecture and so enable faster analytics. Broadly, they fall into two groups: the Infrastructure as a Service (IaaS) way and the Platform as a Service (PaaS) way. With IaaS, we can run Kafka in Azure to receive real-time feeds. This streaming data can then be fed into Storm (or into a PaaS service such as Databricks) for stream analytics. Although the IaaS way has its advantages, to realize the architecture in a serverless fashion we will go the PaaS way: the IoT Hub way.

IoT Hub is a PaaS offering for bidirectional messaging with your devices and sensors. Data from IoT Hub can be processed using two PaaS services in Azure: Azure Stream Analytics and Azure Databricks. Azure Stream Analytics is an excellent service that is widely used in the industry, and we encourage you to give it a try as well. However, in this article, we will stick with Azure Databricks for three reasons:

  1.  It gives us a single integrated platform for both the batch processing and the real-time analytics paths of the lambda architecture.
  2.  It lets us leverage the power of Spark Streaming under the hood.
  3.  Its cluster autoscaling feature adjusts the number of workers to the load, which helps keep costs down.

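Roughly, the architecture looks like this: a device (here, a simulator) sends data to IoT Hub, and Databricks reads the stream from IoT Hub for analysis.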

Step 1: Device to IoT Hub

For demonstration purposes, we will use a Raspberry Pi simulator that pushes fabricated weather data to IoT Hub. To achieve this, we need to register a device in IoT Hub, which is the simulator in this case. Click the Add icon and enter the device name.

After this, click the registered device and copy the primary connection string from the device details.
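
For orientation, a device connection string is a semicolon-separated list of key=value pairs, typically HostName, DeviceId and SharedAccessKey. The sketch below is an illustration rather than part of the original walkthrough: the object DeviceConnectionString and its methods are hypothetical names introduced here. It checks a copied string for the three expected keys before it is handed to the simulator.

```scala
// Hypothetical helper for illustration only; not part of any Azure SDK.
object DeviceConnectionString {

  // Keys a device connection string is assumed to contain.
  val requiredKeys: Set[String] = Set("HostName", "DeviceId", "SharedAccessKey")

  // Split "k1=v1;k2=v2;..." into a map. Only the first '=' of each part is
  // treated as the separator, because keys often end in '=' padding.
  def parse(raw: String): Map[String, String] =
    raw.split(';')
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.split("=", 2))
      .collect { case Array(key, value) => key -> value }
      .toMap

  // Names of required keys that are absent from the given string.
  def missingKeys(raw: String): Set[String] =
    requiredKeys -- parse(raw).keySet
}
```

For example, DeviceConnectionString.missingKeys(copiedString) returns an empty set when all three keys are present, and the names of the absent keys otherwise.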

Paste the copied connection string into the connection string field of the Raspberry Pi simulator.

As a sanity check, the LED shown in the simulator should light up.
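
To make the shape of the data concrete, the sketch below fabricates readings with the four fields that the schema later in this article expects (messageId, deviceId, temperature, humidity). It is an illustration under assumptions, not the simulator's actual code: WeatherReading and WeatherSimulator are hypothetical names, the device id and value ranges are arbitrary, and the JSON is assembled by hand to avoid extra dependencies.

```scala
import scala.util.Random

// Hypothetical model of one simulated message; field names follow the
// schema used later in this article.
final case class WeatherReading(
    messageId: Int,
    deviceId: String,
    temperature: Double,
    humidity: Double
) {
  // Hand-rolled JSON to keep the sketch dependency-free. The deviceId is
  // assumed to contain no characters that need JSON escaping.
  def toJson: String =
    s"""{"messageId":$messageId,"deviceId":"$deviceId","temperature":$temperature,"humidity":$humidity}"""
}

object WeatherSimulator {
  // Fabricate one reading. The ranges (20 to 30 for temperature,
  // 60 to 80 for humidity) are arbitrary illustrative choices.
  def fabricate(messageId: Int, rng: Random): WeatherReading =
    WeatherReading(
      messageId = messageId,
      deviceId = "simulated-device",
      temperature = 20.0 + rng.nextDouble() * 10.0,
      humidity = 60.0 + rng.nextDouble() * 20.0
    )
}
```

Calling WeatherSimulator.fabricate(i, new Random()).toJson for successive values of i yields one JSON document per message, which is the kind of body the Databricks code in Step 2 has to parse.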

Step 2: IoT Hub to Databricks

Once the IoT Hub setup is ready, the next task is to read and process the streaming data. This is where services like Azure Stream Analytics and Databricks come into the picture. In Databricks, we leverage the power of Spark Streaming to perform SQL-like manipulations on streaming data. The first step is to establish a connection between IoT Hub and Databricks. To do so, install the Azure Event Hubs connector for Spark (the library that provides the org.apache.spark.eventhubs package) on the relevant cluster, and then run the code below (Scala):

import org.apache.spark.eventhubs.{ ConnectionStringBuilder, EventHubsConf, EventPosition }
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Build the connection string from the IoT Hub's Event Hub-compatible endpoint.
// For the name, use the Event Hub-compatible name listed under the hub's built-in endpoints.
val connectionString = ConnectionStringBuilder("<Event Hub Compatible endpoint of IoT Hub>")
  .setEventHubName("<IoT Hub Name>")
  .build

// Start reading at the end of the stream, i.e. only events that arrive from now on
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

// Open the stream; `spark` is the SparkSession that the Databricks notebook provides
val incomingStream = spark.readStream
  .format("eventhubs")
  .options(eventHubsConf.toMap)
  .option("eventhubs.partition.count", "4")
  .load()

incomingStream.printSchema

After establishing the connection, we need to define a schema that matches the JSON structure of the incoming messages. This can be achieved using the code below (Scala):

import org.apache.spark.sql.types._ // https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/types/package-summary.html
import org.apache.spark.sql.functions._
// Note: the $"column" syntax below relies on spark.implicits._, which is in scope in a Databricks notebook

// Schema of the JSON message body; the numeric fields are read as strings and cast further down
val jsonSchema = new StructType()
  .add("messageId", StringType)
  .add("deviceId", StringType)
  .add("temperature", StringType)
  .add("humidity", StringType)

// Decode the Event Hub records: the body column carries our JSON message
val messages = incomingStream
  // Cast the columns Event Hubs gives us (our payload plus metadata such as offset and enqueuedTime)
  .withColumn("Offset", $"offset".cast(LongType))
  .withColumn("Time (readable)", $"enqueuedTime".cast(TimestampType))
  .withColumn("Timestamp", $"enqueuedTime".cast(LongType))
  .withColumn("Body", $"body".cast(StringType))
  // Select them so we can work with them
  .select("Offset", "Time (readable)", "Timestamp", "Body")
  // Parse the "Body" column as JSON using the schema defined above
  .select(from_json($"Body", jsonSchema) as "sensors")
  // Select the values from the parsed structure and cast them explicitly to avoid type problems
  .select(
    $"sensors.messageId".cast("string"),
    $"sensors.deviceId".cast("string"),
    $"sensors.temperature".cast("double") as "tempVal",
    $"sensors.humidity".cast("double") as "humVal"
  )

messages.printSchema()

Once the messages are parsed, we register the resulting streaming DataFrame as a temporary view using the function ‘createOrReplaceTempView’, so that we can write SQL queries against it for advanced analytics:

messages.createOrReplaceTempView("dataStreamsView")

After this, your streaming data is ready for advanced analytics.
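
As one possible next step, here is a minimal sketch that assumes the cells above have already run in a Databricks notebook, so that `spark` and the `dataStreamsView` view exist. The query name avgByDevice and the column aliases are illustrative choices, not something the original setup prescribes. It computes running averages per device and writes them to Spark's in-memory sink, which can then be queried like a table.

```scala
// Assumes a Databricks notebook: `spark` is predefined and the temporary
// view `dataStreamsView` was registered by the previous cell.
val avgByDevice = spark.sql(
  """SELECT deviceId,
    |       AVG(tempVal) AS avgTemp,
    |       AVG(humVal)  AS avgHum,
    |       COUNT(*)     AS readings
    |FROM dataStreamsView
    |GROUP BY deviceId""".stripMargin)

// The memory sink supports append and complete output modes. A streaming
// aggregation without a watermark cannot use append, so complete is used.
// The memory sink keeps results on the driver and suits small result sets.
val query = avgByDevice.writeStream
  .format("memory")
  .queryName("avgByDevice") // illustrative name of the in-memory table
  .outputMode("complete")
  .start()

// Inspect the latest aggregates once a few messages have arrived.
spark.sql("SELECT * FROM avgByDevice ORDER BY deviceId").show()

// Stop the streaming query when finished:
// query.stop()
```

The in-memory sink is chosen here only because it needs no external storage; for anything beyond a quick look at the data, a durable sink would be the more sensible target.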

P.S. This is not a comprehensive guide to real-time analytics with Databricks. We can also persist the streaming data in Cosmos DB and visualize it in Power BI using its Spark connector.
