
Machine learning & Kafka KSQL stream processing 

Using H2O.ai (the open source machine learning package) and a very helpful getting started guide, I was able to produce a function that returns an anomaly score based on day, hour and power usage. That is, passed a day-of-month, hour-of-day and power-usage reading, the function returns an anomaly score:

AnomalyScore = AnomalyFunction (day, hour, power-usage)

Consuming 1500 W at 9 am is pretty normal; however, a 1500 W power draw at 4 am is cause for alarm, as that's 500% more than expected. In short, an anomaly score above 1.0 signifies power usage beyond what has historically been measured at that time and day.

Anomaly score based on hour and power usage. Values above 1.0 are unusual
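For reference, the kind of model behind that function can be sketched with H2O's deep-learning autoencoder, whose per-row reconstruction error acts as an anomaly score. The snippet below is a minimal sketch rather than the exact model from the guide: the power_history.csv file and its day/hour/kwh columns are assumptions, and the author's function additionally scales the score so that 1.0 marks the edge of previously observed behaviour.

import h2o
from h2o.estimators.deeplearning import H2OAutoEncoderEstimator

h2o.init()

# Hypothetical historical readings with day-of-month, hour-of-day and kWh columns
power = h2o.import_file("power_history.csv")

# Small autoencoder trained on "normal" consumption patterns
model = H2OAutoEncoderEstimator(hidden=[5], epochs=50)
model.train(x=["day", "hour", "kwh"], training_frame=power)

# Per-row reconstruction error: large values indicate unusual (day, hour, kwh) combinations
scores = model.anomaly(power)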

User Defined Function — and KSQL

KSQL is an open source, streaming SQL engine that enables real-time data processing against Apache Kafka. KSQL supports creating User Defined Scalar Functions (UDFs) via custom jars that are uploaded to the ext/ directory of the KSQL installation. That is, my compiled anomaly-score function can be exposed to the KSQL server and executed against the Kafka stream.

TL;DR summary: compile some Java, place it in the right directory, start the KSQL server and verify the function is there …

ksql> list functions;

 Function Name           | Type
-------------------------------------
 . . .
 ANOMOLY_POWER           | SCALAR   <--- I need this one
 ANOMOLY_WATER           | SCALAR

Firstly, a stream ("raw_power_stream") is created to expose the real-time power consumption from the "raw_power" Kafka topic.

The scripts below show the steps to create the final "anomoly_power" Kafka topic, which will be a stream of events where the anomaly function ("anomoly_power") has found a significantly unusual value. That is, the "anomoly_power" topic should be silent unless an unusual event has occurred.

create stream raw_power_stream with (kafka_topic='raw_power', value_format='avro');

create stream power_stream_rekeyed as \
  select rowtime, hour, kwh, anomoly_power(hour, kwh) as fn \
  from raw_power_stream partition by rowtime;

create stream anomoly_power with (value_format='JSON') as \
  select rowtime as event_ts, hour, kwh, fn \
  from power_stream_rekeyed where fn>1.0;

Ringing the Alarm

The continuous real-time stream processing is fairly useless unless we have a way of raising an alarm when something odd has happened. I wanted notifications to appear on both my mobile phone and be announced over our Google Home smart speaker. Why get bothered only once when I can get annoyed twice?

Mobile Device notification via Pushbullet

Pushbullet is an awesome service bridge that provides a simple API for sending push-notifications to iOS and Android devices.

An iOS push-notification via Pushbullet

A bit of Python code consumes the ANOMOLY_POWER topic and calls Pushbullet. A consumer is established, and an event handler calls the notification service on receipt of each new Kafka event. Each message generates a new push notification, along the lines of the sketch below.
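This is a minimal sketch of that consumer, assuming the kafka-python client, JSON-formatted messages (as produced by the KSQL step above, with KSQL's default uppercase field names) and a Pushbullet access token held in the PUSHBULLET_TOKEN environment variable; the project's actual consumer may differ in the details.

import json
import os
import requests
from kafka import KafkaConsumer

# Consume the JSON events written by the anomoly_power KSQL stream
consumer = KafkaConsumer(
    "ANOMOLY_POWER",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for event in consumer:
    payload = event.value  # field names assumed: HOUR, KWH, FN
    # Send a push notification via the Pushbullet REST API
    requests.post(
        "https://api.pushbullet.com/v2/pushes",
        headers={"Access-Token": os.environ["PUSHBULLET_TOKEN"]},
        json={
            "type": "note",
            "title": "Power anomaly detected",
            "body": f"hour={payload['HOUR']} kwh={payload['KWH']} score={payload['FN']}",
        },
    )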

Google Home Text-to-Speech (TTS) via Home Assistant

I run the Hass.io Home Assistant software locally to control home automation tasks. One nice feature is a simple API for sending text that is spoken aloud directly by our Google Home smart speaker. That is, construct a string and it gets spoken (quite loudly) via the Google Home, along the lines of the call below.
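As a rough sketch, the same Python consumer can call Home Assistant's REST API to trigger a text-to-speech service. The hass.local hostname, the long-lived access token in HASS_TOKEN, the tts.google_translate_say service name and the media_player.google_home entity are all assumptions that depend on the particular Home Assistant setup.

import os
import requests

# Ask Home Assistant to speak a message on the Google Home via its TTS service
requests.post(
    "http://hass.local:8123/api/services/tts/google_translate_say",
    headers={"Authorization": f"Bearer {os.environ['HASS_TOKEN']}"},
    json={
        "entity_id": "media_player.google_home",
        "message": "Unusual power usage detected",
    },
)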

What did I learn?

The first legitimate alarm I received arrived while I was on holiday. The notification, at 9 am on a Saturday morning, signified the household was using substantially less power than expected. This was indeed true; maybe I need more holidays to train the model correctly!

Ready To Try

The entire project simply requires Docker to run locally.

Credits & References