Machine learning & Kafka KSQL stream processing
Using H2O.ai (the open source machine learning package) and a very helpful getting started guide, I was able to produce a function that returns an anomaly score based on day, hour and power usage. That is, given a day of month, an hour of day and a power-usage reading, the function returns an anomaly score:
AnomalyScore = AnomalyFunction (day, hour, power-usage)
A draw of 1500 W at 9 am is pretty normal; however, the same 1500 W draw at 4 am is cause for alarm, as that’s 500% more than expected. In short, an anomaly score above 1.0 signifies power usage beyond what has historically been measured at that time and day.
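The H2O model itself isn't reproduced in this post. As a hypothetical stand-in that only mirrors the interpretation above, the sketch below scores a reading against the largest reading previously seen for the same day and hour, so anything above 1.0 exceeds the historical record. The class name and the max-ratio rule are assumptions for illustration, not the H2O-trained function.

```python
from collections import defaultdict


class HistoricalMaxScorer:
    """Hypothetical stand-in for the H2O-trained anomaly function.

    Scores a reading as power_usage / (largest reading previously seen for
    the same day and hour), so a score above 1.0 means "more than has ever
    been measured at this day and hour".
    """

    def __init__(self):
        # (day, hour) -> highest power reading observed so far
        self._max_seen = defaultdict(float)

    def train(self, history):
        """history: iterable of (day, hour, power_usage) tuples."""
        for day, hour, power_usage in history:
            key = (day, hour)
            self._max_seen[key] = max(self._max_seen[key], power_usage)

    def score(self, day, hour, power_usage):
        baseline = self._max_seen.get((day, hour))
        if not baseline:
            # No history (or only zero readings) for this slot: treat any
            # consumption as anomalous instead of dividing by zero.
            return float("inf") if power_usage > 0 else 0.0
        return power_usage / baseline
```

With made-up history where the 4 am peak was 250 W, a 1500 W draw at 4 am scores 6.0 (500% above the previous peak), while 1500 W at 9 am against a 1600 W peak scores about 0.94. A real model generalises far better than a per-slot maximum; this only illustrates what "above 1.0" means.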
User Defined Functions and KSQL
KSQL is an open source streaming SQL engine that enables real-time data processing against Apache Kafka. KSQL supports User Defined Scalar Functions (UDFs) supplied as custom jars placed in the ext/ directory of the KSQL installation. This means my compiled anomaly score function can be exposed to the KSQL server and executed against the Kafka stream.
TL;DR: compile some Java, place the jar in the right directory, start the KSQL server and verify the function is there:
ksql> list functions;

 Function Name | Type
-------------------------------------
 . . .
 ANOMOLY_POWER | SCALAR   <--- I need this one
 ANOMOLY_WATER | SCALAR
First, a stream (“raw_power_stream”) is created to expose the real-time power-consumption readings from the raw_power Kafka topic.
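The script below shows the steps to create the final “anomoly_power” stream (backed by the ANOMOLY_POWER Kafka topic), a stream of events where the anomaly function (“anomoly_power”) has found a significantly unusual value. An intermediate stream (“power_stream_rekeyed”) computes the score for every reading, and the final stream keeps only readings that score above 1.0. That is, the ANOMOLY_POWER topic should stay silent unless an unusual event has occurred.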
create stream raw_power_stream with (kafka_topic='raw_power', value_format='avro');

create stream power_stream_rekeyed as \
select rowtime, hour, kwh, anomoly_power(hour, kwh) as fn \
from raw_power_stream partition by rowtime;

create stream anomoly_power with (value_format='JSON') as \
select rowtime as event_ts, hour, kwh, fn \
from power_stream_rekeyed where fn>1.0;
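To make the semantics of the two derived streams concrete, here is a plain-Python model of what they do to each row: score it with a function of (hour, kwh), mirroring the anomoly_power(hour, kwh) call, then emit only rows scoring above 1.0, with rowtime renamed to event_ts. It is a sketch for illustration, not how KSQL executes the query; the dict-shaped rows and the anomaly_events name are assumptions.

```python
from typing import Callable, Dict, Iterable, Iterator


def anomaly_events(
    readings: Iterable[Dict],
    score_fn: Callable[[int, float], float],
    threshold: float = 1.0,
) -> Iterator[Dict]:
    """Hypothetical plain-Python model of the two KSQL streams.

    Stage 1 (power_stream_rekeyed): attach fn = score_fn(hour, kwh) to each row.
    Stage 2 (anomoly_power): keep only rows where fn > threshold,
    renaming rowtime to event_ts.
    """
    for row in readings:
        fn = score_fn(row["hour"], row["kwh"])
        if fn > threshold:
            yield {
                "event_ts": row["rowtime"],
                "hour": row["hour"],
                "kwh": row["kwh"],
                "fn": fn,
            }
```

Most readings produce nothing, which is exactly why the output topic is quiet in normal operation.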
Ringing the Alarm
Continuous real-time stream processing is fairly useless unless there is a way of raising an alarm when something odd has happened. I wanted notifications both to appear on my mobile phone and to be announced over our Google Home smart speaker. Why get bothered only once when I can get annoyed twice?
Mobile Device notification via Pushbullet
Pushbullet is an awesome service bridge that provides a simple API for sending push notifications to iOS and Android devices.
A bit of Python code consumes the ANOMOLY_POWER topic and calls Pushbullet. A consumer is established, and an event handler calls the notification service on receipt of each new Kafka event, so every message generates a new push notification.
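The original consumer code isn't reproduced here; the following is a minimal sketch of the same idea, assuming the confluent-kafka client, a Pushbullet access token, and KSQL's default upper-case JSON column names (EVENT_TS, HOUR, KWH, FN). The consumer group name "power-alerts" and the helper names are hypothetical. Pushbullet is called through its documented `POST https://api.pushbullet.com/v2/pushes` endpoint with an `Access-Token` header.

```python
import json
import urllib.request

PUSHBULLET_URL = "https://api.pushbullet.com/v2/pushes"


def format_alert(event: dict) -> tuple:
    """Turn one ANOMOLY_POWER JSON record into a (title, body) pair.

    Assumes KSQL's default upper-case column names (EVENT_TS, HOUR, KWH, FN).
    """
    title = "Power anomaly detected"
    body = f"Hour {event['HOUR']}: {event['KWH']} kWh (anomaly score {event['FN']:.2f})"
    return title, body


def build_push_request(token: str, title: str, body: str) -> urllib.request.Request:
    """Build (but do not send) a Pushbullet 'note' push request."""
    payload = json.dumps({"type": "note", "title": title, "body": body}).encode("utf-8")
    return urllib.request.Request(
        PUSHBULLET_URL,
        data=payload,
        headers={"Access-Token": token, "Content-Type": "application/json"},
        method="POST",
    )


def run(bootstrap_servers: str, token: str) -> None:
    """Consume ANOMOLY_POWER forever and send one push per message."""
    # Imported here so the helpers above work without the Kafka client installed.
    from confluent_kafka import Consumer

    consumer = Consumer({
        "bootstrap.servers": bootstrap_servers,
        "group.id": "power-alerts",  # hypothetical consumer group name
        "auto.offset.reset": "latest",
    })
    consumer.subscribe(["ANOMOLY_POWER"])
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue  # no new event within the poll timeout
            if msg.error():
                print(f"Kafka error: {msg.error()}")
                continue
            title, body = format_alert(json.loads(msg.value()))
            with urllib.request.urlopen(build_push_request(token, title, body)) as resp:
                resp.read()
    finally:
        consumer.close()
```

Usage would be something like `run("localhost:9092", "<your Pushbullet token>")`. Keeping the message formatting and request building separate from the poll loop makes them easy to test without a broker.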
Google Home Text-to-Speech (TTS) via Home Assistant
I run the Hass.io Home Assistant software locally to control home automation tasks. One nice feature is a simple API for sending text to be spoken aloud directly on our Google Home smart speaker. That is, construct a string and it gets spoken (quite loudly) via the Google Home.
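A minimal sketch of that call, assuming Home Assistant's REST API (`POST /api/services/<domain>/<service>`) with a long-lived access token as a Bearer token. The TTS service name depends on the platform configured: `google_translate_say` is assumed here, and older Google TTS setups exposed it as `tts.google_say`. The host name and `media_player.living_room` entity are hypothetical.

```python
import json
import urllib.request


def build_tts_request(base_url: str, token: str, entity_id: str, message: str,
                      service: str = "google_translate_say") -> urllib.request.Request:
    """Build (but do not send) a Home Assistant call to a tts.* service.

    The service name is an assumption; check which TTS service your
    Home Assistant installation actually exposes.
    """
    url = f"{base_url.rstrip('/')}/api/services/tts/{service}"
    payload = json.dumps({"entity_id": entity_id, "message": message}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
        method="POST",
    )


def speak(base_url: str, token: str, entity_id: str, message: str) -> int:
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(build_tts_request(base_url, token, entity_id, message)) as resp:
        return resp.status
```

For example, `speak("http://hassio.local:8123", token, "media_player.living_room", "Unusual power usage detected")` could be called from the same event handler that sends the Pushbullet notification.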
What did I learn?
The first legitimate alarm I received came while I was on holiday. The notification, at 9 am on a Saturday morning, signified that the household was using substantially less power than expected. This was indeed true; maybe I need more holidays to train the model correctly!
Ready To Try
The entire project simply requires Docker to run locally.