Real-time Data Streaming with Kafka: A Practical Experiment with Raspberry Pi
Introduction:
In the digital age, experimentation never loses its charm, especially when it means breaking things down only to rebuild them better. That's the essence of learning, right? In this project, I set out to understand the moving parts of real-time data streaming using equipment that was just gathering dust in my study room. Here's the story of how I turned my Raspberry Pis into a small real-time streaming and analytics platform, learning the finer details of Kafka, n8n, Databricks, and more along the way.
The Problem:
With a couple of Raspberry Pi devices lying idle and some curiosity about real-time data streaming, I set out to monitor my IoT devices, track their temperature and power status, and, more importantly, understand how everything fits together in a data ecosystem.
The Objective:
The goal was simple yet intriguing: capture the operating temperature every second, send the data to Databricks, visualize it with Metabase, and build a system that notifies me about power outages or temperatures above a certain threshold. It may not change the world, but it is an exercise in creativity, exploration, and the practical application of data technology.
Architecture Overview:
The architecture I implemented is as simple as it is ingenious:
- Raspberry Pi: Capturing temperature and power status every second (a sketch of this capture loop follows this list).
- n8n: An open-source workflow automation tool to create webhooks and post data to Kafka.
- Kafka: Sure to generate mixed reactions (especially among Kinesis enthusiasts), my self-hosted Apache Kafka instance serves as the message broker, with console consumers run against it for testing.
- Databricks, Spark, and Delta Tables: Powerful tools to process and manage data.
- Metabase: Visualizing the data in a way that even a layman could understand.
- Element, Telegram, and Uptime-kuma: Different tools for receiving notifications and alerts.
- Ansible: The glue that holds it all together, automating Docker and various other configurations.
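To make the first two pieces concrete, here is a minimal sketch of the capture loop that could run on each Pi. The captured headers later in this post show the devices posting with curl, but the logic is the same either way: read the SoC temperature from /sys/class/thermal (reported in millidegrees Celsius) every second and POST it to the n8n webhook. The webhook URL and device name below are placeholders, not my actual endpoints.

#!/usr/bin/env python3
# Minimal sketch of the Pi-side capture loop; adjust the URL and device name to your setup.
import time
from datetime import datetime

import requests  # pip install requests

WEBHOOK_URL = "https://n8n.example.com/webhook/pi-temperature"  # placeholder webhook URL
DEVICE = "Pi4_8GB"  # e.g. "PiZeroW" on the other board

def read_temp_millic() -> int:
    """Return the SoC temperature in millidegrees Celsius."""
    with open("/sys/class/thermal/thermal_zone0/temp") as f:
        return int(f.read().strip())

while True:
    payload = {
        "day": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "temp": str(read_temp_millic()),
        "device": DEVICE,
    }
    try:
        # n8n wraps this body together with the HTTP headers before pushing it to Kafka
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except requests.RequestException:
        pass  # skip a reading rather than crash if the webhook is unreachable
    time.sleep(1)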
The Journey
I began assembling a set of tools that would fit naturally into my self-hosting hobby. From these, I formed a dedicated data-related toolkit, humorously coined YADS (Yet Another Data Stack). Starting with tools like Dremio, Hive, Minio, Querybook, Trino, Presto, PostgreSQL, and MariaDB, I later added Kafka. All of them run as Docker containers, interconnected by a dedicated secure network. Ansible was my tool of choice for deployment and configuration.
In this post, I'll focus on the Kafka, Spark, and Metabase components, but feel free to reach out to me personally at my Matrix instance @teej:teej.sh for more details.
The Adventure Begins: Kafka, Spark, and Metabase
Here's a glimpse of the setup:
- A single-node Kafka deployment, defined in docker-compose using Bitnami images. It is scalable and customizable, and I exposed an EXTERNAL listener so data can be pushed from other servers.
- A script for reading and writing streams on Databricks, effectively capturing the data for analysis.
- Visualization using Metabase, quickly spun up and connected to Databricks.
Kafka
version: "2
services:
kafka:
image: docker.io/bitnami/kafka:3.5
container_name: kafka1
ports:
- "9092:9092"
- "9094:9094"
volumes:
- "kafka_data:/bitnami"
# - ./config.properties:/opt/bitnami/kafka/config/config.properties:ro
environment:
# KRaft settings
# - KAFKA_CFG_SASL_ENABLED_MECHANISMS=SCRAM-SHA-256
# - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=SCRAM-SHA-256
# - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://0.0.0.0:9094
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092,EXTERNAL://<FQDN>:9094
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
# Configure ACL
# - KAFKA_CFG_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer
# - KAFKA_CFG_SUPER_USERS=User:user
volumes:
kafka_data:
driver: local
networks:
default:
external:
name: rofl"
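Ansible handles the deployment in my setup, but for a manual run the stack can be brought up from the compose file's directory:

docker compose up -d   # or docker-compose up -d with the older v1 binary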
Creating a topic
docker exec -it kafka1 kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic nathass-pi-temperature
Listing topics
docker exec -it kafka1 kafka-topics.sh --bootstrap-server localhost:9092 --list
Testing and Results:
Testing the consumer
docker exec -it kafka1 kafka-console-consumer.sh --topic nathass-pi-temperature --from-beginning --bootstrap-server localhost:9092
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a21324f339e2f-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.74.0","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:05:01","temp":"49230","device":"PiZeroW"},"temp":49230,"device":"PiZeroW","updated_dt":"2023-08-12 22:05:01"
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a22864ee64104-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.88.1","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:06:01","temp":"55504","device":"Pi4_8GB"},"temp":55504,"device":"Pi4_8GB","updated_dt":"2023-08-12 22:06:01"}
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a25726dc93dc9-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.88.1","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:06:03","temp":"53556","device":"Pi4_8GB"},"temp":53556,"device":"Pi4_8GB","updated_dt":"2023-08-12 22:06:03"}
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a26ee5ecc6bc1-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.74.0","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:06:04","temp":"50844","device":"PiZeroW"},"temp":50844,"device":"PiZeroW","updated_dt":"2023-08-12 22:06:04"}
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a27a43e15481c-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.88.1","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:06:04","temp":"53556","device":"Pi4_8GB"},"temp":53556,"device":"Pi4_8GB","updated_dt":"2023-08-12 22:06:04"}
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a2cd009634d63-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.74.0","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:06:05","temp":"51382","device":"PiZeroW"},"temp":51382,"device":"PiZeroW","updated_dt":"2023-08-12 22:06:05"}
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a32a5ab1f40dc-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.88.1","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:06:05","temp":"53556","device":"Pi4_8GB"},"temp":53556,"device":"Pi4_8GB","updated_dt":"2023-08-12 22:06:05"}
{"headers":{"host":"n8n.teej.xyz","x-real-ip":"1.2.3.4","x-forwarded-for":"210.18.138.42, 210.18.138.42","x-forwarded-proto":"https","content-length":"76","accept-encoding":"gzip","cf-ray":"7f5a32be1ab64a95-SIN","cf-visitor":"{\"scheme\":\"https\"}","user-agent":"curl/7.74.0","accept":"*/*","content-type":"application/json","cdn-loop":"cloudflare","cf-connecting-ip":"1.2.3.4","cf-ipcountry":"IN"},"params":{},"query":{},"body":{"day":"2023-08-12 22:06:05","temp":"50306","device":"PiZeroW"},"temp":50306,"device":"PiZeroW","updated_dt":"2023-08-12 22:06:05"}}
Spark
A heartfelt thank you to Neeraj Yadav, whose inspiration and guidance have been invaluable to me. A recognized expert in Spark and Databricks, he has truly enriched my understanding with his insights.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, MapType
# Define the full schema of the JSON
jsonSchema = StructType([
    StructField("headers", MapType(StringType(), StringType()), True),
    StructField("body", StructType([
        StructField("day", StringType(), True),
        StructField("temp", StringType(), True),
        StructField("device", StringType(), True)
    ]), True),
    StructField("temp", StringType(), True),
    StructField("device", StringType(), True),
    StructField("updated_dt", StringType(), True)
])
# Read the Kafka stream
kafkaStreamDF = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<FQDN>:9094") \
    .option("subscribe", "nathass-pi-temperature") \
    .option("maxOffsetsPerTrigger", 100) \
    .load()
# Cast the value to a string and parse the JSON
stringDF = kafkaStreamDF.selectExpr("CAST(value AS STRING) as value")
jsonDF = stringDF.select(from_json(col("value"), jsonSchema).alias("parsed_value"))
# Flatten the JSON structure to extract all values
finalDF = jsonDF.select(
    col("parsed_value.headers").alias("headers"),
    col("parsed_value.body.day").alias("day"),
    col("parsed_value.body.temp").alias("body_temp"),
    col("parsed_value.body.device").alias("body_device"),
    col("parsed_value.temp").alias("temp"),
    col("parsed_value.device").alias("device"),
    col("parsed_value.updated_dt").alias("updated_dt")
)
# Write the stream to a Databricks table
writeStream = finalDF.writeStream \
    .outputMode("append") \
    .format("delta") \
    .option("checkpointLocation", "/nathass/pitemp_checkpoint/") \
    .trigger(processingTime="30 seconds") \
    .option("mergeSchema", "true") \
    .table("nathass_pi_temperature")
Data on Databricks table
Metabase
Creating a container
sudo docker run -d -p 3000:3000 --network rofl -v /opt/metabase/metabase-data:/metabase-data -e "MB_DB_FILE=/metabase-data/metabase.db" --name metabase metabase/metabase
I've also incorporated instant alerts through Uptime-kuma, which sends screaming notifications to the Telegram and Element chat clients so that critical changes get immediate attention.
More detailed insights into the setup, including the Ansible piece and automations, will be covered in a follow-up post.
Find the link to the Databricks driver for Metabase if you wish to try this yourself.
My experience showed me the beauty and the complexity of working with Kafka and other tools. Observing the temperature data flowing in real time, seeing the notifications come to life, and realizing how all these tools can be interconnected was an enriching experience. While this initial data set is not vast in scope, it represents a meaningful beginning. My forthcoming experiment will involve pushing substantial volumes of data to Kafka, conducting stress tests, and probing the system's downstream effects. This process will help me grasp the complexities of the system and guide my ongoing efforts to refine and enhance it.
Conclusion:
This experiment was not about breaking new ground in technology but about demystifying it, understanding it, and applying it creatively. It reminded me that sometimes the best way to learn is to experiment, to break things, and then figure out how to put them back together, only better.
Knowledge acquired never goes in vain. And sometimes, the journey is as exciting as the destination. Your questions and insights are welcome. Connect with me on Matrix at @teej:teej.sh.